为了账号安全,请及时绑定邮箱和手机立即绑定

Kafka - 停止重试 ConnectException

Kafka - 停止重试 ConnectException

慕姐8265434 2021-09-15 17:03:52
我有一个 kafka 生产者定义如下public KafkaMessageProducer(String kafkaHost, String kafkaPort, Map<String, String> map) {        this.kafkaTopic = map;        final Properties properties = new Properties();        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        properties.put("bootstrap.servers", kafkaHost + ":" + kafkaPort);        producer = new KafkaProducer<String, String>(properties);    }我正在使用以下代码发送消息。(也尝试使用回调)。public void sendMessage(String topic, RestCommonResource resultToken) {        ObjectMapper objectMapper = new ObjectMapper();        JsonNode  jsonNode = objectMapper.valueToTree(resultToken);        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, jsonNode.toString());        producer.send(record);    }但是如果 kafka 服务器关闭并且生产者发布消息,程序将陷入无限循环,并出现以下异常:WARN  [2018-09-13 06:27:59,589] org.apache.kafka.common.network.Selector: Error in I/O with localhost/127.0.0.1! java.net.ConnectException: Connection refused: no further information! at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_80]! at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744) ~[na:1.7.0_80]! at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]! at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]! at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80]如果有任何属性可以设置为停止重试并删除消息。
查看完整描述

3 回答

?
眼眸繁星

TA贡献1873条经验 获得超9个赞

目前,如果 Kafka 客户端失去与代理的连接,它将在尝试重新连接之前等待reconnect.backoff.ms毫秒。

虽然这种策略在客户端短时间断开连接时很有效,但如果单个代理或整个集群长时间不可用,所有客户端将迅速生成大量连接。

此外,开发人员对不断失去与集群连接的客户端的控制有限。

我认为这个主题对您有用:添加自定义策略以重新连接到 NetworkdClient 的尝试

reconnect.backoff.ms :尝试重新连接到给定主机之前等待的基本时间。这避免了在紧密循环中重复连接到主机。此退避适用于客户端到代理的所有连接尝试。

reconnect.backoff.max.ms:重新连接到反复连接失败的代理时等待的最长时间(以毫秒为单位)。如果提供,则每个主机的退避将在每次连续连接失败时呈指数增加,直至达到此最大值。计算退避增量后,添加 20% 的随机抖动以避免连接风暴。


查看完整回答
反对 回复 2021-09-15
?
幕布斯6054654

TA贡献1876条经验 获得超7个赞

您还需要包含以下 Producer 属性

props.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "10000");

使用reconnect.backoff.msWARNing 只会出现一次。

关于Kafka 文档

reconnect.backoff.ms
在尝试重新连接到给定主机之前等待的基本时间。这避免了在紧密循环中重复连接到主机。此退避适用于客户端到代理的所有连接尝试。


查看完整回答
反对 回复 2021-09-15
  • 3 回答
  • 0 关注
  • 429 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信