为了账号安全,请及时绑定邮箱和手机立即绑定

Kafka Streams 在生产主题时不会将偏移量增加 1

Kafka Streams 在生产主题时不会将偏移量增加 1

LEATH 2022-06-15 09:41:25
我已经实现了一个简单的 Kafka 死信记录处理器。使用控制台生产者生成的记录时,它可以完美运行。但是我发现我们的 Kafka Streams 应用程序不能保证为接收器主题生成记录,每生成一条记录,偏移量就会增加 1。死信处理器背景:我有一个场景,在发布处理它所需的所有数据之前可能会收到记录。当流应用程序不匹配记录以进行处理时,它们将移动到死信主题,而不是继续向下流。当新数据发布时,我们将来自死信主题的最新消息转储回流应用程序的源主题,以便使用新数据进行重新处理。死信处理器:在运行应用程序开始时记录每个分区的结束偏移量结束偏移量标记停止处理给定死信主题的记录的点,以避免在重新处理的记录返回死信主题时出现无限循环。应用程序从上次通过消费者组运行产生的最后一个偏移量恢复。应用程序正在使用事务并KafkaProducer#sendOffsetsToTransaction提交最后产生的偏移量。为了跟踪我的范围内的所有记录何时为主题的分区处理,我的服务将其最后产生的从生产者的偏移量与消费者保存的结束偏移量映射进行比较。当我们到达结束偏移量时,消费者通过暂停该分区KafkaConsumer#pause,当所有分区都暂停时(意味着它们达到保存的结束偏移量)然后调用它退出。Kafka 消费者 API状态:偏移量和消费者位置 Kafka 为分区中的每条记录维护一个数字偏移量。此偏移量充当该分区内记录的唯一标识符,并且还表示消费者在分区中的位置。例如,位置 5 的消费者已经消费了偏移量为 0 到 4 的记录,接下来将接收偏移量为 5 的记录。Kafka Producer API引用下一个偏移量也总是 +1。将指定偏移量列表发送给消费者组协调器,并将这些偏移量标记为当前事务的一部分。仅当事务成功提交时,这些偏移量才会被视为已提交。提交的偏移量应该是您的应用程序将使用的下一条消息,即 lastProcessedMessageOffset + 1。但是您可以在我的调试器中清楚地看到,单个分区消耗的记录不是一次递增 1...我想这可能是一个 Kafka 配置问题,max.message.bytes但没有一个真正有意义。然后我想也许是因为加入,但没有看到任何方式会改变制片人的运作方式。不确定它是否相关,但我们所有的 Kafka 应用程序都在使用 Avro 和 Schema Registry...无论生产方法如何,偏移量是否应该始终递增 1,或者使用 Kafka 流 API 是否可能无法提供与普通生产者消费者客户端相同的保证?有什么我完全想念的吗?
查看完整描述

2 回答

?
繁花不似锦

TA贡献1851条经验 获得超4个赞

消息偏移量增加一并不是官方的 API 约定,即使 JavaDocs 表明了这一点(似乎应该更新 JavaDocs)。

  • 如果你不使用事务,你要么得到至少一次语义,要么没有保证(有些人称之为最多一次语义)。对于至少一次,记录可能被写入两次,因此,两个连续消息的偏移量并没有真正增加一,因为重复写入“消耗”了两个偏移量。

  • 如果您使用事务,则事务的每次提交(或中止)都会将提交(或中止)标记写入主题 - 这些事务标记也“消耗”一个偏移量(这是您观察到的)。

因此,通常您不应依赖连续的偏移量。您得到的唯一保证是,每个偏移量在分区内都是唯一的。


查看完整回答
反对 回复 2022-06-15
?
万千封印

TA贡献1891条经验 获得超3个赞

我知道知道消息的偏移量可能很有用。但是,Kafka 只会保证 message-X 的偏移量大于最后一条消息(X-1)的偏移量。顺便说一句,理想的解决方案不应基于偏移计算。

在后台,kafka 生产者可能会尝试重新发送消息。此外,如果经纪人倒闭,则可能会发生重新平衡。恰好一次语义可能会附加一条附加消息。因此,如果发生上述任何事件,您的消息偏移量可能会发生变化。

Kafka 可能会出于内部目的向主题添加其他消息。但是 Kafka 的消费者 API 可能会丢弃这些内部消息。因此,您只能看到您的消息,并且您的消息的偏移量不一定会增加 1。


查看完整回答
反对 回复 2022-06-15
  • 2 回答
  • 0 关注
  • 176 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信