我们在我们的项目中使用 Spring Kafka 2.1.4.RELEASE 版本,我们有以下配置:@EnableKafkapublic class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Configuration class ProducerConfig { @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ASerializer.class); return props; } @Bean public ProducerFactory<String, A> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, A> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } } @Configuration class ConsumerConfig { @Value("${spring.kafka.consumer.group-id}") private String groupId; @Value("${spring.kafka.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${spring.kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${spring.kafka.consumer.max-poll-records}") private Integer maxPollRecords; @Bean public Map<String, Object> firstConsumerConfig() { Map<String, Object> props = getCommonConsumerConfig(); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ADeserializer.class); return props; }所以我们在启动这个应用程序时注意到它并不是一直连接到这两个主题。有时它仅连接到第二个主题或仅连接到第一个主题,并且可能连接到第一个和第二个主题(这是正确的)。那么你能帮助理解这里配置错误吗?
2 回答
繁花如伊
TA贡献2012条经验 获得超12个赞
通常最佳做法是将每个侦听器放在不同的位置group.id
(您可以使用覆盖消费者工厂的groupId
属性@KafkaListener
)。否则,当第二个开始时,第一个会导致重新平衡。当前的 2.1.x 版本是 2.1.10。
互换的青春
TA贡献1797条经验 获得超6个赞
好的,经过更多调查后,我能够确定我这边发生了什么样的问题。所以基本上我们有一个包含多个主题的消费者组。因此,在我的情况下,我们为每个主题设置了 0 个分区(据我所知,没有分区,我们使用主题的 1 个队列进行操作)。因此,当我连接到该 kafka 实例时 - 所有消费者都连接到这些主题,但是当有人也连接到该主题(可能是我的同事)时,正在发生重新平衡,他开始听这些主题之一而不是我(由于事实上每个分区只能有一个使用者)。
添加回答
举报
0/150
提交
取消