我有一个接收记录的 Kafka 流,我想根据特定字段连接消息。流中的消息如下所示:Key: 2099Payload{ email: tom@emample.com eventCode: 2099}预期输出:key: 2099Payload{ emails: tom@example, bill@acme.com, jane@example.com}我可以让溪流运行良好,我只是不确定lamda应该包含什么。这就是我迄今为止所做的。我不确定我是否应该使用映射,聚合或减少或组合这些操作。final StreamsBuilder builder = new StreamsBuilder();KStream<String, Payload> inputStream = builder.stream(INPUT_TOPIC);inputStream .groupByKey() .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000))) // Not sure what to do here …..}).to (OUTPUT_TOPIC );
1 回答
莫回无
TA贡献1865条经验 获得超7个赞
它可能是这样的东西
inputStream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000)))
.aggregate(PayloadAggr::new, new Aggregator<String, Payload, PayloadAggr>() {
@Override
public PayloadAggr apply(String key, Payload newValue, PayloadAggr result) {
result.setKey(key);
if(result.getEmails()==null){
result.setEmails(newValue.getEmail());
}else{
result.setEmails(result.getEmails() + "," + newValue.getEmail());
}
return result;
}
}, .../* You serdes and store */}).toStream().to(OUTPUT_TOPIC);
添加回答
举报
0/150
提交
取消