为了账号安全,请及时绑定邮箱和手机立即绑定

在 Spring Boot 中控制启用/禁用 Kafka 消费者

在 Spring Boot 中控制启用/禁用 Kafka 消费者

慕田峪9158850 2022-05-25 16:17:14
我在 Spring Boot 中配置了几个 Kafka 消费者。这就是 kafka.properties 的样子(这里只列出一个消费者的配置):kafka.topics=bootstrap.servers=group.id=enable.auto.commit=auto.commit.interval.ms=session.timeout.ms=schema.registry.url=auto.offset.reset=kafka.enabled=这是配置:@Configuration@PropertySource({"classpath:kafka.properties"})public class KafkaConsumerConfig {    @Autowired    private Environment env;    @Bean    public ConsumerFactory<String, String> pindropConsumerFactory() {        Map<String, Object> dataRiverProps = new HashMap<>();        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));        return new DefaultKafkaConsumerFactory<>(dataRiverProps);    }    @Bean    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(pindropConsumerFactory());        return factory;    }}有没有办法让我使用道具“kafka.enabled”,这样我就可以控制这个消费者的创建或者消息检索?非常感谢!
查看完整描述

2 回答

?
手掌心

TA贡献1942条经验 获得超3个赞

您可以通过在消费者中使用属性autoStartup (true/false) 来做到这一点,如下所示 -


@KafkaListener(id = "foo", topics = "Topic1", groupId = "group_id",

        containerFactory = "kafkaListenerContainerFactory",autoStartup = "${listen.auto.start:false}")

public void consume(String message) {

    //System.out.println("Consumed message: " + message);

}


查看完整回答
反对 回复 2022-05-25
?
开心每一天1111

TA贡献1836条经验 获得超13个赞

要禁用 Kafka 配置,您可以,例如:

  1. 用 KafkaConsumerConfig 注释

    @ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)

  2. 删除类@Component并将KafkaConsumer其定义为 @Bean in KafkaConsumerConfig

要控制 KafkaConsumer 中的消息检索:

  1. 只需在 KafkaConsumer 中获取属性值@Value("kafka.enabled") private Boolean enabled;

  2. 然后在用 . 注释的方法中使用简单的 if @KafkaListener


查看完整回答
反对 回复 2022-05-25
  • 2 回答
  • 0 关注
  • 1069 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信