我试图在 Kafka Streams 的帮助下实现以下逻辑:听一些来自主题的参考数据,例如。ref-data-topic并StateStore从中创建一个全局。收听来自另一个主题的消息,data-topic这些消息必须根据 ref 数据进行验证并发送到success或errors主题。下面是示例伪代码:class SomeProcessor implements Processor<String, String> { private KeyValueStore<String, String> refDataStore; @Override public void init(final ProcessorContext context) { refDataStore = (KeyValueStore) context.getStateStore("ref-data-store"); } @Override public void process(String key String value) { Object refData = refDataStore.get("some_key"); // business logic here if(ok) { sendValueToTopic("success"); } else { sendValueToTopic("errors"); } }}或者实现这种理想行为的规范方法是什么?就像我现在想到的另一种方法是用验证信息丰富处理器中的数据,然后将所有内容发送到一个主题中,让客户端处理例如validationStatus接收到的消息。虽然,我真的很想有一个包含两个主题的解决方案,因为例如,在这种情况下,我可以使用 Kafka Connectsuccess topic直接链接到某个数据存储并以error topic某种方式进行处理。同样,在只有一个主题的方法中,我不知道如何实现这个“store_only_successfully_validated_entities”用例。有什么想法和建议吗?
添加回答
举报
0/150
提交
取消