为了账号安全,请及时绑定邮箱和手机立即绑定

如何为我的消费者获取所有分区的当前偏移量?

如何为我的消费者获取所有分区的当前偏移量?

互换的青春 2022-05-24 15:30:44
我正在尝试获取每个可用分区的当前偏移量。根据文档,consumer.position应该可以解决问题,所以我这样尝试:consumer = Consumer({    'bootstrap.servers': config.BOOTSTRAP_SERVERS,    'group.id': config.CONSUMER_GROUP,    'enable.auto.commit': False,})# get all topicstopics = consumer.list_topics()# get all partitionspartitions = []for name, meta  in topics.topics.items():    for partition_id in meta.partitions.keys():        part = TopicPartition(name, partition_id)        partitions.append(part)# get all offsetsx = consumer.position(partitions)但是,结果分区中的所有偏移量x仍然是-1001.如果我使用镜头或其他工具进行检查,我可以看到这个结果不正确,我正在取消的消费者组已经消费了消息并将它们提交给 Kafka。
查看完整描述

2 回答

?
偶然的你

TA贡献1841条经验 获得超3个赞

作为参考,这是有效的解决方案:


consumer = Consumer({

    'bootstrap.servers': config.BOOTSTRAP_SERVERS,

    'group.id': config.CONSUMER_GROUP,

    'enable.auto.commit': False,

})


# get all topics

topics = consumer.list_topics()


# get all partitions

partitions = []

for name, meta  in topics.topics.items():

    for partition_id in meta.partitions.keys():

        part = TopicPartition(name, partition_id)

        partitions.append(part)


# get last committed offsets

partitions = consumer.committed(partitions)

显然consumer.position不像宣传的那样工作,但consumer.committed会返回存储的偏移量,即使消费者当前没有订阅主题/分区。


查看完整回答
反对 回复 2022-05-24
?
慕尼黑5688855

TA贡献1848条经验 获得超2个赞

尝试添加consumer.subscribe()或consumer.assign()功能


consumer = Consumer({

    'bootstrap.servers': config.BOOTSTRAP_SERVERS,

    'group.id': config.CONSUMER_GROUP,

    'enable.auto.commit': False,

})


# get all partitions

partitions = []

for name, meta  in topics.topics.items():

    for partition_id in meta.partitions.keys():

        part = TopicPartition(name, partition_id)

        partitions.append(part)


consumer.assign(partitions)

committed = consumer.committed(tp)


last_offset = consumer.position(tp)

print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset))



查看完整回答
反对 回复 2022-05-24
  • 2 回答
  • 0 关注
  • 211 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信