我是 Apache Kafka 的新手。我阅读了一个 Steam 应用程序的代码,偶然发现了聚合操作。我试图自己理解它,如果我的解释正确,我需要确认。下面提供了从主题和聚合中读取的代码片段,// json Serdefinal Serializer<JsonNode> jsonSerializer = new JsonSerializer();final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);KStreamBuilder builder = new KStreamBuilder();// read from the topic 'bank-transactions' as `KStream`. I provided the producer below KStream<String, JsonNode> bankTransactions = builder.stream(Serdes.String(), jsonSerde, "bank-transactions");// we define the grouping and aggregation here KTable<String, JsonNode> bankBalance = bankTransactions.groupByKey(Serdes.String(), jsonSerde) .aggregate( () -> initialBalance, (key, transaction, balance) -> newBalance(transaction, balance), jsonSerde, "bank-balance-agg" );bank-transactions主题的数据流产生如下,public static ProducerRecord<String, String> newRandomTransaction(String name) { // creates an empty json {} ObjectNode transaction = JsonNodeFactory.instance.objectNode(); Integer amount = ThreadLocalRandom.current().nextInt(0, 100); // Instant.now() is to get the current time using Java 8 Instant now = Instant.now(); // we write the data to the json document transaction.put("name", name); transaction.put("amount", amount); transaction.put("time", now.toString()); return new ProducerRecord<>("bank-transactions", name, transaction.toString());}我有两个关于分组和聚合的问题,一种。是否按groupByKey分组,Serdes.String()并且jsonSerde仅对 Steam 数据执行序列化和反序列化?该Serdes.String()是在名称字符串newRandomTransaction的方法。湾 我的断言是该行的key, transaction内部aggregation功能(key, transaction, balance) -> newBalance(transaction, balance) 是从bank-transactions主题中读取的,而该balance功能来自initialBalance上一行。那是对的吗?尽管它无缝运行,但我在尝试调试该应用程序时也感到困惑。
1 回答

弑天下
TA贡献1818条经验 获得超8个赞
groupByKey 是否按 Serdes.String() 分组,而 jsonSerde 仅对 Steam 数据执行序列化和反序列化?
是的,groupByKey 是按键分组,这些键可以反序列化并作为字符串进行比较
我的断言是行 (key, transaction, balance) -> newBalance(transaction, balance) 的聚合函数中的关键,交易是从银行交易主题中读取的,余额来自上一行的 initialBalance
几乎。初始化器在第一个参数上,是的,但是聚合的结果在应用程序的整个执行过程中都被结转,无休止地聚合。
换句话说,您从initialBalance
always开始,然后对于每个相同的键,将其transaction
余额添加到该balance
键的当前累积值中。如果您还没有看到重复的密钥,那么它才会被添加到初始余额中
是的,您的输入主题是由 KStreamsbuilder.stream
方法指定的
添加回答
举报
0/150
提交
取消