为了账号安全,请及时绑定邮箱和手机立即绑定

某些记录的 Azure eventhub Kafka org.apache.kafka.

某些记录的 Azure eventhub Kafka org.apache.kafka.

梵蒂冈之花 2023-08-09 15:15:06
有一个包含 80 到 100 条记录的 ArrayList,尝试流式传输并将每个单独的记录(POJO,不是整个列表)发送到 Kafka 主题(事件中心)。安排一个 cron 作业,例如每小时将这些记录 (POJO) 发送到事件中心。能够看到消息发送到 eventhub ,但在 3 到 4 次成功运行后出现以下异常(其中包括多条正在发送的消息和几条因以下异常而失败的消息)    Expiring 14 record(s) for eventhubname: 30125  ms has passed since batch creation plus linger time以下是使用的 Producer 的配置,    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);    props.put(ProducerConfig.ACKS_CONFIG, "1");    props.put(ProducerConfig.RETRIES_CONFIG, "3");消息保留期 - 7 分区 - 6 使用 spring Kafka(2.2.3) 发送标记为@Asynckafka send 写入位置的事件方法    @Async    protected void send() {       kafkatemplate.send(record);    }预期 - kafka 不会抛出异常 实际 - 抛出 org.apache.kafka.common.errors.TimeoutException
查看完整描述

2 回答

?
万千封印

TA贡献1891条经验 获得超3个赞

Prakash - 我们已经看到了许多问题,其中尖峰生产者模式会出现批处理超时。

这里的问题是生产者有两个 TCP 连接可以空闲超过 4 分钟 - 此时,Azure 负载均衡器会关闭空闲连接。Kafka 客户端不知道连接已关闭,因此它尝试在失效连接上发送批处理,该连接超时,此时将开始重试。

  • 将connections.max.idle.ms设置为<4分钟——这允许Kafka客户端的网络客户端层优雅地处理生产者消息发送TCP连接的连接关闭

  • 将metadata.max.age.ms设置为<4分钟——这实际上是生产者元数据TCP连接的保持活动状态

查看完整回答
反对 回复 2023-08-09
?
长风秋雁

TA贡献1757条经验 获得超7个赞

此异常表明您对记录进行排队的速度快于发送记录的速度。将记录添加到批次后,发送该批次就有一个时间限制,以确保它已在指定的持续时间内发送。这是由 Producer 配置参数 request.timeout.ms 控制的。如果批次排队时间超过超时限制,则会抛出异常。该批次中的记录将从发送队列中删除。


查看完整回答
反对 回复 2023-08-09
  • 2 回答
  • 0 关注
  • 116 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信