我在 python 中运行 confluent_kafka 客户端。目前,我在尝试生成然后消费消息时没有收到任何错误,但问题是生产者说它成功了,但消费者找不到任何消息。我创建了一个主题,这是我构建的正在使用的类:from confluent_kafka import Producer, Consumerfrom config import configimport jsonclass Kafka: """ Kafka Handler. """ def __init__(self, kafka_brokers_sasl, api_key): """ Arguments: kafka_brokers_sasl {str} -- String containing kafka brokers separated by comma (no spaces) api_key {str} -- Kafka Api Key """ self.driver_options = { 'bootstrap.servers': kafka_brokers_sasl, 'sasl.mechanisms': 'PLAIN', 'security.protocol': 'SASL_SSL', 'sasl.username': 'token', 'sasl.password': api_key, 'log.connection.close' : False, #'debug': 'all' } self.producer_options = { 'client.id': 'kafka-python-console-sample-producer' } self.producer_options.update(self.driver_options) self.consumer_options = { 'client.id': 'kafka-python-console-sample-consumer', 'group.id': 'kafka-python-console-sample-group' } self.consumer_options.update(self.driver_options) self.running = None def stop(self): self.running = False def delivery_report(self, err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) def produce(self, topic, data): # Function for producing/uploading data to a Kafka topic p = Producer(self.producer_options) print("Running?")
2 回答
扬帆大鱼
TA贡献1799条经验 获得超9个赞
我不是 Python 方面的专家,但看起来您是在生成消息后才开始使用的?
kafka.produce(config['kafka']['topic'], json.dumps(mock))
kafka.consume(config['kafka']['topic'])
您需要在调用生产函数之前调用消耗函数,因为当您启动一个新消费者时,该消费者的默认偏移量将是最新的。因此,例如,如果您在偏移量 5 处生成了一条消息,然后启动了一个新的消费者,则默认情况下,您的消费者偏移量将在偏移量 6 处,并且不会消耗在偏移量 5 处生成的消息。
解决方案是要么在产生任何东西之前开始消费,要么将消费者配置设置为从偏移量的开始消费消息。这可以通过设置auto.offset.reset
来完成,earliest
但我认为第一个解决方案更简单。
添加回答
举报
0/150
提交
取消