为了账号安全,请及时绑定邮箱和手机立即绑定

Kafka Protobuf:C++ 序列化到 Java

Kafka Protobuf:C++ 序列化到 Java

元芳怎么了 2021-08-06 11:18:33
我开发了几个 C++ 应用程序,它们生成和使用嵌入 Protobuf3 消息的 Kafka 消息(使用 cppkafka)。两者都工作正常。生产者的相关代码是:std::string kafkaString;cppkafka::MessageBuilder *builder;...solidList->SerializeToString(&kafkaString);builder->payload(kafkaString);Protobuf 对象被序列化为字符串并作为 Kafka 负载插入。到目前为止一切正常。现在,我正在尝试用 Java 开发一个消费者。相关代码应该是:KafkaConsumer<Long, String> consumer=new KafkaConsumer<Long, String>(properties);....ConsumerRecords<Long, String> records = consumer.poll(100);  for (ConsumerRecord<Long, String> record : records) {    SolidList solidList = SolidList.parseFrom(record.value());    ...但在编译时失败: parseFrom 抱怨:类型 Solidlist.SolidList 中的方法 parseFrom(ByteBuffer) 不适用于参数 (String)。所以,我尝试使用 ByteBuffer:KafkaConsumer<Long, ByteBuffer> consumer=new KafkaConsumer<Long, ByteBuffer>(properties);....ConsumerRecords<Long, ByteBuffer> records = consumer.poll(100);  for (ConsumerRecord<Long, ByteBuffer> record : records) {    SolidList solidList = SolidList.parseFrom(record.value());    ...现在,错误在执行时间,仍然在 parseFrom(): Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to java.nio.ByteBuffer。我知道它是一个 java.lang.String !!!所以,我回到原来的状态,并尝试将其用作字节数组:    SolidList solidList = SolidList.parseFrom(record.value().getBytes());现在,错误出现在执行时间:线程“main” com.google.protobuf.InvalidProtocolBufferException$InvalidWireTypeException 中的异常:协议消息标记的线类型无效。.C++ 序列化的 protobuf 文档说明:bool SerializeToString(string output) const;:序列化消息并将字节存储在给定的字符串中。请注意,字节是二进制的,而不是文本;我们只使用字符串类作为方便的容器。*TL;DR:因此,我应该如何解释 Java 中的 protobuf C++“二进制字节”?
查看完整描述

2 回答

?
蓝山帝景

TA贡献1843条经验 获得超7个赞

尝试实现一个反序列化器并将其作为值反序列化器传递给KafkaConsumer构造函数。它可能看起来像这样:


class SolidListDeserializer implements Deserializer<SolidList> {

  public SolidList deserialize(final String topic, byte[] data) {

    return SolidList.parseFrom(data);

  }

  ...

}


...


KafkaConsumer<Long, SolidList> consumer = new KafkaConsumer<>(props, new LongDeserializer(), new SolidListDeserializer())



查看完整回答
反对 回复 2021-08-06
?
DIEA

TA贡献1820条经验 获得超2个赞

您可以将 kafka 读作ConsumerRecords<Long, String>. 接着SolidList.parseFrom(ByteBuffer.wrap(record.value().getBytes("UTF-8")));


查看完整回答
反对 回复 2021-08-06
  • 2 回答
  • 0 关注
  • 325 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信