为了账号安全,请及时绑定邮箱和手机立即绑定

有没有办法直接从处理器内部将数据发送到 Kafka 主题?

有没有办法直接从处理器内部将数据发送到 Kafka 主题?

猛跑小猪 2021-07-13 17:04:43
我试图在 Kafka Streams 的帮助下实现以下逻辑:听一些来自主题的参考数据,例如。ref-data-topic并StateStore从中创建一个全局。收听来自另一个主题的消息,data-topic这些消息必须根据 ref 数据进行验证并发送到success或errors主题。下面是示例伪代码:class SomeProcessor implements Processor<String, String> {    private KeyValueStore<String, String> refDataStore;    @Override    public void init(final ProcessorContext context) {        refDataStore = (KeyValueStore) context.getStateStore("ref-data-store");    }    @Override    public void process(String key String value) {        Object refData = refDataStore.get("some_key");        // business logic here        if(ok) {           sendValueToTopic("success");        } else {           sendValueToTopic("errors");        }    }}或者实现这种理想行为的规范方法是什么?就像我现在想到的另一种方法是用验证信息丰富处理器中的数据,然后将所有内容发送到一个主题中,让客户端处理例如validationStatus接收到的消息。虽然,我真的很想有一个包含两个主题的解决方案,因为例如,在这种情况下,我可以使用 Kafka Connectsuccess topic直接链接到某个数据存储并以error topic某种方式进行处理。同样,在只有一个主题的方法中,我不知道如何实现这个“store_only_successfully_validated_entities”用例。有什么想法和建议吗?
查看完整描述

1 回答

  • 1 回答
  • 0 关注
  • 117 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信