当有多个消费者时,我无法收听 kafka 主题(我的案例 2 主题)。在下面的示例中,我有 2 个消费者工厂,它们将接收 2 个不同的 JSON 消息(一个是用户类型,另一个是事件类型)。两条消息都发布到不同的主题。在这里,当我尝试从 topic1 访问事件消息时,我无法访问,但我可以访问用户主题消息。前任:@Configuration@EnableKafkapublic class KafkaConsumerConfiguration { @Autowiredprivate Environment environment;@Beanpublic ConsumerFactory<String,User> consumerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers")); config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("user.consumer.group")); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class); return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(User.class));}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory;}@Beanpublic ConsumerFactory<String , Event> consumerFactoryEvent(){ Map<String, Object> config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers")); config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("event.consumer.group")); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class); return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Event.class));}我的需要是首先侦听事件主题并对消息进行一些按摩,然后将其发送到用户主题,我有另一种方法可以侦听用户主题并对该消息执行某些操作。我尝试将不同的选项传递给@KafkaListener 如@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent")但它不起作用..我不确定出了什么问题..任何建议都有帮助!
3 回答
![?](http://img1.sycdn.imooc.com/5458655200013d9802200220-100-100.jpg)
料青山看我应如是
TA贡献1772条经验 获得超8个赞
如果你没有在 bean 中指定名称,那么方法名称将是 bean 名称,添加带有 groupid 的 bean 名称 @KafkaListener
@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent", groupId="")
@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactory", groupId="")
或者
指定名称@Bean并将该名称添加到@kafkaListener
@Bean(name="kafkaListenerContainerFactoryEvent")
public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactoryEvent() {
ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryEvent());
return factory;
}
添加回答
举报
0/150
提交
取消