为了账号安全,请及时绑定邮箱和手机立即绑定

Kafka 2 消费者工厂监听器不是一直连接

Kafka 2 消费者工厂监听器不是一直连接

胡子哥哥 2021-11-11 16:00:00
我们在我们的项目中使用 Spring Kafka 2.1.4.RELEASE 版本,我们有以下配置:@EnableKafkapublic class KafkaConfig {    @Value("${spring.kafka.bootstrap-servers}")    private String bootstrapServers;    @Configuration    class ProducerConfig {        @Bean        public Map<String, Object> producerConfigs() {            Map<String, Object> props = new HashMap<>();            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ASerializer.class);            return props;        }        @Bean        public ProducerFactory<String, A> producerFactory() {            return new DefaultKafkaProducerFactory<>(producerConfigs());        }        @Bean        public KafkaTemplate<String, A> kafkaTemplate() {            return new KafkaTemplate<>(producerFactory());        }    }    @Configuration    class ConsumerConfig {        @Value("${spring.kafka.consumer.group-id}")        private String groupId;        @Value("${spring.kafka.consumer.auto-offset-reset}")        private String autoOffsetReset;        @Value("${spring.kafka.consumer.enable-auto-commit}")        private boolean enableAutoCommit;        @Value("${spring.kafka.consumer.max-poll-records}")        private Integer maxPollRecords;        @Bean        public Map<String, Object> firstConsumerConfig() {            Map<String, Object> props = getCommonConsumerConfig();            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ADeserializer.class);            return props;        }所以我们在启动这个应用程序时注意到它并不是一直连接到这两个主题。有时它仅连接到第二个主题或仅连接到第一个主题,并且可能连接到第一个和第二个主题(这是正确的)。那么你能帮助理解这里配置错误吗?
查看完整描述

2 回答

?
繁花如伊

TA贡献2012条经验 获得超12个赞

通常最佳做法是将每个侦听器放在不同的位置group.id(您可以使用覆盖消费者工厂的groupId属性@KafkaListener)。否则,当第二个开始时,第一个会导致重新平衡。当前的 2.1.x 版本是 2.1.10。


查看完整回答
反对 回复 2021-11-11
?
互换的青春

TA贡献1797条经验 获得超6个赞

好的,经过更多调查后,我能够确定我这边发生了什么样的问题。所以基本上我们有一个包含多个主题的消费者组。因此,在我的情况下,我们为每个主题设置了 0 个分区(据我所知,没有分区,我们使用主题的 1 个队列进行操作)。因此,当我连接到该 kafka 实例时 - 所有消费者都连接到这些主题,但是当有人也连接到该主题(可能是我的同事)时,正在发生重新平衡,他开始听这些主题之一而不是我(由于事实上每个分区只能有一个使用者)。


查看完整回答
反对 回复 2021-11-11
  • 2 回答
  • 0 关注
  • 169 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信