为了账号安全,请及时绑定邮箱和手机立即绑定

AVRO 原始类型的 Serde 类

AVRO 原始类型的 Serde 类

芜湖不芜 2021-08-04 17:04:21
我正在用 Java 编写一个 Kafka 流应用程序,它接受由连接器创建的输入主题,该连接器使用模式注册表和 avro 作为键和值转换器。连接器产生以下模式:key-schema: "int"value-schema:{"type": "record","name": "User","fields": [    {"name": "firstname", "type": "string"},    {"name": "lastname",  "type": "string"}]}实际上,有几个主题,key-schema 总是“int”,value-schema 总是某种记录(用户、产品等)。我的代码包含以下定义Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);Serde<User> userSerde = new SpecificAvroSerde<>();userSerde.configure(serdeConfig, false);起初我尝试使用类似的东西来消费这个主题, Consumed.with(Serdes.Integer(), userSerde);但这不起作用,因为 Serdes.Integer() 期望使用 4 个字节对整数进行编码,但 avro 使用可变长度编码。使用Consumed.with(Serdes.Bytes(), userSerde);有效,但我真的想要 int 而不是字节,所以我将代码更改为此KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer()KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();keyDeserializer.configure(serdeConfig, true); keySerializer.configure(serdeConfig, true);Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);这使编译器产生警告(它不喜欢(Serde<Integer>)(Serde)强制转换)但它允许我使用Consumed.with(keySerde, userSerde);并获取一个整数作为键。这工作得很好,我的应用程序按预期运行(很棒!!!)。但是现在我想为键/值定义默认的 serde 并且我无法让它工作。设置默认值 serde 很简单:streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);但是我无法弄清楚如何定义默认键 serde。我试过了streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName()); 产生运行时错误:找不到 org.apache.kafka.common.serialization.Serdes$WrapperSerde 的公共无参数构造函数streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); 产生运行时错误:java.lang.Integer 不能转换为 org.apache.avro.specific.SpecificRecord我错过了什么?谢谢。
查看完整描述

2 回答

?
翻翻过去那场雪

TA贡献2065条经验 获得超14个赞

我想发布解决方案的工作。请随意增强它。


import java.util.Collections;

import java.util.Map;


import org.apache.kafka.common.serialization.Deserializer;

import org.apache.kafka.common.serialization.Serde;

import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.common.serialization.Serializer;


import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;


public class GenericPrimitiveAvroSerDe<T> implements Serde<T> {


    private final Serde<Object> inner;


    /**

     * Constructor used by Kafka Streams.

     */

    public GenericPrimitiveAvroSerDe() {

        inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());

    }


    public GenericPrimitiveAvroSerDe(SchemaRegistryClient client) {

        this(client, Collections.emptyMap());

    }


    public GenericPrimitiveAvroSerDe(SchemaRegistryClient client, Map<String, ?> props) {

        inner = Serdes.serdeFrom(new KafkaAvroSerializer(client), new KafkaAvroDeserializer(client, props));

    }


    @Override

    public void configure(final Map<String, ?> serdeConfig, final boolean isSerdeForRecordKeys) {

        inner.serializer().configure(serdeConfig, isSerdeForRecordKeys);

        inner.deserializer().configure(serdeConfig, isSerdeForRecordKeys);

    }


    @Override

    public void close() {

        // TODO Auto-generated method stub

        inner.serializer().close();

        inner.deserializer().close();


    }


    @SuppressWarnings("unchecked")

    @Override

    public Serializer<T> serializer() {

        // TODO Auto-generated method stub

        Object obj = inner.serializer();

        return (Serializer<T>) obj;


    }


    @SuppressWarnings("unchecked")

    @Override

    public Deserializer<T> deserializer() {

        // TODO Auto-generated method stub

        Object obj = inner.deserializer();

        return (Deserializer<T>) obj;


    }


}

用作默认流配置:


props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);

            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);

覆盖默认值:


final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",

                                                                        "http://localhost:8081");

       final GenericPrimitiveAvroSerDe<String> keyGenericAvroSerde = new GenericPrimitiveAvroSerDe<String>();

       keyGenericAvroSerde.configure(serdeConfig, true); // `true` for record keys

       final GenericPrimitiveAvroSerDe<Long> valueGenericAvroSerde = new GenericPrimitiveAvroSerDe<Long>();

       valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values



查看完整回答
反对 回复 2021-08-04
  • 2 回答
  • 0 关注
  • 130 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信