3 回答
TA贡献1963条经验 获得超6个赞
我认为你应该使用KStream::to(final TopicNameExtractor<K, V> topicExtractor)
函数。它使您能够计算每条消息的主题名称。
示例代码:
final KStream<String, String> stream = ???; stream.to((key, value, recordContext) -> key);
TA贡献1853条经验 获得超6个赞
如果您需要为每个用户生成聚合数据,则无需为每个用户写入单独的主题。您最好在源流上编写聚合。这样,您就不会最终得到每个键一个主题,但您仍然可以独立地对每个用户运行操作。
Serde<UserRecord> recordSerde = ... KStream<Stream, UserAggregate> aggregateByName = recordsByName .groupByKey(Grouped.with(Serdes.String(), recordSerde)) .aggregate(...) .toStream()
这种方法将扩展到数百万用户,这是您目前无法通过每个用户一个主题的方法实现的。
TA贡献1794条经验 获得超7个赞
我想你正在寻找的是KStream#branch
。
以下未经测试,但显示了总体思路
// get a list of predicates to branch a topic on
final List<String> names = Arrays.asList("jhon", "sean", "mary");
final Predicate[] predicates = names.stream()
.map((Function<String, Predicate<String, Object>>) n -> (s, o) -> s.equals(n))
.toArray(Predicate[]::new);
// example input
final KStream<Object, Object> stream = new StreamsBuilder().stream("names");
// split the topic
KStream<String, Object>[] branches = stream.branch(predicates);
for (int i = 0; i < names.size(); i++) {
branches[i].to(names.get(i));
}
// KStream branches[0] contains all records whose keys are "jhon"
// KStream branches[1] contains all records whose keys are "sean"
...
添加回答
举报