为了账号安全,请及时绑定邮箱和手机立即绑定

将 KafkaAvroDeserializer 与 Alpakka 结合使用

将 KafkaAvroDeserializer 与 Alpakka 结合使用

杨魅力 2022-12-28 15:39:31
我有一个 SchemaRegistry 和一个 KafkaBroker,我使用 Avro v1.8.1 从中提取数据。对于反序列化,我一直在使用 Confluent 的KafkaAvroDeserializer。现在我打算重构我的代码以使用 Alpakka 提供的Elasticsearch API,但不幸的是这会破坏反序列化,因为它会导致 NullPointerExceptions:线程“main”中的异常 org.apache.kafka.common.errors.SerializationException:在偏移量 0 处反序列化分区 topic-0 的键/值时出错。如果需要,请寻找过去的记录以继续消费。原因:org.apache.kafka.common.errors.SerializationException:反序列化 id 2 的 Avro 消息时出错 原因:io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)处的 java.lang.NullPointerException io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88) 在 io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) 在 org.apache.kafka.Deserialization.Deserializer.common.serializer在 org.apache.kafka.clients.consumer 中反序列化(Deserializer.java:58)。我一直在使用 Alpakka 的 ConsumerSettings API,如本例中所述:val system = ActorSystem.create();// necessary to convert timestamps correctly in Avro Version 1.8.1 to avoid ClassCastExceptionsSpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());val consumerSettings = ConsumerSettings.create(system, new StringDeserializer(), new KafkaAvroDeserializer())    .withBootstrapServers(kafkaBootstrapServerUrl)    .withClientId(InetAddress.getLocalHost().getHostName())    .withGroupId("" + new Random().nextInt())    .withProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)    .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")    .withStopTimeout(Duration.ofSeconds(5));
查看完整描述

1 回答

?
倚天杖

TA贡献1828条经验 获得超3个赞

我认为您需要拉到new KafkaAvroDeserializer()它自己的变量,然后调用该.configure()实例上的方法以传入非空注册表 URL。

然后将配置的实例传入ConsumerSettings.create

FWIW,根据您的需要,Kafka Connect 可以很好地加载 Elasticsearch


查看完整回答
反对 回复 2022-12-28
  • 1 回答
  • 0 关注
  • 71 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信