我有一个监听 kafka 主题的消费者应用程序,在异常情况下,它会按预期将记录发送到 dlq(死信队列)。在抛出异常之前,我在消费者应用程序中设置标头,并且它正在设置,但没有发送到死信主题,死信主题只有内置标头,例如“x-exception-message”、“x-”原始分区'等...而不是我设置的那个。以下是我的消费者应用程序中的一段代码:modifiedMessage = MessageBuilder.fromMessage(consumedMessage).setHeader("x-ecode", new Integer(100)).setHeader(BinderHeaders.PARTITION_OVERRIDE,consumedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID)).build();System.out.println("error header:"+modifiedMessage.getHeaders().get("x-ecode",Integer.class)); //100throw new RuntimeException(modifiedMessage.toString());注意:我在 spring.cloud.stream.kafka.binder.header=x-ecode 下的 application.yml 中设置 x-code在上面的代码中,我能够设置标头,并实际验证它已设置但未发送到死信主题。有效负载已正确发送,我如何将该标头发送到死信主题?我是否需要在 application.yml 中添加任何属性才能使其发送?
1 回答
米琪卡哇伊
TA贡献1998条经验 获得超6个赞
您所做的就是创建一条新消息并丢弃它(除非您将异常消息设置为其字符串实现)。
这不会修改原始入站消息。
您只需添加一个新的输出绑定并自行将修改后的消息发送给它即可。
或者,如果您只添加一个常量,则可以将 a 添加ChannelInterceptor
到绑定的错误通道并修改那里的消息。如果您需要将状态传递给拦截器,您可以使用自定义异常。
然而,最简单的解决方案是自己发布消息,而不是使用绑定器的DLQ机制。您已经有了创建消息的逻辑,因此它只是myCustomDlqBinding.send(modifiedMessage)
(还添加了标准 DLQ 标头)。
添加回答
举报
0/150
提交
取消