为了账号安全,请及时绑定邮箱和手机立即绑定

这种聚合在 Kafka 流中是如何工作的?

这种聚合在 Kafka 流中是如何工作的?

郎朗坤 2021-11-03 10:39:32
我是 Apache Kafka 的新手。我阅读了一个 Steam 应用程序的代码,偶然发现了聚合操作。我试图自己理解它,如果我的解释正确,我需要确认。下面提供了从主题和聚合中读取的代码片段,// json Serdefinal Serializer<JsonNode> jsonSerializer = new JsonSerializer();final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);KStreamBuilder builder = new KStreamBuilder();// read from the topic 'bank-transactions' as `KStream`. I provided the producer below KStream<String, JsonNode> bankTransactions = builder.stream(Serdes.String(), jsonSerde, "bank-transactions");// we define the grouping and aggregation here KTable<String, JsonNode> bankBalance = bankTransactions.groupByKey(Serdes.String(), jsonSerde)    .aggregate(            () -> initialBalance,            (key, transaction, balance) -> newBalance(transaction, balance),            jsonSerde,            "bank-balance-agg"    );bank-transactions主题的数据流产生如下,public static ProducerRecord<String, String> newRandomTransaction(String name) {    // creates an empty json {}    ObjectNode transaction = JsonNodeFactory.instance.objectNode();    Integer amount = ThreadLocalRandom.current().nextInt(0, 100);    // Instant.now() is to get the current time using Java 8    Instant now = Instant.now();    // we write the data to the json document    transaction.put("name", name);    transaction.put("amount", amount);    transaction.put("time", now.toString());    return new ProducerRecord<>("bank-transactions", name, transaction.toString());}我有两个关于分组和聚合的问题,一种。是否按groupByKey分组,Serdes.String()并且jsonSerde仅对 Steam 数据执行序列化和反序列化?该Serdes.String()是在名称字符串newRandomTransaction的方法。湾 我的断言是该行的key, transaction内部aggregation功能(key, transaction, balance) -> newBalance(transaction, balance) 是从bank-transactions主题中读取的,而该balance功能来自initialBalance上一行。那是对的吗?尽管它无缝运行,但我在尝试调试该应用程序时也感到困惑。
查看完整描述

1 回答

?
弑天下

TA贡献1818条经验 获得超8个赞

groupByKey 是否按 Serdes.String() 分组,而 jsonSerde 仅对 Steam 数据执行序列化和反序列化?

是的,groupByKey 是按键分组,这些键可以反序列化并作为字符串进行比较

我的断言是行 (key, transaction, balance) -> newBalance(transaction, balance) 的聚合函数中的关键,交易是从银行交易主题中读取的,余额来自上一行的 initialBalance

几乎。初始化器在第一个参数上,是的,但是聚合的结果在应用程序的整个执行过程中都被结转,无休止地聚合。

换句话说,您从initialBalancealways开始,然后对于每个相同的键,将其transaction余额添加到该balance键的当前累积值中。如果您还没有看到重复的密钥,那么它才会被添加到初始余额中

是的,您的输入主题是由 KStreamsbuilder.stream方法指定的


查看完整回答
反对 回复 2021-11-03
  • 1 回答
  • 0 关注
  • 188 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号