为了账号安全,请及时绑定邮箱和手机立即绑定

Python Kafka 客户端 - 没有错误但无法正常工作

Python Kafka 客户端 - 没有错误但无法正常工作

桃花长相依 2021-07-09 14:48:42
我在 python 中运行 confluent_kafka 客户端。目前,我在尝试生成然后消费消息时没有收到任何错误,但问题是生产者说它成功了,但消费者找不到任何消息。我创建了一个主题,这是我构建的正在使用的类:from confluent_kafka import Producer, Consumerfrom config import configimport jsonclass Kafka:    """    Kafka Handler.    """    def __init__(self, kafka_brokers_sasl, api_key):        """        Arguments:            kafka_brokers_sasl {str} -- String containing kafka brokers separated by comma (no spaces)            api_key {str} -- Kafka Api Key        """        self.driver_options = {            'bootstrap.servers': kafka_brokers_sasl,            'sasl.mechanisms': 'PLAIN',            'security.protocol': 'SASL_SSL',            'sasl.username': 'token',            'sasl.password': api_key,            'log.connection.close' : False,            #'debug': 'all'        }        self.producer_options = {            'client.id': 'kafka-python-console-sample-producer'        }        self.producer_options.update(self.driver_options)        self.consumer_options = {            'client.id': 'kafka-python-console-sample-consumer',            'group.id': 'kafka-python-console-sample-group'        }        self.consumer_options.update(self.driver_options)        self.running = None    def stop(self):        self.running = False    def delivery_report(self, err, msg):        """ Called once for each message produced to indicate delivery result.            Triggered by poll() or flush(). """        if err is not None:            print('Message delivery failed: {}'.format(err))        else:            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))    def produce(self, topic, data): # Function for producing/uploading data to a Kafka topic        p = Producer(self.producer_options)        print("Running?")
查看完整描述

2 回答

?
扬帆大鱼

TA贡献1799条经验 获得超9个赞

我不是 Python 方面的专家,但看起来您是在生成消息后才开始使用的?

kafka.produce(config['kafka']['topic'], json.dumps(mock)) kafka.consume(config['kafka']['topic'])

您需要在调用生产函数之前调用消耗函数,因为当您启动一个新消费者时,该消费者的默认偏移量将是最新的。因此,例如,如果您在偏移量 5 处生成了一条消息,然后启动了一个新的消费者,则默认情况下,您的消费者偏移量将在偏移量 6 处,并且不会消耗在偏移量 5 处生成的消息。

解决方案是要么在产生任何东西之前开始消费,要么将消费者配置设置为从偏移量的开始消费消息。这可以通过设置auto.offset.reset来完成,earliest但我认为第一个解决方案更简单。


查看完整回答
反对 回复 2021-07-13
  • 2 回答
  • 0 关注
  • 477 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信