为了账号安全,请及时绑定邮箱和手机立即绑定

Kafka Consumer:如何以编程方式从 Go Sarama 中的特定偏移量消费

Kafka Consumer:如何以编程方式从 Go Sarama 中的特定偏移量消费

Go
RISEBY 2022-06-01 16:15:45
最近,我开始学习使用kafka工作。我正在从事的项目使用sarama。为了阅读我使用的消息ConsumerGroup。foo如果返回,我需要在一段时间后再次阅读该消息false。如何才能做到这一点?func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {    for message := range claim.Messages() {            if ok := foo(message); ok {                session.MarkMessage(message, "")            } else {                // ???            }    }    return nil}
查看完整描述

1 回答

?
白衣染霜花

TA贡献1796条经验 获得超10个赞

Setup()您可以通过在您的消费者组的回调中包含以下内容来将消费者组的偏移量重置为较旧的偏移量:

func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {
    sess.ResetOffset(topic, partition, offset, "")
    return nil}

您也可以通过控制台实现相同的目的:

kafka-consumer-groups \
    --bootstrap-server localhost:9092 \
    --group my-consumer-group \
    --topic myTopicName \
    --reset-offsets \
    --to-offfset 100 \
    --execute


查看完整回答
反对 回复 2022-06-01
  • 1 回答
  • 0 关注
  • 140 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信