为了账号安全,请及时绑定邮箱和手机立即绑定

如何将 RecordInterceptor 设置为 ConcurrentKafkaListener

如何将 RecordInterceptor 设置为 ConcurrentKafkaListener

慕码人8056858 2024-01-05 15:18:59
我正在使用 Spring Kafka 2.2.7,我已经配置@EnableKafka并kafkaListenerContainerFactory使用它@KafkaListener来消费消息,一切都按预期工作。我想添加一个RecordInterceptor来记录所有消耗的消息,但发现很难配置它。文档指出 RecordInterceptor 可以在容器上设置,但是我不确定如何获取容器的实例。从2.2.7版本开始,可以向监听器容器添加RecordInterceptor;它将在调用侦听器之前调用,以允许检查或修改记录。    @Bean    public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory<String, Bytes> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(createConsumerFactory());        factory.setConcurrency(consumerCount);        return factory;    }我浏览了 Spring 文档,但没有找到解决方案,这似乎是一件简单的事情,但也许我错过了一些东西。在这方面的任何帮助将不胜感激。提前致谢。
查看完整描述

1 回答

?
莫回无

TA贡献1865条经验 获得超7个赞

有一个方法setRecordInterceptor因为2.2.7

factory.setRecordInterceptor(new RecordInterceptor);

另一条信息RecordInterceptor不适用于批处理侦听器

从2.2.7版本开始,可以向监听器容器添加RecordInterceptor;它将在调用侦听器之前调用,以允许检查或修改记录。如果拦截器返回 null,则不会调用侦听器。当侦听器是批处理侦听器时,不会调用拦截器。


查看完整回答
反对 回复 2024-01-05
  • 1 回答
  • 0 关注
  • 87 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信