2 回答
TA贡献1786条经验 获得超11个赞
您注入一个字符串,它将保留“classpath:”在字符串值中并将其作为属性提供给 DefaultKafkaConsumerFactory,尝试注入到 spring 资源中,例如:
import org.springframework.core.io.Resource;
@Value("classpath:path/to/file/in/classpath")
Resource resourceFile;
然后你可以访问该文件,你可以获得绝对路径,如下所示:
resourceFile.getFile().getAbsolutePath()
这个想法是你可以提供 DefaultKafkaConsumerFactory 的绝对路径
但是您也可以尝试删除“classpath:”并像当前代码一样注入为 String ,这可能取决于 DefaultKafkaConsumerFactory 如何处理该属性。但我不明白为什么上面的绝对路径不起作用。
TA贡献1827条经验 获得超4个赞
对于像我这样使用 Spring Boot 和 Spring Kafka 并且不重写 DefaultKafkaConsumerFactory 的人(仅使用属性进行配置),您可以实现一个BeanPostProcessor类。它提供了两种方法:
postProcessAfterInitialization
和postProcessBeforeInitialization
工厂钩子允许对新 bean 实例进行自定义修改 - 例如,检查标记接口或使用代理包装 bean。通常,通过标记接口等填充 Bean 的后处理器将实现 postProcessBeforeInitialization(java.lang.Object, java.lang.String),而用代理包装 Bean 的后处理器通常将实现 postProcessAfterInitialization(java.lang.Object) ,java.lang.String)。
我将 Spring Boot 与 Spring Kafka 一起使用,我只想更改本地配置文件。
在我的代码示例中,我使用它来覆盖 Kafka Location 属性,因为对于 SSL,它不会从类路径读取。
这就是代码:
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import java.io.IOException;
import java.util.Arrays;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
@Configuration
@RequiredArgsConstructor
public class KafkaConfiguration implements BeanPostProcessor {
@Value("${spring.kafka.ssl.key-store-location:}")
private Resource keyStoreResource;
@Value("${spring.kafka.properties.schema.registry.ssl.truststore.location:}")
private Resource trustStoreResource;
private final Environment environment;
@SneakyThrows
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof KafkaProperties) {
KafkaProperties kafkaProperties = (KafkaProperties) bean;
if(isLocalProfileActive()) {
configureStoreLocation(kafkaProperties);
}
}
return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
}
private boolean isLocalProfileActive() {
return Arrays.stream(environment.getActiveProfiles()).anyMatch(profile -> "local".equals(profile));
}
private void configureStoreLocation(KafkaProperties kafkaProperties) throws IOException {
kafkaProperties.getSsl().setKeyStoreLocation(new FileSystemResource(keyStoreResource.getFile().getAbsolutePath()));
kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreResource.getFile().getAbsolutePath());
kafkaProperties.getSsl().setTrustStoreLocation(new FileSystemResource(trustStoreResource.getFile().getAbsolutePath()));
kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreResource.getFile().getAbsolutePath());
}
}
这样我就可以在我的属性文件中添加:
spring.kafka.ssl.key-store-location=classpath:mykeystore.jks
代码将从中获取绝对路径并设置它。它还可以根据配置文件进行过滤。
值得一提的是,BeanPostProcessor 会针对每个bean 运行,因此请确保您过滤了您想要的内容。
添加回答
举报