1 回答
TA贡献1786条经验 获得超13个赞
为了解决这个问题,2.2 版引入了ErrorHandlingDeserializer2. 此反序列化器委托给真正的反序列化器(键或值)。如果委托未能反序列化记录内容,则ErrorHandlingDeserializer2返回一个空值和一个DeserializationException包含原因和原始字节的标头。当您使用记录级别MessageListener时,如果 ConsumerRecord 包含DeserializationException键或值的标头,则会使用失败的 ConsumerRecord 调用容器的 ErrorHandler。记录不会传递给侦听器。
但是,由于您使用的是早期版本,因此可以使用以下 hack。
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {
thrownException.printStackTrace();
if (thrownException instanceOf SerializationException){
String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
TopicPartition topicPartition = new TopicPartition(topics, partition);
consumer.seek(topicPartition, offset + 1);
}
}
添加回答
举报