我无法升级到 Spring 5,所以我坚持使用 spring-kafka 1.3 及其有限的错误处理。所以我无法访问 spring-kafka 2 中的 ConsumerAwareErrorHandler 或 SeekToCurrent 错误处理程序。我正在使用@KafkaListener-annotated 方法来收听一个主题,我已将其配置io.confluent.kafka.serializers.KafkaAvroDeserializer为我的值反序列化器。问题是,如果我在主题中得到一条不是 Avro 格式的消息,那么 KafkaMessageListenerContainer 轮询循环就会卡住。反序列化程序会在消息上引发异常,并且轮询循环永远不会越过它,因此下次通过循环时,它会尝试反序列化相同的消息并继续循环,每秒将相同的错误转储到我的日志中数千次。似乎没有办法获得 NeverRetryPolicy 或边缘方面的任何东西,但我可以factory.getContainerProperties().setErrorHandler()。不幸的是,我不确定我能从那里做什么。有什么东西可以自动连接到我的错误处理程序中,我可以用它来寻找错误时的前向 1 个偏移量吗?不确定那是什么,文档并没有过多地谈论您可以使用 ErrorHandler 实际做什么,我能找到的大多数示例都是针对 spring-kafka 2.X 的。就像,它不会反序列化,我对消息无能为力,它永远不会起作用,我想避免再次重试它,而且似乎大多数 Stackoverflow 问题都是关于做相反的事情。我还看到有些人只是将 Avro 的 Deserializer 包装在他们自己的类中,该类会吃掉异常并返回 null。这是一个更好的计划吗?
1 回答
幕布斯7119047
TA贡献1794条经验 获得超8个赞
问题是反序列化在 Spring 获取数据之前就失败了——问题出在 Kafka 本身。
在 2.2 中,我们添加了ErrorHandlingDeserializer2
包装真正的反序列化器并向侦听器容器发送信号,以便可以将错误发送到错误处理程序。
在旧版本中,您需要编写自己的反序列化器包装器 - 但是,容器中没有代码来处理这种情况,因此您的 catch 块需要返回一个真实的对象,该对象是向您的侦听器发出反序列化失败的信号.
假设您的听众收到Invoice
. 您的 catch 块可以创建一个子类,BadInvoice
然后说您可以在侦听器中检测并丢弃它。
我还看到有些人只是将 Avro 的 Deserializer 包装在他们自己的类中,该类会吃掉异常并返回 null。这是一个更好的计划吗?
如果您从未获得真正的空记录,您可以返回null
,但您必须添加@Payload(required = false)
到方法参数。
添加回答
举报
0/150
提交
取消