为了账号安全,请及时绑定邮箱和手机立即绑定

何时使用 ConcurrentKafkaListenerContainerFactory?

何时使用 ConcurrentKafkaListenerContainerFactory?

临摹微笑 2022-07-20 19:26:12
我是 kafka 的新手,我浏览了文档,但我什么都不懂。有人可以解释何时使用该ConcurrentKafkaListenerContainerFactory课程吗?我已经使用了该Kafkaconsumer课程,但我看到ConcurrentKafkaListenerContainerFactory在我当前的项目中使用了该课程。请解释它的用途。
查看完整描述

2 回答

?
翻阅古今

TA贡献1780条经验 获得超5个赞

Kafka 消费者不是线程安全的。所有网络 I/O 都发生在进行调用的应用程序的线程中。确保多线程访问正确同步是用户的责任。非同步访问将导致ConcurrentModificationException.


如果消费者被分配了多个分区来获取数据,它将尝试同时从所有分区中消费,从而有效地为这些分区提供相同的消费优先级。但是,在某些情况下,消费者可能希望首先专注于从分配的分区的某个子集全速获取,并且仅在这些分区几乎没有或没有数据要消耗时才开始获取其他分区。


春卡夫卡


ConcurrentKafkaListenerContainerFactory用于为带注释的方法创建容器@KafkaListener


MessageListenerContainer春天卡夫卡有两个


KafkaMessageListenerContainer

ConcurrentMessageListenerContainer

KafkaMessageListenerContainer接收来自单个线程上所有主题或分区的所有消息。ConcurrentMessageListenerContainer委托给一个或多个实例KafkaMessageListenerContainer以提供多线程消费。


使用 ConcurrentMessageListenerContainer


@Bean

KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>

                    kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =

                            new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory());

    factory.setConcurrency(3);

    factory.getContainerProperties().setPollTimeout(3000);

    return factory;

  }

它具有并发属性。例如, container.setConcurrency(3) 创建了三个KafkaMessageListenerContainer实例。


如果TopicPartition提供了6个实例,并发为3;每个容器有两个分区。对于五个 TopicPartition 实例,两个容器获得两个分区,第三个获得一个。如果并发大于 TopicPartition 的数量,则将并发调低,使每个容器获得一个分区。


查看完整回答
反对 回复 2022-07-20
?
梵蒂冈之花

TA贡献1900条经验 获得超5个赞

Kafka Consumer API 不是线程安全的。ConcurrentKafkaListenerContainerFactory api 提供了使用 Kafka Consumer API 的并发方式以及设置其他 kafka 消费者属性。



查看完整回答
反对 回复 2022-07-20
  • 2 回答
  • 0 关注
  • 480 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信