4 回答
TA贡献1796条经验 获得超4个赞
我遇到了同样的问题。在此邮件存档中找到了解决方案。 http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E
在您的情况下,您需要定义自己的,它可以从如下Deserializer<MyClass>扩展。AbstractKafkaAvroDeserializer
public class MyClassKafkaAvroDeserializer extends
AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
configure(new KafkaAvroDeserializerConfig(configs));
}
@Override
public MyClass deserialize(String s, byte[] bytes) {
return (MyClass) this.deserialize(bytes);
}
@Override
public void close() {} }
然后将您的KafkaAvroDeserializer指定为 ValueDeserializer。
p.apply(KafkaIO.<Long, MyClass>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(MyClassKafkaAvroDeserializer.class) );
TA贡献1863条经验 获得超2个赞
您可以使用 KafkaAvroDeserializer,如下所示:
PCollection<KV<Long,MyClass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
其中MyClass是 POJO 类生成的 Avro Schema。
确保您的 POJO 类具有注释 AvroCoder,如下例所示:
@DefaultCoder(AvroCoder.class)
public class MyClass{
String name;
String age;
MyClass(){}
MyClass(String n, String a) {
this.name= n;
this.age= a;
}
}
TA贡献1805条经验 获得超9个赞
我今天遇到了类似的问题,并遇到了以下示例,它为我解决了这个问题。
对我来说缺少的部分是(类)KafkaAvroDeserializer
KafkaIO.<String, MyClass>read() .withBootstrapServers("kafka:9092") .withTopic("dbserver1.inventory.customers") .withKeyDeserializer(StringDeserializer.class) .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
TA贡献1840条经验 获得超5个赞
我也发现这行得通
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
...
public static class CustomKafkaAvroDeserializer extends SpecificAvroDeserializer<MyCustomClass> {}
...
.withValueDeserializerAndCoder(CustomKafkaAvroDeserializer.class, AvroCoder.of(MyCustomClass.class))
...
MyCustomClass使用 Avro 工具生成的代码在哪里。
添加回答
举报