为了账号安全,请及时绑定邮箱和手机立即绑定

如何在 spring-cloud-stream 中的 kafka 进程拓扑中使用交互式查询?

如何在 spring-cloud-stream 中的 kafka 进程拓扑中使用交互式查询?

PIPIONE 2022-11-02 17:00:47
是否可以在 Spring Cloud Stream 中使用带有 @EnableBinding 注释的类或在带有 @StreamListener 的方法中使用交互式查询(InteractiveQueryService)?我尝试在提供的 KStreamMusicSampleApplication类和处理方法中实例化 ReadOnlyKeyValueStore,但它始终为空。我的@StreamListener 方法正在侦听一堆KTables 和KStreams,并且在流程拓扑(例如过滤)期间,我必须检查来自KStream 的密钥是否已经存在于特定KTable 中。我试图弄清楚如何扫描传入的 KTable 以检查密钥是否已经存在但没有运气。然后我遇到了 InteractiveQueryService,它的 get() 方法可用于检查 KTable 中的 state store materializedAs 中是否存在密钥。问题是我无法从流程拓扑(@EnableBinding 或@StreamListener)访问它。它只能从这些注释之外访问,例如 RestController。有没有办法扫描传入的 KTable 以检查键或值是否存在?如果没有,那么我们可以在流程拓扑中访问 InteractiveQueryService 吗?
查看完整描述

1 回答

?
莫回无

TA贡献1865条经验 获得超7个赞

InteractiveQueryServiceSpring Cloud Stream 中的StreamListener. 正如您所提到的,它应该在您的主要拓扑之外使用。但是,对于您描述的用例,您仍然可以使用主流程中的状态存储。例如,如果您有一个传入KStream和 aKTable被具体化为状态存储,那么您可以调用并process以KStream这种方式访问状态存储。这是一个粗略的代码来实现这一点。您需要将其转换为适合您的特定用例,但这是一个想法。


ReadOnlyKeyValueStore<Object, String> store;


 input.process(() -> new Processor<Object, Product>() {


                @Override

                public void init(ProcessorContext processorContext) {

                    store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");



                }


                @Override

                public void process(Object key, Object value) {

                    //find the key

                    store.get(key);

                }


                @Override

                public void close() {

                    if (state != null) {

                        state.close();

                    }

                }

            }, "my-store");


查看完整回答
反对 回复 2022-11-02
  • 1 回答
  • 0 关注
  • 86 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信