为了账号安全,请及时绑定邮箱和手机立即绑定

多个消费者如何在spring boot Kafka中收听多个主题?

多个消费者如何在spring boot Kafka中收听多个主题?

拉丁的传说 2021-07-09 14:44:32
当有多个消费者时,我无法收听 kafka 主题(我的案例 2 主题)。在下面的示例中,我有 2 个消费者工厂,它们将接收 2 个不同的 JSON 消息(一个是用户类型,另一个是事件类型)。两条消息都发布到不同的主题。在这里,当我尝试从 topic1 访问事件消息时,我无法访问,但我可以访问用户主题消息。前任:@Configuration@EnableKafkapublic class KafkaConsumerConfiguration {      @Autowiredprivate Environment environment;@Beanpublic ConsumerFactory<String,User> consumerFactory() {    Map<String, Object> config = new HashMap<>();    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));    config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("user.consumer.group"));    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),            new JsonDeserializer<>(User.class));}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {    ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();    factory.setConsumerFactory(consumerFactory());    return factory;}@Beanpublic ConsumerFactory<String , Event> consumerFactoryEvent(){    Map<String, Object> config = new HashMap<>();    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));    config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("event.consumer.group"));    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),            new JsonDeserializer<>(Event.class));}我的需要是首先侦听事件主题并对消息进行一些按摩,然后将其发送到用户主题,我有另一种方法可以侦听用户主题并对该消息执行某些操作。我尝试将不同的选项传递给@KafkaListener 如@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent")但它不起作用..我不确定出了什么问题..任何建议都有帮助!
查看完整描述

3 回答

?
料青山看我应如是

TA贡献1772条经验 获得超8个赞

如果你没有在 bean 中指定名称,那么方法名称将是 bean 名称,添加带有 groupid 的 bean 名称 @KafkaListener


@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent", groupId="")


@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactory", groupId="")

或者


指定名称@Bean并将该名称添加到@kafkaListener


@Bean(name="kafkaListenerContainerFactoryEvent")

public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactoryEvent() {

ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactoryEvent());

return factory;

}


查看完整回答
反对 回复 2021-07-14
  • 3 回答
  • 0 关注
  • 380 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信