为了账号安全,请及时绑定邮箱和手机立即绑定

分别运行具有不同主题的 2 个消费者时出现 Kafka CommitFailedException

分别运行具有不同主题的 2 个消费者时出现 Kafka CommitFailedException

汪汪一只猫 2023-07-19 16:23:44
我正在尝试运行 2 个订阅了 2 个不同主题的消费者。两个消费者程序每次运行一个时都运行正常,但同时运行时,其中一个消费者总是显示异常:org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.我遵循了建议max.pool.size,将 2设置为session.timeout.ms30000,1000heartbeat.interval.ms下面是我的消费者函数,这两个文件的函数是相同的,只是主题名称更改为Test2,并且我在同时运行的 2 个不同类中运行这两个函数。    public void consume()    {        //Kafka consumer configuration settings        List<String> topicNames = new ArrayList<String>();        topicNames.add("Test1");        Properties props = new Properties();        props.put("bootstrap.servers", "localhost:9092");        props.put("group.id", "test");        props.put("enable.auto.commit", "false");        props.put("session.timeout.ms", "30000");        props.put("heartbeat.interval.ms", "1000");        props.put("max.poll.records", "2");        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);        consumer.subscribe(topicNames);由于此错误,记录不会在Kafka主题中提交。我该如何克服这个错误?
查看完整描述

1 回答

?
倚天杖

TA贡献1828条经验 获得超3个赞

在您的情况下,您需要为消费者分配不同的组 ID。您正在使用相同的组 ID 创建两个消费者(这是可以的),但是调用 subscribe 两次是不行的

您可以一次运行一个消费者,因为您只调用 subscribe 一次。

如果您需要任何进一步的帮助,请告诉我。很高兴能帮助你。


查看完整回答
反对 回复 2023-07-19
  • 1 回答
  • 0 关注
  • 97 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信