4 Answers
Contributed 1818 experience points · earned 3 upvotes
I dug into this a little and found that the problem lies in the CachedSchemaRegistryClient used by KafkaAvroSerializer/KafkaAvroDeserializer. It is used to fetch schema definitions from the Confluent Schema Registry.
You already have the schema definitions locally, so you don't need to go to the Schema Registry for them (at least in your tests).
I had a similar problem, and I solved it by creating a custom KafkaAvroSerializer/KafkaAvroDeserializer.
Here is an example of the KafkaAvroSerializer. It is quite simple: you only need to extend the provided KafkaAvroSerializer and tell it to use MockSchemaRegistryClient.
public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public CustomKafkaAvroSerializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    // The client passed in is deliberately ignored: a MockSchemaRegistryClient
    // is always substituted so no real registry is ever contacted.
    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}
Here is an example of the KafkaAvroDeserializer. When the deserialize method is called, you need to tell it which schema to use.
public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
    @Override
    public Object deserialize(String topic, byte[] bytes) {
        // Swap in a mock client that always returns the local schema,
        // regardless of the schema id embedded in the message bytes.
        this.schemaRegistry = getMockClient(KafkaEvent.SCHEMA$);
        return super.deserialize(topic, bytes);
    }

    private static SchemaRegistryClient getMockClient(final Schema schema$) {
        return new MockSchemaRegistryClient() {
            @Override
            public synchronized Schema getById(int id) {
                return schema$;
            }
        };
    }
}
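For context on why the deserializer needs a registry client at all: Confluent-serialized messages carry a 5-byte header (a magic byte 0, then a 4-byte big-endian schema id) before the Avro payload, and super.deserialize uses that id to look up the schema. A minimal sketch of that framing, using only the JDK (the class and method names here are illustrative, not part of the Confluent API):

```java
import java.nio.ByteBuffer;

// Illustrative helper for the Confluent wire format: magic byte 0x0,
// then a 4-byte big-endian schema id, then the Avro-encoded payload.
class ConfluentWireFormat {
    private static final byte MAGIC_BYTE = 0x0;

    // Prepend the 5-byte header to an Avro payload.
    public static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buf.put(MAGIC_BYTE);
        buf.putInt(schemaId); // ByteBuffer is big-endian by default
        buf.put(avroPayload);
        return buf.array();
    }

    // Read the schema id back out of a framed message.
    public static int readSchemaId(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        byte magic = buf.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buf.getInt();
    }
}
```

This is why the mocked getById override works: whatever id is embedded in the bytes, the mock returns your local schema.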
The last step is to tell Spring to use the created serializer/deserializer:
spring.kafka.producer.properties.schema.registry.url=not-used
spring.kafka.producer.value-serializer=CustomKafkaAvroSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id=showcase-producer-id
spring.kafka.consumer.properties.schema.registry.url=not-used
spring.kafka.consumer.value-deserializer=CustomKafkaAvroDeserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=showcase-consumer-id
spring.kafka.auto.offset.reset=earliest
spring.kafka.producer.auto.register.schemas=true
spring.kafka.properties.specific.avro.reader=true
Contributed 1780 experience points · earned 1 upvote
If you are looking at this example three years later, you may want to make a small change to CustomKafkaAvroDeserializer, since newer schema-registry clients resolve schemas through getSchemaBySubjectAndId, which returns a ParsedSchema:
// Requires io.confluent.kafka.schemaregistry.ParsedSchema and
// io.confluent.kafka.schemaregistry.avro.AvroSchema on the classpath.
private static SchemaRegistryClient getMockClient(final Schema schema) {
    return new MockSchemaRegistryClient() {
        @Override
        public ParsedSchema getSchemaBySubjectAndId(String subject, int id)
                throws IOException, RestClientException {
            return new AvroSchema(schema);
        }
    };
}
Contributed 1829 experience points · earned 4 upvotes
If your @KafkaListener is in a test class, you can read the record with a StringDeserializer and then convert it to the desired class manually:
@Autowired
private MyKafkaAvroDeserializer myKafkaAvroDeserializer;

@KafkaListener(topics = "test")
public void inputData(ConsumerRecord<?, ?> consumerRecord) {
    log.info("received record='{}', payload='{}'", consumerRecord.toString(), consumerRecord.value());
    GenericRecord genericRecord = (GenericRecord) myKafkaAvroDeserializer.deserialize(
            "test", consumerRecord.value().toString().getBytes(StandardCharsets.UTF_8));
    Myclass myclass = (Myclass) SpecificData.get().deepCopy(Myclass.SCHEMA$, genericRecord);
}
@Component
public class MyKafkaAvroDeserializer extends KafkaAvroDeserializer {
    @Override
    public Object deserialize(String topic, byte[] bytes) {
        this.schemaRegistry = getMockClient(Myclass.SCHEMA$);
        return super.deserialize(topic, bytes);
    }

    private static SchemaRegistryClient getMockClient(final Schema schema$) {
        return new MockSchemaRegistryClient() {
            @Override
            public synchronized org.apache.avro.Schema getById(int id) {
                return schema$;
            }
        };
    }
}
Remember to add the schema registry URL and the key/value deserializers in application.yml, even though they won't actually be used:
consumer:
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  properties:
    schema.registry.url: http://localhost:8080
Contributed 1875 experience points · earned 5 upvotes
As the error says, you need to provide the registry to the producer config as a string, not as an object.
Since you are using the Mock class, the string can be anything...
However, you need to construct the serializer yourself, given the registry instance:
Serializer<Object> serializer = new KafkaAvroSerializer(mockSchemaRegistry);
Map<String, Object> config = new HashMap<>();
config.put("schema.registry.url", "unused"); // must be present, but never contacted
serializer.configure(config, false); // false = configuring a value serializer
Otherwise, it will try to create a non-mocked client.
Then put the configured instance into the properties:
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
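Putting this answer together, a complete in-memory round trip with a shared MockSchemaRegistryClient might look like the sketch below. This assumes io.confluent:kafka-avro-serializer and Avro are on the test classpath; the schema, topic name, and the roundTrip helper are made up for illustration:

```java
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.HashMap;
import java.util.Map;

class MockRegistryRoundTrip {
    // Hypothetical one-field record schema, just for the example.
    static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"KafkaEvent\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

    static GenericRecord roundTrip(String id) {
        // One mock instance shared by both sides, so the schema ids line up.
        SchemaRegistryClient registry = new MockSchemaRegistryClient();

        Map<String, Object> config = new HashMap<>();
        config.put("schema.registry.url", "unused"); // must be a String; never contacted

        KafkaAvroSerializer serializer = new KafkaAvroSerializer(registry);
        serializer.configure(config, false); // false = value (de)serializer

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(registry);
        deserializer.configure(config, false);

        GenericRecord record = new GenericData.Record(SCHEMA);
        record.put("id", id);

        // auto.register.schemas is true by default, so serialize() registers
        // the schema in the mock and embeds its id in the payload.
        byte[] bytes = serializer.serialize("test-topic", record);
        return (GenericRecord) deserializer.deserialize("test-topic", bytes);
    }
}
```

Note that Avro decodes string fields as org.apache.avro.util.Utf8, so compare field values via toString() rather than casting to String.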