为了账号安全,请及时绑定邮箱和手机立即绑定

如何在 Kafka-Spring 中捕获反序列化错误?

如何在 Kafka-Spring 中捕获反序列化错误?

白猪掌柜的 2022-11-02 10:35:54
我正在启动一个使用 kafka 消息的应用程序。为了捕捉反序列化异常,我遵循了关于反序列化错误处理的Spring 文档。我试过 failedDeserializationFunction 方法。这是我的消费者配置类@Bean    public Map<String, Object> consumerConfigs() {        Map<String, Object> consumerProps = new HashMap<>();        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);                /*  Error Handling */        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);        consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());        consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);        return consumerProps;    }    @Bean    public ConsumerFactory<String, NTCMessageBody> consumerFactory() {        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),                new JsonDeserializer<>(NTCMessageBody.class));    }        @Bean    public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        return factory;    }这是 BiFunction 提供者public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {    @Override    public NTCMessageBody apply(byte[] t, Headers u) {        return new NTCBadMessageBody(t);    }}当我仅发送一条有关该主题的损坏消息时,我收到了此错误(循环中):org.apache.kafka.common.errors.SerializationException:反序列化键/值时出错我知道 ErrorHandlingDeserializer2 应该委托 NTCBadMessageBody 类型并继续消费。我还看到(在调试模式下)它从未进入 NTCBadMessageBody 类的构造函数中。
查看完整描述

4 回答

?
qq_遁去的一_1

TA贡献1725条经验 获得超7个赞

错误处理反序列化器

当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在 poll() 返回之前。为了解决这个问题,2.2 版本引入了 ErrorHandlingDeserializer。这个反序列化器委托给一个真正的反序列化器(键或值)。如果委托未能反序列化记录内容,则 ErrorHandlingDeserializer 将返回 DeserializationException,其中包含原因和原始字节。使用记录级 MessageListener 时,如果键或值包含 DeserializationException,则使用失败的 ConsumerRecord 调用容器的 ErrorHandler。使用 BatchMessageListener 时,失败的记录与批处理中的剩余记录一起传递给应用程序,因此应用程序侦听器有责任检查特定记录中的键或值是否是 DeserializationException。

因此,根据您使用的代码,record-level MessageListener只需添加ErrorHandlerContainer

处理异常

例如,如果您的错误处理程序实现了此接口,您可以相应地调整偏移量。例如,要重置偏移量以重播失败的消息,您可以执行以下操作;但是请注意,这些都是简单的实现,您可能需要更多地检查错误处理程序。

@Bean

public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {

return (m, e, c) -> {

    this.listen3Exception = e;

    MessageHeaders headers = m.getHeaders();

    c.seek(new org.apache.kafka.common.TopicPartition(

            headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),

            headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),

            headers.get(KafkaHeaders.OFFSET, Long.class));

    return null;

   };

}

或者您可以像本示例中那样进行自定义实现


@Bean

public ConcurrentKafkaListenerContainerFactory<String, GenericRecord>

kafkaListenerContainerFactory()  {


    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory

            = new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory());

    factory.getContainerProperties().setErrorHandler(new ErrorHandler() {

        @Override

        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {

            String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];

            String topics = s.split("-")[0];

            int offset = Integer.valueOf(s.split("offset ")[1]);

            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);


            TopicPartition topicPartition = new TopicPartition(topics, partition);

            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);

            consumer.seek(topicPartition, offset + 1);

            System.out.println("OKKKKK");

        }


        @Override

        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {


        }


        @Override

        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) {

            String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];

            String topics = s.split("-")[0];

            int offset = Integer.valueOf(s.split("offset ")[1]);

            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);


            TopicPartition topicPartition = new TopicPartition(topics, partition);

            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);

            consumer.seek(topicPartition, offset + 1);

            System.out.println("OKKKKK");



        }

    });



    return factory;

}


查看完整回答
反对 回复 2022-11-02
?
青春有我

TA贡献1784条经验 获得超8个赞

使用 ErrorHandlingDeserializer。

当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在 poll() 返回之前。为了解决这个问题,2.2 版本引入了 ErrorHandlingDeserializer。这个反序列化器委托给一个真正的反序列化器(键或值)。如果委托未能反序列化记录内容,则 ErrorHandlingDeserializer 将返回 DeserializationException,其中包含原因和原始字节。使用记录级 MessageListener 时,如果键或值包含 DeserializationException,则使用失败的 ConsumerRecord 调用容器的 ErrorHandler。使用 BatchMessageListener 时,失败的记录与批处理中的剩余记录一起传递给应用程序,

您可以使用 DefaultKafkaConsumerFactory 构造函数,该构造函数采用键和值 Deserializer 对象并连接到配置有适当委托的适当 ErrorHandlingDeserializer。或者,您可以使用 ErrorHandlingDeserializer 使用的使用者配置属性来实例化委托。属性名称为 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS 和 ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS;属性值可以是类或类名

package com.mypackage.app.config;


import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.TimeoutException;


import com.mypacakage.app.model.kafka.message.KafkaEvent;


import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import org.springframework.kafka.listener.ListenerExecutionFailedException;

import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

import org.springframework.kafka.support.serializer.JsonDeserializer;

import org.springframework.retry.policy.SimpleRetryPolicy;

import org.springframework.retry.support.RetryTemplate;


import lombok.extern.slf4j.Slf4j;


@EnableKafka

@Configuration

@Slf4j

public class KafkaConsumerConfig {


    @Value("${kafka.bootstrap-servers}")

