为了账号安全,请及时绑定邮箱和手机立即绑定

Spring 启动 kafka 消息传递。如何简化处理程序的 dto 映射?

Spring 启动 kafka 消息传递。如何简化处理程序的 dto 映射?

GCT1015 2021-12-30 20:24:50
我已经使用 Kafka 配置了我的 Spring Boot 项目。我可以接收和发布任何基于字符串的消息。字符串消息不是处理的最佳方式。具有将消息从字符串默认转换为对象的功能会更有用。实现这个功能我需要将几乎所有的 Kafka 配置从yml到java(使用属性)。...生产者示例@Beanpublic Map<String, Object> producerConfigs() {    Map<String, Object> props = new HashMap<>();    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AccountSerializer.class);    return props;}@Beanpublic ProducerFactory<String, Account> producerFactory() {    return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, Account> kafkaTemplate() {    return new KafkaTemplate<>(producerFactory());}该代码有效,但我接受了简化。在最好的情况下,我想优雅地配置yml,可能是一些 java 更改。但是以直接方式进行操作时,我将获得额外的每 3 个 bean 来配置每个kafkaTemplate和listenerFactory.它是否可能简化未来的配置(我需要更多额外的Serializer“解串器”)?如何?
查看完整描述

2 回答

?
临摹微笑

TA贡献1982条经验 获得超2个赞

似乎我没有任何机会为相同的侦听器配置不同SERIALIZER| DESERIALIZERs。


但是 id 并不意味着我的问题没有解决方案。


我对所有对象都使用了继承,并提供了一个抽象AbstractEvent。AbstractEvent一般没用,但它在我的解决方案中使用,如指定的输入点SERIALIZER| DESERIALIZER. 为了获取上下文中哪个对象的信息,我使用了自定义标题。org.apache.kafka.common.serialization.Deserializer没有标题参数,但我已经实现了我的DESERIALIZER基于ExtendedDeserializer. 这种方式让我可以访问标题


via public T deserialize(String topic, Headers headers, byte[] data)

我的解串器示例:


@Slf4j

public class AbstractEventDeserializer<T extends AbstractEvent> implements ExtendedDeserializer<T> {


    private Map<String, Class<T>> mappers = new HashMap<>();


    // default behavior

    @Override

    public T deserialize(String arg0, byte[] devBytes) {

        ObjectMapper mapper = new ObjectMapper();

        T bar = null;

        try {

            bar = (T) mapper.readValue(devBytes, Bar.class);

        } catch (Exception e) {

            e.printStackTrace();

        }

        return bar;

    }


    @Override

    public void close() {

        // TODO Auto-generated method stub

    }


    @Override

    public T deserialize(String topic, Headers headers, byte[] data) {

        log.info("handling...");

        headers.forEach(header -> log.info("   {}: {}", header.key(), getHeaderValueAsString(header)));

        Optional<String> classTypeFromHeader = getClassTypeFromHeader(headers);

        if (classTypeFromHeader.isPresent()) {

            return parseFromJson(data, mappers.get(classTypeFromHeader.get()));

        }

        return deserialize(topic, data);

    }


    private Optional<String> getClassTypeFromHeader(Headers headers) {

        return StreamSupport.stream(headers.headers("X-CLASS-TYPE").spliterator(), false)

                .map(Header::value)

                .map(String::new)

                .findFirst();

    }


    private String getHeaderValueAsString(Header header) {

        return Optional.ofNullable(header.value())

                .map(String::new)

                .orElse(null);

    }


    @Override

    public void configure(Map<String, ?> arg0, boolean arg1) {

        log.info("configuring deserialiser");

        if (arg0.containsKey("mappers")) {

            this.mappers = (Map<String, Class<T>>) arg0.get("mappers");

        }

        arg0.keySet().forEach(key -> log.info("   {}:{}", key, arg0.get(key)));

    }


}

如果您想尝试工作解决方案,请查看实验示例


查看完整回答
反对 回复 2021-12-30
?
倚天杖

TA贡献1828条经验 获得超3个赞

Spring 云服务可以为消费者提供更好的配置、并发、反序列化和更少的样板代码。


   <dependency>

        <groupId>org.springframework.cloud</groupId>

        <artifactId>spring-cloud-starter-stream-kafka</artifactId>

    </dependency>

水槽样品


@SpringBootApplication

@EnableBinding(Sink.class)

public class LoggingConsumerApplication {


public static void main(String[] args) {

    SpringApplication.run(LoggingConsumerApplication.class, args);

}


@StreamListener(Sink.INPUT)

public void handle(Person person) {

    System.out.println("Received: " + person);

}


public static class Person {

    private String name;

    public String getName() {

        return name;

    }

    public void setName(String name) {

        this.name = name;

    }

    public String toString() {

        return this.name;

    }

}

}

示例配置:


spring:

  cloud:

    stream:

      bindings:

        input:

          destination: <your topic>

          group: <your consumer group>

          consumer:

            headerMode: raw

            partitioned: true

            concurrency: 10

      kafka:

        binder:

          brokers: <Comma seperated list of kafka brokers>

此处提供更多信息https://cloud.spring.io/spring-cloud-stream/


查看完整回答
反对 回复 2021-12-30
  • 2 回答
  • 0 关注
  • 195 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信