为了账号安全,请及时绑定邮箱和手机立即绑定

如何为 Kafka KeyValueStore 设置值 de/serializer?

如何为 Kafka KeyValueStore 设置值 de/serializer?

慕村9548890 2023-03-02 15:32:04
我将来自 Kafka 主题的消息存储在 KeyValueStore 中,以便稍后查询它们。我创建一个 KTable 如下:@StreamListener    public void process(@Input("input") KTable<String,MyMessage> myMessages) {我在 application.yml 中配置了消费者,如下所示:更新的反/序列化程序包spring.cloud.stream.kafka.streams.bindings.input:  consumer:    materializedAs: all-messages    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer    value-deserializer: com.me.MyMessageDeserializer    key-serializer: org.apache.kafka.common.serialization.StringSerializer    value-serializer: com.me.MyMessageSerializer但是,当我从 KeyValueStore 中读取时,键作为字符串正确返回,但返回的值是字节数组而不是 MyMessage。出于某种原因,我的自定义解串器没有被使用。我试图自己反序列化消息,但我的反序列化器因异常而崩溃。我在我的序列化器上放置了一个断点,但它从未被调用过。我很清楚我的序列化器或反序列化器都没有被使用。我缺少什么配置以便使用我的自定义值反序列化器?反序列化器是否需要在特定的包中才能找到?
查看完整描述

1 回答

?
慕码人2483693

TA贡献1860条经验 获得超9个赞

application.yml 中使用了错误的配置键。而不是 key-deserializer: 它应该是 keySerde: 而不是 value-deserializer: 它应该是 valueSerde。下面是正确的配置:

spring.cloud.stream.kafka.streams.bindings.input:  consumer:    materializedAs: all-messages    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde    valueSerde: com.me.MyMessageSerde  producer:    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde    valueSerde: com.me.MyMessageSerde


查看完整回答
反对 回复 2023-03-02
  • 1 回答
  • 0 关注
  • 124 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信