我开发了几个 C++ 应用程序,它们生成和使用嵌入 Protobuf3 消息的 Kafka 消息(使用 cppkafka)。两者都工作正常。生产者的相关代码是:std::string kafkaString;cppkafka::MessageBuilder *builder;...solidList->SerializeToString(&kafkaString);builder->payload(kafkaString);Protobuf 对象被序列化为字符串并作为 Kafka 负载插入。到目前为止一切正常。现在,我正在尝试用 Java 开发一个消费者。相关代码应该是:KafkaConsumer<Long, String> consumer=new KafkaConsumer<Long, String>(properties);....ConsumerRecords<Long, String> records = consumer.poll(100); for (ConsumerRecord<Long, String> record : records) { SolidList solidList = SolidList.parseFrom(record.value()); ...但在编译时失败: parseFrom 抱怨:类型 Solidlist.SolidList 中的方法 parseFrom(ByteBuffer) 不适用于参数 (String)。所以,我尝试使用 ByteBuffer:KafkaConsumer<Long, ByteBuffer> consumer=new KafkaConsumer<Long, ByteBuffer>(properties);....ConsumerRecords<Long, ByteBuffer> records = consumer.poll(100); for (ConsumerRecord<Long, ByteBuffer> record : records) { SolidList solidList = SolidList.parseFrom(record.value()); ...现在,错误在执行时间,仍然在 parseFrom(): Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to java.nio.ByteBuffer。我知道它是一个 java.lang.String !!!所以,我回到原来的状态,并尝试将其用作字节数组: SolidList solidList = SolidList.parseFrom(record.value().getBytes());现在,错误出现在执行时间:线程“main” com.google.protobuf.InvalidProtocolBufferException$InvalidWireTypeException 中的异常:协议消息标记的线类型无效。.C++ 序列化的 protobuf 文档说明:bool SerializeToString(string output) const;:序列化消息并将字节存储在给定的字符串中。请注意,字节是二进制的,而不是文本;我们只使用字符串类作为方便的容器。*TL;DR:因此,我应该如何解释 Java 中的 protobuf C++“二进制字节”?
2 回答
蓝山帝景
TA贡献1843条经验 获得超7个赞
尝试实现一个反序列化器并将其作为值反序列化器传递给KafkaConsumer构造函数。它可能看起来像这样:
class SolidListDeserializer implements Deserializer<SolidList> {
public SolidList deserialize(final String topic, byte[] data) {
return SolidList.parseFrom(data);
}
...
}
...
KafkaConsumer<Long, SolidList> consumer = new KafkaConsumer<>(props, new LongDeserializer(), new SolidListDeserializer())
DIEA
TA贡献1820条经验 获得超2个赞
您可以将 kafka 读作ConsumerRecords<Long, String>
. 接着SolidList.parseFrom(ByteBuffer.wrap(record.value().getBytes("UTF-8")));
添加回答
举报
0/150
提交
取消