为了账号安全,请及时绑定邮箱和手机立即绑定

使用 kafka 流根据消息密钥将消息发送到主题

使用 kafka 流根据消息密钥将消息发送到主题

跃然一笑 2023-08-04 15:23:09
我希望能够根据消息键的键将 Kafkastream 中的所有记录发送到不同的主题。前任。Kafka 中的流包含名称作为键,记录作为值。我想根据记录的键将这些记录分散到不同的主题数据:(jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}),预期topic1 :名称: jhon ->(jhon -> {jhonsRecord}),(jhon -> {jhonsRecord2})主题2:sean->(sean -> {seansRecord})主题3:玛丽 ->(玛丽 -> {marysRecord})下面是我现在执行此操作的方式,但由于名称列表是 hudge,所以速度很慢。另外,即使记录了几个名字,我也需要遍历整个列表请提出修复建议 for( String name : names )     {         recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);     }
查看完整描述

3 回答

?
神不在的星期二

TA贡献1963条经验 获得超6个赞

我认为你应该使用KStream::to(final TopicNameExtractor<K, V> topicExtractor)函数。它使您能够计算每条消息的主题名称。

示例代码:

final KStream<String, String> stream = ???;
stream.to((key, value, recordContext) -> key);


查看完整回答
反对 回复 2023-08-04
?
墨色风雨

TA贡献1853条经验 获得超6个赞

如果您需要为每个用户生成聚合数据,则无需为每个用户写入单独的主题。您最好在源流上编写聚合。这样,您就不会最终得到每个键一个主题,但您仍然可以独立地对每个用户运行操作。

Serde<UserRecord> recordSerde = ...
KStream<Stream, UserAggregate> aggregateByName = recordsByName
   .groupByKey(Grouped.with(Serdes.String(), recordSerde))
   .aggregate(...)
   .toStream()

这种方法将扩展到数百万用户,这是您目前无法通过每个用户一个主题的方法实现的。


查看完整回答
反对 回复 2023-08-04
?
慕田峪9158850

TA贡献1794条经验 获得超7个赞

我想你正在寻找的是KStream#branch

以下未经测试,但显示了总体思路

// get a list of predicates to branch a topic on

final List<String> names = Arrays.asList("jhon", "sean", "mary");

final Predicate[] predicates = names.stream()

    .map((Function<String, Predicate<String, Object>>) n -> (s, o) -> s.equals(n))

    .toArray(Predicate[]::new);


// example input

final KStream<Object, Object> stream = new StreamsBuilder().stream("names");


// split the topic

KStream<String, Object>[] branches = stream.branch(predicates);

for (int i = 0; i < names.size(); i++) {

    branches[i].to(names.get(i));

}


// KStream branches[0] contains all records whose keys are "jhon"

// KStream branches[1] contains all records whose keys are "sean"

...


查看完整回答
反对 回复 2023-08-04
  • 3 回答
  • 0 关注
  • 127 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信