我使用此配置创建一个新消费者:c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": addresses, "group.id": "my_group", "auto.offset.reset": "earliest", })topic := "testTopic"if err = c.SubscribeTopics([]string{topic}, nil); err != nil { panic(err)}然后我根据以下代码生成事件并使用一个事件:events := []map[string]string{{ "name": "Foo",}, { "name": "Bar", }, } err = p.ProduceEvent(events[0])//there is a wrapper to produce events err = p.ProduceEvent(events[1]) res, err := c.ReadMessage(100 * time.Second) time.Sleep(20 * time.Second) c.Close() 当我用 描述该组时 watch /home/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_group --describe。每一步的结果是:产生事件后: 当我消费一个事件时: 关闭消费者后: 我不明白为什么最后滞后为零!我只消耗了一个事件。这对我来说很奇怪,那Close会改变偏移量。任何线索表示赞赏。
1 回答
拉风的咖菲猫
TA贡献1995条经验 获得超2个赞
ReadMessage
包裹Poll
。Poll
获取一批消息并在本地缓冲它们。由于您已将消费者配置为自动提交偏移量,因此它将提交所有获取的消息,甚至是那些在本地缓存且您的应用程序仍未处理的消息。这就是为什么您看到关闭消费者后没有延迟。
librdkafka
(因此confluent-kafka-go
)没有办法配置max.pool.records
,所以如果你想准确控制哪些偏移量被提交,你需要禁用自动提交偏移量并使用手动提交它们StoreOffsets
:https ://github.com/confluentinc/confluent- kafka-go/issues/380#issuecomment-539903016
- 1 回答
- 0 关注
- 231 浏览
添加回答
举报
0/150
提交
取消