1 回答
TA贡献1865条经验 获得超7个赞
InteractiveQueryServiceSpring Cloud Stream 中的StreamListener. 正如您所提到的,它应该在您的主要拓扑之外使用。但是,对于您描述的用例,您仍然可以使用主流程中的状态存储。例如,如果您有一个传入KStream和 aKTable被具体化为状态存储,那么您可以调用并process以KStream这种方式访问状态存储。这是一个粗略的代码来实现这一点。您需要将其转换为适合您的特定用例,但这是一个想法。
ReadOnlyKeyValueStore<Object, String> store;
input.process(() -> new Processor<Object, Product>() {
@Override
public void init(ProcessorContext processorContext) {
store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");
}
@Override
public void process(Object key, Object value) {
//find the key
store.get(key);
}
@Override
public void close() {
if (state != null) {
state.close();
}
}
}, "my-store");
添加回答
举报