Spring 引导中的 Kafka 配置类找不到密钥库或信任库



我正在设置 Kafka 使用者配置,但配置在类路径上找不到密钥库或信任库:

@EnableKafka
@Configuration
public class KafkaConfig {
@Value("${kafka.ssl.keystore}")
private String keyStorePath;
@Value("${kafka.ssl.truststore}")
private String trustStorePath;
@Bean
public ConsumerFactory<String, String> getConsumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"my-bootstrap.mydomain.com:443");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "client1");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "500");
properties.put("session.timeout.ms", "30000");
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStorePath);
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password");
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStorePath);
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password");
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(getConsumerFactory());
return factory;
}
}

密钥库和信任库都位于目录中,src/main/resources/ssl与配置类位于同一 maven 模块中。

我在 application.yml 中设置了占位符,如下所示:

kafka:
ssl:
keystore: classpath:ssl/kafka-keystore.jks
truststore: classpath:ssl/kafka-truststore.jks

但是,应用程序无法启动,出现以下异常:

"org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: classpath:ssl/kafka-keystore.jks (No such file or directory)"

我的理解是,使用@Value可以使用classpath:前缀来解析类路径(请参阅此链接( https://www.baeldung.com/spring-classpath-file-access

此外,@Value技术可以很好地解析同一应用程序中反应式 WebClient 配置的密钥库和信任库。

我需要做什么来解析 Kafka 配置的类路径?我在这里错过了什么吗?

你注入到一个字符串中,它将把"类路径:"保留在字符串值中,并将其作为属性提供给 DefaultKafkaConsumerFactory,尝试注入到一个 spring 资源中,如下所示:

import org.springframework.core.io.Resource;
@Value("classpath:path/to/file/in/classpath")
Resource resourceFile;

然后你可以访问该文件,你可以得到绝对路径,如:

resourceFile.getFile().getAbsolutePath()

这个想法是你可以提供DefaultKafkaConsumerFactory的绝对路径。

但是你也可以尝试删除"classpath:"并像当前代码一样注入为String,这取决于DefaultKafkaConsumerFactory如何处理该属性。 但我不明白为什么上面的绝对路径不起作用。

对于那些像我这样使用 Spring Boot 和 Spring Kafka 并且不覆盖 DefaultKafkaConsumerFactory 并且只使用属性进行配置的人,有一个 BeanPostProcessor 类可以实现。它提供两种方法:

postProcessAfterInitializationpostProcessBeforeInitialization

工厂钩子,允许自定义修改新的 Bean 实例 — 例如,检查标记接口或使用代理包装 bean。 通常,通过标记接口等填充 bean 的后处理器将实现 postProcessBeforeInitialization(java.lang.Object, java.lang.String(,而用代理包装 bean 的后处理器通常会实现 postProcessAfterInitialization(java.lang.Object, java.lang.String(。

我正在使用Spring Boot和Spring Kafka,我只想更改本地配置文件。

在我的代码示例中,我使用它来覆盖 Kafka 位置属性,因为对于 SSL,它不会从类路径读取。

所以这是代码:

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import java.io.IOException;
import java.util.Arrays;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
@Configuration
@RequiredArgsConstructor
public class KafkaConfiguration implements BeanPostProcessor {
@Value("${spring.kafka.ssl.key-store-location:}")
private Resource keyStoreResource;
@Value("${spring.kafka.properties.schema.registry.ssl.truststore.location:}")
private Resource trustStoreResource;
private final Environment environment;
@SneakyThrows
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof KafkaProperties) {
KafkaProperties kafkaProperties = (KafkaProperties) bean;
if(isLocalProfileActive()) {
configureStoreLocation(kafkaProperties);
}
}
return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
}
private boolean isLocalProfileActive() {
return Arrays.stream(environment.getActiveProfiles()).anyMatch(profile -> "local".equals(profile));
}
private void configureStoreLocation(KafkaProperties kafkaProperties) throws IOException {
kafkaProperties.getSsl().setKeyStoreLocation(new FileSystemResource(keyStoreResource.getFile().getAbsolutePath()));
kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreResource.getFile().getAbsolutePath());
kafkaProperties.getSsl().setTrustStoreLocation(new FileSystemResource(trustStoreResource.getFile().getAbsolutePath()));
kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreResource.getFile().getAbsolutePath());
}
}

这样我就可以在我的属性文件中拥有:

spring.kafka.ssl.key-store-location=classpath:mykeystore.jks

代码将从中获取绝对路径并设置它。它还可以根据配置文件进行过滤。

值得一提的是,BeanPostProcessor 为每个 bean 运行,因此请确保过滤所需的内容。

最新更新