我有一个自定义的Kafka Consumer,用于将一些请求发送到REST API。根据API的响应,我要么提交偏移量,要么跳过不提交的消息。最小示例:while (true) { ConsumerRecords<String, Object> records = consumer.poll(200); for (ConsumerRecord<String, Object> record : records) { // Sending a POST request and retrieving the answer // ... if (responseCode.startsWith("2")) { try { consumer.commitSync(); } catch(CommitFailedException ex) { ex.printStackTrace(); } } else { // Do Nothing } }}现在,当来自REST API的响应未以开头时,2不会提交偏移量,但是不会重新使用该消息。如何强制使用者重新使用未提交的偏移量的邮件?
2 回答
![?](http://img1.sycdn.imooc.com/5c4aa098000126bb09600960-100-100.jpg)
繁星coding
TA贡献1797条经验 获得超4个赞
提交偏移量只是存储使用者当前偏移量(也称为位置)的一种方法。因此,如果它停止了,它(或新的使用者实例将接管)可以找到其先前的位置并从那里重新开始消费。
因此,即使您不提交,一旦收到记录,使用者的位置也会被移动。如果要重新使用某些记录,则必须更改使用者的当前位置。
在Java客户端中,您可以使用来设置位置seek()
。
在您的方案中,您可能想计算相对于当前位置的新位置。如果是这样,您可以使用查找当前位置position()
。
添加回答
举报
0/150
提交
取消