2 回答
TA贡献1836条经验 获得超4个赞
我想你快到了。首先收集您感兴趣的所有主题分区,然后发出consumer.endOffsets命令。
请记住,我还没有尝试运行它,但是这样的事情应该可以工作:
run() {
Consumer consumer = createConsumer();
List<String> groupIds = getConsumerGroups();
List<TopicPartition> topicPartitions = new ArrayList<>();
for (String groupId: groupIds) {
topicPartitions.addAll(getTopicPartitions(groupId));
}
consumer.endOffsets(topicPartitions);
}
TA贡献1775条经验 获得超11个赞
这是一个Fetcher.fetchOffsetsByTimes()
特别在groupListOffsetRequests
方法内部的错误,其中逻辑没有添加用于重试的分区,其中用于请求分区偏移量的领导者未知或不可用。
当您在所有消费者组分区中使用单个消费者时,这一点更加明显,其中一些组在我们请求时已经拥有主题分区领导信息,endoffsets
而对于没有领导信息的主题分区,未知或不可用的主题分区由于错误而被忽略.
后来,我意识到从每个消费者组中提取主题分区并不是一个好主意,而是进行更改以从中读取主题分区AdminClient.listTopics & AdminClient.describeTopics
并一次性传递给Consumer.endOffsets
.
尽管这完全不能解决问题,因为主题/分区在多次运行之间可能仍然不可用或未知。
可以找到更多信息 - KAFKA-7044
& pull request
。这已被修复并计划在 2.1.0 版本中发布。
添加回答
举报