为了账号安全,请及时绑定邮箱和手机立即绑定

在回调中发送记录时如何在spring-kafka中修复“在xxx ms后更新元数据失败”

在回调中发送记录时如何在spring-kafka中修复“在xxx ms后更新元数据失败”

幕布斯6054654 2022-06-04 14:55:23
spring-kafka 无法在回调中发送记录    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {        @Override        public void onFailure(Throwable ex) {            log.error("log error...");        }        @Override        public void onSuccess(SendResult<String, String> result) {            kafkaTemplate.send("anotherTopic", "key", "data");        }    });当我在 onSuccess() 中调用 kafkaTemplate.send() 时,Kafka 抛出“更新元数据失败”,这是意料之中的
查看完整描述

1 回答

?
莫回无

TA贡献1865条经验 获得超7个赞

看起来您无法在回调线程上执行生产者操作kafka-producer-network-thread- 可能是生产者代码中的一些死锁 - 等待获取将使用同一线程的元数据,因此它超时。


您可能需要第二个KafkaTemaplate(和生产者工厂,因为默认工厂总是返回相同的生产者)。


或者只是在不同的线程上执行第二次发送......


@SpringBootApplication

public class So54492871Application {


    private static final ExecutorService exec = Executors.newSingleThreadExecutor();


    public static void main(String[] args) {

        SpringApplication.run(So54492871Application.class, args);

    }


    @Bean

    public NewTopic topic1() {

        return new NewTopic("so54492871-1", 1, (short) 1);

    }


    @Bean

    public NewTopic topic2() {

        return new NewTopic("so54492871-2", 1, (short) 1);

    }


    @Bean

    public ApplicationRunner runner(KafkaTemplate<String, String> template) {

        return args -> {

            ListenableFuture<SendResult<String, String>> future = template.send("so54492871-1", "foo");

            future.addCallback(result -> {

                System.out.println(Thread.currentThread().getName() + ":" + result);

                exec.execute(() -> {

                    ListenableFuture<SendResult<String, String>> future2 = template.send("so54492871-2", "bar");

                    future2.addCallback(result2 -> {

                        System.out.println(Thread.currentThread().getName() + ":" + result2);

                    }, ex -> {

                        System.out.println(ex.getMessage());

                    });

                });

            }, ex -> {

                System.out.println(ex.getMessage());

            });

            System.in.read();

        };

    }


}


查看完整回答
反对 回复 2022-06-04
  • 1 回答
  • 0 关注
  • 98 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信