2 回答
TA贡献1856条经验 获得超5个赞
我认为你的问题只是 WriterConfig。
例如,如果您的配置类似于segmentio/kafka-go 文档上的示例:
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "topic-A",
Balancer: &kafka.LeastBytes{},
})
您可以尝试设置批量大小和批量超时:
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "topic-A",
Balancer: &kafka.LeastBytes{},
BatchSize: 1,
BatchTimeout: 10 * time.Millisecond,
})
发生这种情况是因为 kafka-go 默认等待 1 秒,直到批次达到最大大小(默认为 100 条消息),正如我们在代码中看到的。
希望对您有帮助。
更新:请注意,一条一条地发送消息会减慢该过程。例如:批量发送100条消息在我的电脑上花费了0.0107s。一条一条发送相同的 100 条消息花费了 0.0244 秒。
TA贡献1834条经验 获得超8个赞
我对golang了解不多。但以下使用Writer.WriteMessages 的函数具有同步发送选项。
写入速度快(使用同步发送)实际上取决于您的网络往返时间,即将消息发送到 Kafka 所需的时间加上从 Kafka 获取确认所需的时间。
如果您使用同步发送,那么您的发送将被阻塞,直到收到确认为止。因此,为了加快速度,一种方法是减少确认。最好将其设置为 1(这意味着领导者已将消息写入其日志,但不会复制到追随者)。但如果领导者宕机并且消息没有被复制,这可能会导致损失。
因此,您可以将其设置为acks=all
并更改min.insync.replicas=2
主题。值越小,send()
返回的速度就越快,并且将下一条消息推送到 Kafka 的速度也就越快。
- 2 回答
- 0 关注
- 144 浏览
添加回答
举报