1 回答

TA贡献1865条经验 获得超7个赞
它可能是这样的东西
inputStream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000)))
.aggregate(PayloadAggr::new, new Aggregator<String, Payload, PayloadAggr>() {
@Override
public PayloadAggr apply(String key, Payload newValue, PayloadAggr result) {
result.setKey(key);
if(result.getEmails()==null){
result.setEmails(newValue.getEmail());
}else{
result.setEmails(result.getEmails() + "," + newValue.getEmail());
}
return result;
}
}, .../* You serdes and store */}).toStream().to(OUTPUT_TOPIC);
添加回答
举报