2 回答
TA贡献1811条经验 获得超4个赞
就像 Lior Chaga 在他的评论中所说的那样,您正在手动将主题分区分配给您的消费者。这不是执行此操作的推荐方法。最重要的是,您的所有消费者似乎都在使用完全相同的 groupID。利用这种结构,有两个线程消费,如果消费者的至少一个有一个特定的消息,没有其他线程会得到一个。如果您希望所有的消费者线程都获得自己的“一组”消息,而不会相互中断,那么您需要给它们不同的group.ids。
要订阅主题以便它为您处理自动重新平衡,然后消费,您应该执行以下操作(取自下面链接的 KafkaConsumer javadoc):
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Kafka 官方 javadocs 有更详细的解释:https ://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
添加回答
举报