为了账号安全,请及时绑定邮箱和手机立即绑定

Spring Boot 中的 Kafka 配置类未找到密钥库或信任库

Spring Boot 中的 Kafka 配置类未找到密钥库或信任库

哈士奇WWW 2023-07-28 10:16:58
我正在设置 Kafka 消费者配置,但该配置在类路径上找不到密钥库或信任库:@EnableKafka@Configurationpublic class KafkaConfig {    @Value("${kafka.ssl.keystore}")    private String keyStorePath;    @Value("${kafka.ssl.truststore}")    private String trustStorePath;    @Bean    public ConsumerFactory<String, String> getConsumerFactory() {        Map<String, Object> properties = new HashMap<>();        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"my-bootstrap.mydomain.com:443");        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "client1");        properties.put("enable.auto.commit", "true");        properties.put("auto.commit.interval.ms", "500");        properties.put("session.timeout.ms", "30000");        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStorePath);        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password");        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStorePath);        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password");        return new DefaultKafkaConsumerFactory<>(properties);    }src/main/resources/ssl密钥库和信任库都位于与配置类相同的 Maven 模块中的目录中。我在 application.yml 中设置了占位符,如下所示:kafka:  ssl:    keystore: classpath:ssl/kafka-keystore.jks    truststore: classpath:ssl/kafka-truststore.jks但是,应用程序无法启动,并出现以下异常:"org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: classpath:ssl/kafka-keystore.jks (No such file or directory)"我的理解是,使用@Value可以使用classpath:前缀来解析类路径(请参阅此链接) https://www.baeldung.com/spring-classpath-file-access此外,该@Value技术可以很好地解析同一应用程序中反应式 WebClient 配置的密钥库和信任库。我需要做什么来解析 Kafka 配置的类路径?我在这里错过了什么吗?
查看完整描述

2 回答

?
Qyouu

TA贡献1786条经验 获得超11个赞

您注入一个字符串,它将保留“classpath:”在字符串值中并将其作为属性提供给 DefaultKafkaConsumerFactory,尝试注入到 spring 资源中,例如:


import org.springframework.core.io.Resource;


@Value("classpath:path/to/file/in/classpath")

Resource resourceFile;

然后你可以访问该文件,你可以获得绝对路径,如下所示:


resourceFile.getFile().getAbsolutePath()


这个想法是你可以提供 DefaultKafkaConsumerFactory 的绝对路径


但是您也可以尝试删除“classpath:”并像当前代码一样注入为 String ,这可能取决于 DefaultKafkaConsumerFactory 如何处理该属性。但我不明白为什么上面的绝对路径不起作用。


查看完整回答
反对 回复 2023-07-28
?
GCT1015

TA贡献1827条经验 获得超4个赞

对于像我这样使用 Spring Boot 和 Spring Kafka 并且不重写 DefaultKafkaConsumerFactory 的人(仅使用属性进行配置),您可以实现一个BeanPostProcessor类。它提供了两种方法:

postProcessAfterInitializationpostProcessBeforeInitialization

工厂钩子允许对新 bean 实例进行自定义修改 - 例如,检查标记接口或使用代理包装 bean。通常,通过标记接口等填充 Bean 的后处理器将实现 postProcessBeforeInitialization(java.lang.Object, java.lang.String),而用代理包装 Bean 的后处理器通常将实现 postProcessAfterInitialization(java.lang.Object) ,java.lang.String)。

我将 Spring Boot 与 Spring Kafka 一起使用,我只想更改本地配置文件。

在我的代码示例中,我使用它来覆盖 Kafka Location 属性,因为对于 SSL,它不会从类路径读取。

这就是代码:

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;

import java.io.IOException;

import java.util.Arrays;

import lombok.RequiredArgsConstructor;

import lombok.SneakyThrows;

import org.apache.kafka.common.config.SslConfigs;

import org.springframework.beans.BeansException;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.beans.factory.config.BeanPostProcessor;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

import org.springframework.context.annotation.Configuration;

import org.springframework.core.env.Environment;

import org.springframework.core.io.FileSystemResource;

import org.springframework.core.io.Resource;


@Configuration

@RequiredArgsConstructor

public class KafkaConfiguration implements BeanPostProcessor {


  @Value("${spring.kafka.ssl.key-store-location:}")

  private Resource keyStoreResource;

  @Value("${spring.kafka.properties.schema.registry.ssl.truststore.location:}")

  private Resource trustStoreResource;

  private final Environment environment;


  @SneakyThrows

  @Override

  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

    if (bean instanceof KafkaProperties) {

      KafkaProperties kafkaProperties = (KafkaProperties) bean;

      if(isLocalProfileActive()) {

        configureStoreLocation(kafkaProperties);

      }

    }

    return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);

  }


  private boolean isLocalProfileActive() {

    return Arrays.stream(environment.getActiveProfiles()).anyMatch(profile -> "local".equals(profile));

  }


  private void configureStoreLocation(KafkaProperties kafkaProperties) throws IOException {

    kafkaProperties.getSsl().setKeyStoreLocation(new FileSystemResource(keyStoreResource.getFile().getAbsolutePath()));

    kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreResource.getFile().getAbsolutePath());

    kafkaProperties.getSsl().setTrustStoreLocation(new FileSystemResource(trustStoreResource.getFile().getAbsolutePath()));

    kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreResource.getFile().getAbsolutePath());

  }


}

这样我就可以在我的属性文件中添加:


spring.kafka.ssl.key-store-location=classpath:mykeystore.jks


代码将从中获取绝对路径并设置它。它还可以根据配置文件进行过滤。


值得一提的是,BeanPostProcessor 会针对每个bean 运行,因此请确保您过滤了您想要的内容。


查看完整回答
反对 回复 2023-07-28
  • 2 回答
  • 0 关注
  • 194 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信