为了账号安全,请及时绑定邮箱和手机立即绑定

卡夫卡流分组依据和串联

卡夫卡流分组依据和串联

慕后森 2022-09-21 16:42:23
我有一个接收记录的 Kafka 流,我想根据特定字段连接消息。流中的消息如下所示:Key: 2099Payload{  email: tom@emample.com  eventCode: 2099}预期输出:key: 2099Payload{    emails: tom@example, bill@acme.com, jane@example.com}我可以让溪流运行良好,我只是不确定lamda应该包含什么。这就是我迄今为止所做的。我不确定我是否应该使用映射,聚合或减少或组合这些操作。final StreamsBuilder builder = new StreamsBuilder();KStream<String, Payload> inputStream = builder.stream(INPUT_TOPIC);inputStream        .groupByKey()        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000)))                                  // Not sure what to do here …..}).to (OUTPUT_TOPIC );
查看完整描述

1 回答

?
莫回无

TA贡献1865条经验 获得超7个赞

它可能是这样的东西


inputStream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000)))

.aggregate(PayloadAggr::new, new Aggregator<String, Payload, PayloadAggr>() {

        @Override

        public PayloadAggr apply(String key, Payload newValue, PayloadAggr result) {

            result.setKey(key);

            if(result.getEmails()==null){

                result.setEmails(newValue.getEmail());

            }else{

                result.setEmails(result.getEmails() + "," + newValue.getEmail());

            }

            return result;

        }

    }, .../* You serdes and store */}).toStream().to(OUTPUT_TOPIC);


查看完整回答
反对 回复 2022-09-21
  • 1 回答
  • 0 关注
  • 87 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信