为了账号安全,请及时绑定邮箱和手机立即绑定

具有相同组 ID 的 Kafka 消费者线程使用相同的记录

具有相同组 ID 的 Kafka 消费者线程使用相同的记录

繁花不似锦 2021-11-17 15:26:19
我需要在多个线程中使用来自 Kafka 分区的记录,每个线程上都有唯一的记录进行处理。我有以下代码,我不知道是什么错误public class ConsumerThread implements Runnable {    public String name;    public ConsumerThread(String name){        this.name = name;    }    public Properties getDefaultProperty(){        Properties prop = new Properties();        prop.setProperty("group.id", "4");        prop.put("enable.auto.commit", "false");        prop.put("auto.offset.reset", "earliest");        prop.setProperty("bootstrap.servers", "localhost:9092");        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        prop.setProperty("max.poll.records","150");        return prop;    }    public void run() {        TopicPartition tp = new TopicPartition("my.topic", 0);        KafkaConsumer consumer = new KafkaConsumer(getDefaultProperty());        ArrayList tpList = new ArrayList<TopicPartition>();        tpList.add(tp);        consumer.assign(tpList);        ConsumerRecords poll = consumer.poll(1000);        Iterator it = poll.iterator();        consumer.commitAsync();        while(it.hasNext()){            ConsumerRecord cr = (ConsumerRecord) it.next();            System.out.println("From "+this.name+" : "+cr.value());        }        consumer.close();        System.out.println("Thread Exiting "+this.name);    }}结果From Thread1 : produced_0From Thread1 : produced_1From Thread1 : produced_2From Thread1 : produced_3...From Thread1 : produced_136From Thread2 : produced_0From Thread2 : produced_1From Thread2 : produced_2From Thread2 : produced_3...预期的 :From Thread1 : produced_0From Thread1 : produced_1From Thread1 : produced_2From Thread1 : produced_3...From Thread1 : produced_136From Thread2 : produced_4From Thread2 : produced_5From Thread2 : produced_6From Thread2 : produced_137
查看完整描述

2 回答

?
波斯汪

TA贡献1811条经验 获得超4个赞

就像 Lior Chaga 在他的评论中所说的那样,您正在手动将主题分区分配给您的消费者。这不是执行此操作的推荐方法。最重要的是,您的所有消费者似乎都在使用完全相同的 groupID。利用这种结构,有两个线程消费,如果消费者的至少一个有一个特定的消息,没有其他线程会得到一个。如果您希望所有的消费者线程都获得自己的“一组”消息,而不会相互中断,那么您需要给它们不同的group.ids。


要订阅主题以便它为您处理自动重新平衡,然后消费,您应该执行以下操作(取自下面链接的 KafkaConsumer javadoc):


 consumer.subscribe(Arrays.asList("foo", "bar"));

 while (true) {

     ConsumerRecords<String, String> records = consumer.poll(100);

     for (ConsumerRecord<String, String> record : records)

         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

 }

Kafka 官方 javadocs 有更详细的解释:https ://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html


查看完整回答
反对 回复 2021-11-17
?
大话西游666

TA贡献1817条经验 获得超14个赞

只有使用kafka 消费者的subscribe方法才能将分区自动分配给消费者组。但是,您使用assign特定主题分区,因此您承担将特定分区分配给不同消费者的责任(但您始终使用相同的分区0,因此所有消费者都从同一主题分区消费)。


查看完整回答
反对 回复 2021-11-17
  • 2 回答
  • 0 关注
  • 259 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信