为了账号安全,请及时绑定邮箱和手机立即绑定

Consumer.endOffsets 在 Kafka 中是如何工作的?

Consumer.endOffsets 在 Kafka 中是如何工作的?

繁花如伊 2021-11-24 15:28:43
假设我有一个无限期运行的计时器任务,它遍历 kafka 集群中的所有消费者组,并为每个组的所有分区输出滞后、提交偏移和结束偏移。类似于 Kafka 控制台消费者组脚本的工作方式,但它适用于所有组。就像是单个消费者 - 不工作 - 不返回某些提供的主题分区的偏移量(例如提供 10 个 - 返回 5 个偏移量)Consumer consumer;static {  consumer = createConsumer();}run() {   List<String> groupIds = getConsumerGroups();  for(String groupId: groupIds) {       List<TopicParition> topicParitions =  getTopicParitions(groupId);       consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)   }}多个消费者 - 工作run() {    List<String> groupIds = getConsumerGroups();   for(String groupId: groupIds) {        List<TopicParition> topicParitions =  getTopicParitions(groupId);        Consumer consumer = createConsumer();        consumer.endOffsets(topicParitions); This works!!!   } }版本:Kafka-Client 2.0.0我是否错误地使用了消费者 API?理想情况下,我想使用单个消费者。如果您需要更多详细信息,请告诉我。
查看完整描述

2 回答

?
HUH函数

TA贡献1836条经验 获得超4个赞

我想你快到了。首先收集您感兴趣的所有主题分区,然后发出consumer.endOffsets命令。


请记住,我还没有尝试运行它,但是这样的事情应该可以工作:


run() { 

   Consumer consumer = createConsumer();

   List<String> groupIds = getConsumerGroups();

   List<TopicPartition> topicPartitions = new ArrayList<>();


   for (String groupId: groupIds) {

        topicPartitions.addAll(getTopicPartitions(groupId));

   }


   consumer.endOffsets(topicPartitions); 

}


查看完整回答
反对 回复 2021-11-24
?
繁星淼淼

TA贡献1775条经验 获得超11个赞

这是一个Fetcher.fetchOffsetsByTimes()特别在groupListOffsetRequests方法内部的错误,其中逻辑没有添加用于重试的分区,其中用于请求分区偏移量的领导者未知或不可用。

当您在所有消费者组分区中使用单个消费者时,这一点更加明显,其中一些组在我们请求时已经拥有主题分区领导信息,endoffsets而对于没有领导信息的主题分区,未知或不可用的主题分区由于错误而被忽略.

后来,我意识到从每个消费者组中提取主题分区并不是一个好主意,而是进行更改以从中读取主题分区AdminClient.listTopics & AdminClient.describeTopics并一次性传递给Consumer.endOffsets.

尽管这完全不能解决问题,因为主题/分区在多次运行之间可能仍然不可用或未知。

可以找到更多信息 - KAFKA-7044pull request。这已被修复并计划在 2.1.0 版本中发布。


查看完整回答
反对 回复 2021-11-24
  • 2 回答
  • 0 关注
  • 499 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信