    private String servers;


    @Value("${listener.group-id}")

    private String groupId;


    @Bean

    public ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> ListenerFactory() {

    

        ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());


        factory.setRetryTemplate(retryTemplate());

        factory.setErrorHandler(((exception, data) -> {

            /*

             * here you can do you custom handling, I am just logging it same as default

             * Error handler does If you just want to log. you need not configure the error

             * handler here. The default handler does it for you. Generally, you will

             * persist the failed records to DB for tracking the failed records.

             */

            log.error("Error in process with Exception {} and the record is {}", exception, data);

        }));


        return factory;


    }


    @Bean

    public ConsumerFactory<String, KafkaEvent> consumerFactory() {

        Map<String, Object> config = new HashMap<>();


        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);

        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);


        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);

        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);

        config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);

        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());

        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,

                "com.mypackage.app.model.kafka.message.KafkaEvent");

        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.mypackage.app");


        return new DefaultKafkaConsumerFactory<>(config);

    }


    private RetryTemplate retryTemplate() {

        RetryTemplate retryTemplate = new RetryTemplate();


        /*

         * here retry policy is used to set the number of attempts to retry and what

         * exceptions you wanted to try and what you don't want to retry.

         */

        retryTemplate.setRetryPolicy(retryPolicy());


        return retryTemplate;

    }


    private SimpleRetryPolicy retryPolicy() {

        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();


        // the boolean value in the map determines whether exception should be retried

        exceptionMap.put(IllegalArgumentException.class, false);

        exceptionMap.put(TimeoutException.class, true);

        exceptionMap.put(ListenerExecutionFailedException.class, true);


        return new SimpleRetryPolicy(3, exceptionMap, true);

    }

}


查看完整回答
反对 回复 2022-11-02
?
慕桂英546537

TA贡献1848条经验 获得超10个赞

如果分区名称具有“-”之类的字符,上述答案可能会出现问题。所以,我用正则表达式修改了相同的逻辑。


    import java.util.List;

    import java.util.regex.Matcher;

    import java.util.regex.Pattern;

    

    import org.apache.kafka.clients.consumer.Consumer;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import org.apache.kafka.common.TopicPartition;

    import org.apache.kafka.common.errors.SerializationException;

    import org.springframework.kafka.listener.ErrorHandler;

    import org.springframework.kafka.listener.MessageListenerContainer;

    

    import lombok.extern.slf4j.Slf4j;

    

    @Slf4j

    public class KafkaErrHandler implements ErrorHandler {

    

        /**

         * Method prevents serialization error freeze

         * 

         * @param e

         * @param consumer

         */

        private void seekSerializeException(Exception e, Consumer<?, ?> consumer) {

            String p = ".*partition (.*) at offset ([0-9]*).*";

            Pattern r = Pattern.compile(p);

    

            Matcher m = r.matcher(e.getMessage());

    

            if (m.find()) {

                int idx = m.group(1).lastIndexOf("-");

                String topics = m.group(1).substring(0, idx);

                int partition = Integer.parseInt(m.group(1).substring(idx));

                int offset = Integer.parseInt(m.group(2));

    

                TopicPartition topicPartition = new TopicPartition(topics, partition);

    

                consumer.seek(topicPartition, (offset + 1));

    

                log.info("Skipped message with offset {} from partition {}", offset, partition);

            }

        }

    

        @Override

        public void handle(Exception e, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {

            log.error("Error in process with Exception {} and the record is {}", e, record);

    

            if (e instanceof SerializationException)

                seekSerializeException(e, consumer);

        }

    

        @Override

        public void handle(Exception e, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,

                MessageListenerContainer container) {

            log.error("Error in process with Exception {} and the records are {}", e, records);

    

            if (e instanceof SerializationException)

                seekSerializeException(e, consumer);

    

        }

    

        @Override

        public void handle(Exception e, ConsumerRecord<?, ?> record) {

            log.error("Error in process with Exception {} and the record is {}", e, record);

        }

    

    } 

最后在配置中使用错误处理程序。


 @Bean

public ConcurrentKafkaListenerContainerFactory<String, GenericType> macdStatusListenerFactory() {


    ConcurrentKafkaListenerContainerFactory<String, GenericType> factory = new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(macdStatusConsumerFactory());

    factory.setRetryTemplate(retryTemplate());

    factory.setErrorHandler(new KafkaErrHandler());


    return factory;

}

但是不推荐解析错误字符串来获取分区、主题和偏移量。如果有人有更好的解决方案,请在此处发布。


查看完整回答
反对 回复 2022-11-02
?
Smart猫小萌

TA贡献1911条经验 获得超7个赞

在我的工厂中,我添加了 commonErrorHander


factory.setCommonErrorHandler(new KafkaMessageErrorHandler());

并KafkaMessageErrorHandler创建如下


class KafkaMessageErrorHandler implements CommonErrorHandler {


    @Override

    public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {

        manageException(thrownException, consumer);

    }


    @Override

    public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {

        manageException(thrownException, consumer);

    }


    private void manageException(Exception ex, Consumer<?, ?> consumer) {

        log.error("Error polling message: " + ex.getMessage());

        if (ex instanceof RecordDeserializationException) {

            RecordDeserializationException rde = (RecordDeserializationException) ex;

            consumer.seek(rde.topicPartition(), rde.offset() + 1L);

            consumer.commitSync();

        } else {

            log.error("Exception not handled");

        }

    }

}


查看完整回答
反对 回复 2022-11-02
  • 4 回答
  • 0 关注
  • 131 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信