为了账号安全,请及时绑定邮箱和手机立即绑定

重新使用未提交偏移量的消息

重新使用未提交偏移量的消息

侃侃无极 2021-04-08 18:19:42
我有一个自定义的Kafka Consumer,用于将一些请求发送到REST API。根据API的响应,我要么提交偏移量,要么跳过不提交的消息。最小示例:while (true) {    ConsumerRecords<String, Object> records = consumer.poll(200);    for (ConsumerRecord<String, Object> record : records) {        // Sending a POST request and retrieving the answer        // ...        if (responseCode.startsWith("2")) {            try {                consumer.commitSync();            } catch(CommitFailedException ex) {              ex.printStackTrace();             }        } else {              // Do Nothing        }    }}现在,当来自REST API的响应未以开头时,2不会提交偏移量,但是不会重新使用该消息。如何强制使用者重新使用未提交的偏移量的邮件?
查看完整描述

2 回答

?
繁星coding

TA贡献1797条经验 获得超4个赞

提交偏移量只是存储使用者当前偏移量(也称为位置)的一种方法。因此,如果它停止了,它(或新的使用者实例将接管)可以找到其先前的位置并从那里重新开始消费。

因此,即使您不提交,一旦收到记录,使用者的位置也会被移动。如果要重新使用某些记录,则必须更改使用者的当前位置。

在Java客户端中,您可以使用来设置位置seek()

在您的方案中,您可能想计算相对于当前位置的新位置。如果是这样,您可以使用查找当前位置position()


查看完整回答
反对 回复 2021-04-28
  • 2 回答
  • 0 关注
  • 158 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信