为了账号安全,请及时绑定邮箱和手机立即绑定

如何添加额外的自定义标头,同时抛出异常以将消息存储在 kafka 上的 dlq 中?

如何添加额外的自定义标头,同时抛出异常以将消息存储在 kafka 上的 dlq 中?

幕布斯7119047 2023-11-10 15:51:19
我有一个监听 kafka 主题的消费者应用程序,在异常情况下,它会按预期将记录发送到 dlq(死信队列)。在抛出异常之前,我在消费者应用程序中设置标头,并且它正在设置,但没有发送到死信主题,死信主题只有内置标头,例如“x-exception-message”、“x-”原始分区'等...而不是我设置的那个。以下是我的消费者应用程序中的一段代码:modifiedMessage = MessageBuilder.fromMessage(consumedMessage).setHeader("x-ecode", new Integer(100)).setHeader(BinderHeaders.PARTITION_OVERRIDE,consumedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID)).build();System.out.println("error header:"+modifiedMessage.getHeaders().get("x-ecode",Integer.class)); //100throw new RuntimeException(modifiedMessage.toString());注意:我在 spring.cloud.stream.kafka.binder.header=x-ecode 下的 application.yml 中设置 x-code在上面的代码中,我能够设置标头,并实际验证它已设置但未发送到死信主题。有效负载已正确发送,我如何将该标头发送到死信主题?我是否需要在 application.yml 中添加任何属性才能使其发送?
查看完整描述

1 回答

?
米琪卡哇伊

TA贡献1998条经验 获得超6个赞

您所做的就是创建一条新消息并丢弃它(除非您将异常消息设置为其字符串实现)。

这不会修改原始入站消息。

您只需添加一个新的输出绑定并自行将修改后的消息发送给它即可。

或者,如果您只添加一个常量,则可以将 a 添加ChannelInterceptor到绑定的错误通道并修改那里的消息。如果您需要将状态传递给拦截器,您可以使用自定义异常。

然而,最简单的解决方案是自己发布消息,而不是使用绑定器的DLQ机制。您已经有了创建消息的逻辑,因此它只是myCustomDlqBinding.send(modifiedMessage)(还添加了标准 DLQ 标头)。


查看完整回答
反对 回复 2023-11-10
  • 1 回答
  • 0 关注
  • 106 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信