4 Answers

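User with 1877 contribution points · more than 6 likes
The main idea is to count how many messages are in each partition of the topic and sum those numbers. The result is the total number of messages in the topic. I use confluent_kafka as the main library.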
from concurrent.futures import ThreadPoolExecutor

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({"bootstrap.servers": "localhost:6667", "group.id": "test"})

def get_partition_size(topic_name: str, partition_key: int) -> int:
    # Size of one partition: high watermark minus low watermark.
    topic_partition = TopicPartition(topic_name, partition_key)
    low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)
    partition_size = high_offset - low_offset
    return partition_size

def get_topic_size(topic_name: str) -> int:
    # Fetch the topic metadata to discover its partition ids.
    topic = consumer.list_topics(topic=topic_name)
    partitions = topic.topics[topic_name].partitions
    workers, max_workers = [], len(partitions) or 1
    # Query the watermarks of every partition in its own thread.
    with ThreadPoolExecutor(max_workers=max_workers) as e:
        for partition_key in partitions.keys():
            job = e.submit(get_partition_size, topic_name, partition_key)
            workers.append(job)
    topic_size = sum(w.result() for w in workers)
    return topic_size

print(get_topic_size('my.kafka.topic'))

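User with 1817 contribution points · more than 14 likes
One solution is to add one message to every partition and read the last offset of each. From those offsets you can calculate the total number of messages sent to the topic so far.
But this is not the right approach. You do not know how many messages consumers have already consumed, or how many messages Kafka has deleted. The only way is to consume the messages and count them.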
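
To make the offset arithmetic above concrete, here is a minimal sketch in plain Python. It assumes you already have a (low, high) watermark pair for each partition, for example from a call such as `get_watermark_offsets`; the function name `summarize_watermarks` and the sample numbers are hypothetical. Both results are only upper bounds on compacted or transactional topics, where offsets can have gaps.

```python
from typing import Dict, Tuple


def summarize_watermarks(watermarks: Dict[int, Tuple[int, int]]) -> Tuple[int, int]:
    """Return (retained, ever_written) from per-partition (low, high) offsets.

    retained:     sum of high - low, the offsets still present in the log
    ever_written: sum of high, the offsets assigned since the topic was created
    """
    retained = sum(high - low for low, high in watermarks.values())
    ever_written = sum(high for _, high in watermarks.values())
    return retained, ever_written


# Hypothetical watermarks for a three-partition topic: partition -> (low, high).
example = {0: (100, 250), 1: (0, 40), 2: (75, 75)}
print(summarize_watermarks(example))  # (190, 365)
```

The gap between the two numbers (175 here) corresponds to offsets that are no longer in the log. Neither number says anything about how many messages a particular consumer group has already read.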

User with 1790 contribution points · more than 9 likes
I could not do this with kafka-python, but I was able to do it fairly easily with the confluent-kafka library:
from confluent_kafka import Consumer

topic = "test_topic"
broker = "localhost:9092"

def get_count():
    consumer = Consumer({
        'bootstrap.servers': broker,
        'group.id': 'my-group',
        'auto.offset.reset': 'earliest',
    })
    consumer.subscribe([topic])
    total_message_count = 0
    while True:
        msg = consumer.poll(1.0)
        # No message within the 1-second timeout: treat the topic as exhausted.
        if msg is None:
            print("No more messages")
            break
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        total_message_count = total_message_count + 1
        print('Received message {}: {}'.format(total_message_count,
                                               msg.value().decode('utf-8')))
    consumer.close()
    print(total_message_count)

get_count()
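
The loop above stops at the first poll that times out, which can happen before the end of the topic is reached (for example while the consumer group is still being assigned partitions). One possible refinement, sketched here under assumptions, is to stop only when every partition has reached the high watermark captured before counting began. The helper `count_until_end`, its parameters, and the `max_idle_polls` safety limit are hypothetical; `poll` stands for something like `lambda: consumer.poll(1.0)`, and the message objects only need the `partition()`, `offset()` and `error()` methods that confluent-kafka messages provide.

```python
from typing import Callable, Dict, Optional, Tuple


def count_until_end(poll: Callable[[], Optional[object]],
                    watermarks: Dict[int, Tuple[int, int]],
                    max_idle_polls: int = 5) -> int:
    """Count messages until each partition reaches its captured high watermark.

    poll:           hypothetical callable returning a message or None on timeout
    watermarks:     partition -> (low, high), captured before counting starts
    max_idle_polls: give up after this many consecutive empty polls
    """
    # Only partitions that currently hold messages need to be read.
    remaining = {p for p, (low, high) in watermarks.items() if high > low}
    total = 0
    idle = 0
    while remaining and idle < max_idle_polls:
        msg = poll()
        if msg is None:
            idle += 1
            continue
        idle = 0
        if msg.error():
            continue
        part = msg.partition()
        if part not in remaining:
            continue  # partition already finished, or not tracked
        _, high = watermarks[part]
        if msg.offset() >= high:
            # Produced after the watermark snapshot: do not count it.
            remaining.discard(part)
            continue
        total += 1
        if msg.offset() == high - 1:
            remaining.discard(part)  # last message covered by the snapshot
    return total
```

The idle limit is kept as a fallback because on compacted or transactional topics the offset `high - 1` may not correspond to a deliverable message, so the end condition alone might never trigger.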