2 回答
TA贡献1780条经验 获得超5个赞
Kafka 消费者不是线程安全的。所有网络 I/O 都发生在进行调用的应用程序的线程中。确保多线程访问正确同步是用户的责任。非同步访问将导致ConcurrentModificationException.
如果消费者被分配了多个分区来获取数据,它将尝试同时从所有分区中消费,从而有效地为这些分区提供相同的消费优先级。但是,在某些情况下,消费者可能希望首先专注于从分配的分区的某个子集全速获取,并且仅在这些分区几乎没有或没有数据要消耗时才开始获取其他分区。
春卡夫卡
ConcurrentKafkaListenerContainerFactory用于为带注释的方法创建容器@KafkaListener
MessageListenerContainer春天卡夫卡有两个
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer接收来自单个线程上所有主题或分区的所有消息。ConcurrentMessageListenerContainer委托给一个或多个实例KafkaMessageListenerContainer以提供多线程消费。
使用 ConcurrentMessageListenerContainer
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
它具有并发属性。例如, container.setConcurrency(3) 创建了三个KafkaMessageListenerContainer实例。
如果TopicPartition提供了6个实例,并发为3;每个容器有两个分区。对于五个 TopicPartition 实例,两个容器获得两个分区,第三个获得一个。如果并发大于 TopicPartition 的数量,则将并发调低,使每个容器获得一个分区。
TA贡献1900条经验 获得超5个赞
Kafka Consumer API 不是线程安全的。ConcurrentKafkaListenerContainerFactory api 提供了使用 Kafka Consumer API 的并发方式以及设置其他 kafka 消费者属性。
添加回答
举报