为了账号安全,请及时绑定邮箱和手机立即绑定

如何在 KafkaBootstrapConfiguration 中覆盖 KafkaListener

如何在 KafkaBootstrapConfiguration 中覆盖 KafkaListener

偶然的你 2023-06-21 16:20:08
我需要向 KafkaListenerEndpointRegistry 添加一些逻辑 - 我想为每个主题注册额外的侦听器(我想创建具有不同轮询时间的重试主题消费者链),我使用 @Listener 注释创建。为此,我想尝试覆盖 registerListenerContainer 方法并在那里实现逻辑。我做的第一步是添加与 KafkaBootstrapConfiguration 相同的默认配置。但在那之后,我所有的测试都失败了,出于某种原因,我的听众没有消费任何东西。如果我不添加豆,一切都会正常。@Configuration@EnableKafkapublic class CustomKafkaBootstrapConfiguration {  @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)  public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {    return new KafkaListenerEndpointRegistry(){        @Override        public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {            //i need to add logic here            super.registerListenerContainer(endpoint, factory);        }    };  }}
查看完整描述

1 回答

?
慕田峪9158850

TA贡献1794条经验 获得超7个赞

我刚刚复制了您的覆盖,一切都按预期进行。


@SpringBootApplication

public class So57674940Application {


    public static void main(String[] args) {

        SpringApplication.run(new Class<?>[] { So57674940Application.class, So57674940ApplicationConfig.class }, args);

    }


    @KafkaListener(id = "so57674940", topics = "so57674940")

    public void listen(String in) {

        System.out.println(in);

    }


}


@Configuration

@EnableKafka

class So57674940ApplicationConfig {


    @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)

    public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {

        return new KafkaListenerEndpointRegistry() {

            @Override

            public void registerListenerContainer(KafkaListenerEndpoint endpoint,

                    KafkaListenerContainerFactory<?> factory) {

                // i need to add logic here

                System.out.println("in custom registry");

                super.registerListenerContainer(endpoint, factory);

            }

        };

    }


}


in custom registry

2019-08-27 11:20:36.251  INFO 33460 --- [o57674940-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so57674940-0]



查看完整回答
反对 回复 2023-06-21
  • 1 回答
  • 0 关注
  • 146 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信