2 回答
TA贡献1841条经验 获得超3个赞
作为参考,这是有效的解决方案:
consumer = Consumer({
'bootstrap.servers': config.BOOTSTRAP_SERVERS,
'group.id': config.CONSUMER_GROUP,
'enable.auto.commit': False,
})
# get all topics
topics = consumer.list_topics()
# get all partitions
partitions = []
for name, meta in topics.topics.items():
for partition_id in meta.partitions.keys():
part = TopicPartition(name, partition_id)
partitions.append(part)
# get last committed offsets
partitions = consumer.committed(partitions)
显然consumer.position不像宣传的那样工作,但consumer.committed会返回存储的偏移量,即使消费者当前没有订阅主题/分区。
TA贡献1848条经验 获得超2个赞
尝试添加consumer.subscribe()或consumer.assign()功能
consumer = Consumer({
'bootstrap.servers': config.BOOTSTRAP_SERVERS,
'group.id': config.CONSUMER_GROUP,
'enable.auto.commit': False,
})
# get all partitions
partitions = []
for name, meta in topics.topics.items():
for partition_id in meta.partitions.keys():
part = TopicPartition(name, partition_id)
partitions.append(part)
consumer.assign(partitions)
committed = consumer.committed(tp)
last_offset = consumer.position(tp)
print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset))
添加回答
举报