我想了解我将消费者配置为不自动提交的 kafkaConsumer.poll() 方法的行为 Properties properties = new Properties();
properties.setProperty("bootstrap.servers", bootstrapAddress);
KafkaConsumer consumer = new KafkaConsumer(properties);据我了解,根据 Javadoc,如果我这样做 ConsumerRecords firstBatch = consumer.poll(0l);
ConsumerRecords secondBatch = consumer.poll(0l);假设主题中只有一个分区,因为尚未提交偏移量,则firstBatch和都应secondBatch包含相同的。ConsumerRecords我的假设正确吗?我的问题是,每次我调用consumer.poll(0l)下一批时ConsumerRecords都会获取
1 回答
慕慕森
TA贡献1856条经验 获得超17个赞
firstBatch 和 secondaryBatch 都应包含相同的 ConsumerRecords
offset
这是错误的,即使禁用自动或offset
手动提交,Kafka 消费者偏移量也会在每次后续轮询中自动增加
抵消和消费者地位
消费者的位置给出了将给出的下一条记录的偏移量。它将比消费者在该分区中看到的最高偏移量大 1。每次消费者在调用 poll(long) 中收到消息时,它都会自动前进
提交的位置是已安全存储的最后一个偏移量。如果进程失败并重新启动,这就是消费者将恢复到的偏移量。消费者可以定期自动提交偏移量;或者它可以选择通过调用提交 API 之一(例如 commitSync 和 commitAsync)来手动控制此提交位置。
你的假设以另一种方式是正确的,当offset
未提交并且kafka消费者重新启动时,它将轮询旧批次或从提交旧偏移量的开头开始。
添加回答
举报
0/150
提交
取消