为了账号安全,请及时绑定邮箱和手机立即绑定

使用 Apache Beam 反序列化 Kafka AVRO 消息

使用 Apache Beam 反序列化 Kafka AVRO 消息

慕村225694 2022-06-23 20:01:30
主要目标是聚合两个 Kafka 主题,一个是压缩的慢速移动数据,另一个是每秒接收的快速移动数据。我已经能够在简单的场景中使用消息,例如使用以下内容的 KV (Long,String):PCollection<KV<Long,String>> input = p.apply(KafkaIO.<Long, String>read().withKeyDeserializer(LongDeserializer.class).withValueDeserializer(StringDeserializer.class)  PCollection<String> output = input.apply(Values.<String>create());但这似乎不是当您需要从 AVRO 反序列化时的方法。我有一个需要消耗的 KV(STRING, AVRO)。我尝试从 AVRO 模式生成 Java 类,然后将它们包含在“应用”中,例如:PCollection<MyClass> output = input.apply(Values.<MyClass>create());但这似乎不是正确的方法。是否有任何人可以指出我的文档/示例,以便我了解您将如何使用 Kafka AVRO 和 Beam?我已经更新了我的代码:import io.confluent.kafka.serializers.KafkaAvroDeserializer;import org.apache.beam.sdk.Pipeline;import org.apache.beam.sdk.coders.AvroCoder;import org.apache.beam.sdk.io.kafka.KafkaIO;import org.apache.beam.sdk.options.PipelineOptions;import org.apache.beam.sdk.options.PipelineOptionsFactory;import org.apache.beam.sdk.values.KV;import org.apache.beam.sdk.values.PCollection;import org.apache.kafka.common.serialization.LongDeserializer;public class Main {public static void main(String[] args) {    PipelineOptions options = PipelineOptionsFactory.create();    Pipeline p = Pipeline.create(options);    PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()            .withKeyDeserializer(LongDeserializer.class)            .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))    );    p.run();}}import org.apache.beam.sdk.coders.AvroCoder;import org.apache.beam.sdk.coders.DefaultCoder;@DefaultCoder(AvroCoder.class)public class Myclass{String name;String age;Myclass(){}Myclass(String n, String a) {    this.name= n;    this.age= a;}}但我现在收到以下错误incompatible types: java.lang.Class < io.confluent.kafka.serializers.KafkaAvroDeserializer > cannot be converted to java.lang.Class < ? extends org.apache.kafka.common.serialization.Deserializer < java.lang.String > >我必须导入不正确的序列化程序?
查看完整描述

4 回答

?
SMILET

TA贡献1796条经验 获得超4个赞

我遇到了同样的问题。在此邮件存档中找到了解决方案。 http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E


在您的情况下,您需要定义自己的,它可以从如下Deserializer<MyClass>扩展。AbstractKafkaAvroDeserializer


public class MyClassKafkaAvroDeserializer extends

  AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {

  

  @Override

  public void configure(Map<String, ?> configs, boolean isKey) {

      configure(new KafkaAvroDeserializerConfig(configs));

  }


  @Override

  public MyClass deserialize(String s, byte[] bytes) {

      return (MyClass) this.deserialize(bytes);

  }


  @Override

  public void close() {} }

然后将您的KafkaAvroDeserializer指定为 ValueDeserializer。


p.apply(KafkaIO.<Long, MyClass>read()

 .withKeyDeserializer(LongDeserializer.class)

 .withValueDeserializer(MyClassKafkaAvroDeserializer.class) );


查看完整回答
反对 回复 2022-06-23
?
小唯快跑啊

TA贡献1863条经验 获得超2个赞

您可以使用 KafkaAvroDeserializer,如下所示:


PCollection<KV<Long,MyClass>> input = p.apply(KafkaIO.<Long, String>read()

.withKeyDeserializer(LongDeserializer.class)

  .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))

其中MyClass是 POJO 类生成的 Avro Schema。


确保您的 POJO 类具有注释 AvroCoder,如下例所示:


@DefaultCoder(AvroCoder.class)

   public class MyClass{

      String name;

      String age;


      MyClass(){}

      MyClass(String n, String a) {

         this.name= n;

         this.age= a;

      }

  }


查看完整回答
反对 回复 2022-06-23
?
Cats萌萌

TA贡献1805条经验 获得超9个赞

我今天遇到了类似的问题,并遇到了以下示例,它为我解决了这个问题。

https://github.com/andrewrjones/debezium-kafka-beam-example/blob/master/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java

对我来说缺少的部分是(类)KafkaAvroDeserializer

KafkaIO.<String, MyClass>read()
        .withBootstrapServers("kafka:9092")
        .withTopic("dbserver1.inventory.customers")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))



查看完整回答
反对 回复 2022-06-23
?
慕斯709654

TA贡献1840条经验 获得超5个赞

我也发现这行得通


import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;


...


public static class CustomKafkaAvroDeserializer extends SpecificAvroDeserializer<MyCustomClass> {}


...

.withValueDeserializerAndCoder(CustomKafkaAvroDeserializer.class, AvroCoder.of(MyCustomClass.class))

...

MyCustomClass使用 Avro 工具生成的代码在哪里。


查看完整回答
反对 回复 2022-06-23
  • 4 回答
  • 0 关注
  • 97 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信