为了账号安全,请及时绑定邮箱和手机立即绑定

重新加载 Spring Kafka 在运行时使用的 SSL 上下文

重新加载 Spring Kafka 在运行时使用的 SSL 上下文

沧海一幻觉 2022-07-20 20:15:16
使 Spring Kafka 重新加载 SSL 上下文的推荐方法是什么?我需要在不停机的情况下将新证书插入到我的 Kafka 生产者使用的信任库中。但是我发现,一旦应用程序启动并创建了 Kafka 生产者,就会创建并缓存 SSLContext 的实例。有一种方法可以重新配置它,但到目前为止我发现的唯一方法是通过调用 destroy 方法DefaultKafkaProducerFactory(在证书更新后)销毁任何现有的生产者,这会导致任何后续调用KafkaTemplate.send强制创建一个新的生产者,这反过来重新加载 SSL 上下文。我觉得这就像用大锤来解决这个问题,可能会有更优雅的解决方案。我还注意到,如果在发送消息时调用destroy,我会得到以下异常,当我们无法承受丢失任何事件时,它看起来不是很积极。java.util.concurrent.CompletionException: org.apache.kafka.common.KafkaException: Producer closed while send in progress    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)    at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1592)    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)Caused by: org.apache.kafka.common.KafkaException: Producer closed while send in progress    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:826)    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:372)    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:190)    at org.springframework.kafka.core.KafkaOperations$send.call(Unknown Source)    at com.example.event.publisher.kafka.KafkaEventPublisher.doPublish(KafkaEventPublisher.groovy:57)
查看完整描述

2 回答

?
开满天机

TA贡献1786条经验 获得超13个赞

Spring Kafka 2.6.5、Kafka 2.4.1,我改用 KafkaProducerFactory.reset()。


@Autowired

private final KafkaTemplate<String, byte[]> kafkaTemplate;



private void reloadProducer() {

    kafkaTemplate.getProducerFactory().reset();

}

下次调用 send() 时,Spring 将使用新证书重新创建一个新的 KafkaConsumer 品牌。


查看完整回答
反对 回复 2022-07-20
?
眼眸繁星

TA贡献1873条经验 获得超9个赞

看起来简单地重置某些值的配置将触发 SSL 引擎工厂的重建。他们甚至会调出会导致热重载的文件密钥配置。



查看完整回答
反对 回复 2022-07-20
  • 2 回答
  • 0 关注
  • 107 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号