为了账号安全,请及时绑定邮箱和手机立即绑定

如何使用kafka-python计算主题中的记录(消息)数量

如何使用kafka-python计算主题中的记录(消息)数量

杨魅力 2023-10-18 21:40:47
正如标题中所说,我想在我的主题中获得一些记录,但我找不到使用 kafka-python 库的解决方案。有人有什么主意吗 ?
查看完整描述

4 回答

?
慕哥9229398

TA贡献1877条经验 获得超6个赞

主要思想是计算主题的每个分区中有多少条消息并对所有这些数字求和。结果是有关该主题的消息总数。我使用confluence_kafka作为主库。


from confluent_kafka import Consumer, TopicPartition

from concurrent.futures import ThreadPoolExecutor


consumer = Consumer({"bootstrap.servers": "localhost:6667", "group.id": "test"})


def get_partition_size(topic_name: str, partition_key: int):

    topic_partition = TopicPartition(topic_name, partition_key)

    low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)

    partition_size = high_offset - low_offset

    return partition_size


def get_topic_size(topic_name: str):

    topic = consumer.list_topics(topic=topic_name)

    partitions = topic.topics[topic_name].partitions

    workers, max_workers = [], len(partitions) or 1


    with ThreadPoolExecutor(max_workers=max_workers) as e:

        for partition_key in list(topic.topics[topic_name].partitions.keys()):

            job = e.submit(get_partition_size, topic_name, partition_key)

            workers.append(job)


    topic_size = sum([w.result() for w in workers])

    return topic_size


print(get_topic_size('my.kafka.topic'))


查看完整回答
反对 回复 2023-10-18
?
大话西游666

TA贡献1817条经验 获得超14个赞

一种解决方案是您可以向所有分区各添加一条消息并获取最后的偏移量。根据偏移量,您可以计算到目前为止发送到主题的消息总数。

但这不是正确的做法。你不知道消费者已经消费了多少条消息,以及kafka删除了多少条消息。唯一的方法是您可以消费消息并计算数量。


查看完整回答
反对 回复 2023-10-18
?
慕虎7371278

TA贡献1802条经验 获得超4个赞

没有特定的 API 来计算某个主题的记录数。您需要消费并计算从 kafka 消费者收到的记录数。



查看完整回答
反对 回复 2023-10-18
?
富国沪深

TA贡献1790条经验 获得超9个赞

我无法使用 来实现此操作kafka-python,但我可以使用confluent-kafka库相当轻松地完成此操作:


from confluent_kafka import Consumer


topic = "test_topic"

broker = "localhost:9092"


def get_count():

    consumer = Consumer({

        'bootstrap.servers': broker,

        'group.id': 'my-group',

        'auto.offset.reset': 'earliest',

    })


    consumer.subscribe([topic])


    total_message_count = 0

    while True:

        msg = consumer.poll(1.0)


        if msg is None:

            print("No more messages")

            break

        if msg.error():

            print("Consumer error: {}".format(msg.error()))

            continue


        total_message_count = total_message_count + 1

        print('Received message {}: {}'.format(total_message_count,     

msg.value().decode('utf-8')))


    consumer.close()


    print(total_message_count)


查看完整回答
反对 回复 2023-10-18
  • 4 回答
  • 0 关注
  • 172 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号