我正在使用 Spring Kafka 2.2.7,我已经配置@EnableKafka并kafkaListenerContainerFactory使用它@KafkaListener来消费消息,一切都按预期工作。我想添加一个RecordInterceptor来记录所有消耗的消息,但发现很难配置它。文档指出 RecordInterceptor 可以在容器上设置,但是我不确定如何获取容器的实例。从2.2.7版本开始,可以向监听器容器添加RecordInterceptor;它将在调用侦听器之前调用,以允许检查或修改记录。 @Bean public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Bytes> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(createConsumerFactory()); factory.setConcurrency(consumerCount); return factory; }我浏览了 Spring 文档,但没有找到解决方案,这似乎是一件简单的事情,但也许我错过了一些东西。在这方面的任何帮助将不胜感激。提前致谢。
1 回答
莫回无
TA贡献1865条经验 获得超7个赞
有一个方法setRecordInterceptor因为2.2.7
factory.setRecordInterceptor(new RecordInterceptor);
另一条信息RecordInterceptor不适用于批处理侦听器
从2.2.7版本开始,可以向监听器容器添加RecordInterceptor;它将在调用侦听器之前调用,以允许检查或修改记录。如果拦截器返回 null,则不会调用侦听器。当侦听器是批处理侦听器时,不会调用拦截器。
添加回答
举报
0/150
提交
取消