为了账号安全,请及时绑定邮箱和手机立即绑定

使用 Spring Boot 从 Kafka 队列消费时获取序列化异常

使用 Spring Boot 从 Kafka 队列消费时获取序列化异常

米琪卡哇伊 2022-12-15 15:14:56
我正在使用带有 Kafka 的 Spring Boot 来使用来自 Kafka 队列的消息。但是,如果 Jackson 解析 payload 有任何错误。该消息仍然卡住,并不断重新尝试消费,始终给出解析异常。我尝试在 Kafka 配置中使用 ErrorHandlingDeserializer2 并映射错误处理程序,但问题仍然存在。@Configuration@EnableKafkapublic class KafkaConfig {    @Value("${spring.kafka.bootstrap-servers}")    private String bootstrapServers;    @Bean    public Map<String, Object> consumerConfigs() {        Map<String, Object> props = new HashMap<>();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);        props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, StringDeserializer.class);        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);        props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);        props.put(ConsumerConfig.GROUP_ID_CONFIG, "${connecto.group-id}");        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);        return props;    }    @Bean    public ConsumerFactory<String, UserDto> consumerFactory() {        return new DefaultKafkaConsumerFactory<>(                consumerConfigs(),                new StringDeserializer(),                new JsonDeserializer<>(UserDto.class));    }    @Bean    public ConcurrentKafkaListenerContainerFactory<String, UserDto> kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory<String, UserDto> factory =                new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        factory.setErrorHandler(new MosaicKafkaErrorHandler());        return factory;    }    @Bean    public KafkaTemplate kafkaTemplate()     {        return new KafkaTemplate<>(producerFactory());    }}
查看完整描述

1 回答

?
开满天机

TA贡献1786条经验 获得超13个赞

为了解决这个问题,2.2 版引入了ErrorHandlingDeserializer2. 此反序列化器委托给真正的反序列化器(键或值)。如果委托未能反序列化记录内容,则ErrorHandlingDeserializer2返回一个空值和一个DeserializationException包含原因和原始字节的标头。当您使用记录级别MessageListener时,如果 ConsumerRecord 包含DeserializationException键或值的标头,则会使用失败的 ConsumerRecord 调用容器的 ErrorHandler。记录不会传递给侦听器。


但是,由于您使用的是早期版本,因此可以使用以下 hack。


     @Override

        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,

                MessageListenerContainer container) {

            thrownException.printStackTrace();

            if (thrownException instanceOf SerializationException){

                String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];

                String topics = s.split("-")[0];

                int offset = Integer.valueOf(s.split("offset ")[1]);

                int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);


                TopicPartition topicPartition = new TopicPartition(topics, partition);

                consumer.seek(topicPartition, offset + 1);  

               }

            }


查看完整回答
反对 回复 2022-12-15
  • 1 回答
  • 0 关注
  • 244 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信