最近,我开始学习使用kafka工作。我正在从事的项目使用sarama。为了阅读我使用的消息ConsumerGroup。foo如果返回,我需要在一段时间后再次阅读该消息false。如何才能做到这一点?func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { if ok := foo(message); ok { session.MarkMessage(message, "") } else { // ??? } } return nil}
1 回答
白衣染霜花
TA贡献1796条经验 获得超10个赞
Setup()
您可以通过在您的消费者组的回调中包含以下内容来将消费者组的偏移量重置为较旧的偏移量:
func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error { sess.ResetOffset(topic, partition, offset, "") return nil}
您也可以通过控制台实现相同的目的:
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --group my-consumer-group \ --topic myTopicName \ --reset-offsets \ --to-offfset 100 \ --execute
- 1 回答
- 0 关注
- 140 浏览
添加回答
举报
0/150
提交
取消