Spring Cloud Stream Kafka Streams: cannot configure SSL for my consumers and producers



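I am struggling to configure Spring Cloud Stream for Kafka Streams correctly so that it uses SSL with a truststore and a keystore.

My application runs several streams, and they should all share the same SSL configuration.

The application looks like this:

Stream1: Topic1 > Topic2

Stream2: Topic2 + Topic3 > Topic4

Stream3: Topic4 > Topic5

I am using the latest Spring Cloud Stream framework with Kafka Streams and Avro models. I was able to configure the schema registry.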

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

My application.yaml looks like this:

spring.application.name: processingapp
spring.cloud:
  function.definition: stream1;stream2;stream3
  stream:
    bindings:
      stream1-in-0:
        destination: topic1
      stream1-out-0:
        destination: topic2
      stream2-in-0:
        destination: topic2
      stream2-in-1:
        destination: topic3
      stream2-out-0:
        destination: topic4
      stream3-in-0:
        destination: topic4
      stream3-out-0:
        destination: topic5
    kafka:
      binder:
        brokers: kafkabrokerurl.com:9092
        configuration: # not recognized at all
          security.protocol: SSL
          ssl.truststore.location: /mnt/truststore.jks
          ssl.truststore.type: JKS
          ssl.keystore.location: /mnt/keystore.jks
          ssl.keystore.type: JKS
          ssl.enabled.protocols: TLSv1.2
      bindings:
        default:
          consumer:
            resetOffsets: false
            startOffset: latest
        stream1-in-0:
          consumer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream1-out-0:
          producer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream2-in-0:
          consumer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream2-in-1:
          consumer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            materializedAs: sbinfo-outage-mapping-store
        stream2-out-0:
          producer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream3-in-0:
          consumer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream3-out-0:
          producer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      streams:
        binder:
          configuration:
            schema.registry.url: https://schemaregistryurl.com # this works

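When I start the application with debug logging enabled, the logs show that none of the configuration I set is loaded, apart from the schema registry.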

o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
bootstrap.servers = [kafkabrokerurl.com:9092]
client.dns.lookup = use_all_dns_ips
client.id = 
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

So the brokers are loaded correctly, but ssl.truststore.location, for example, is still null.
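The comparison described above (which settings actually reached the client versus what was configured) can be done mechanically on the logged dump. Below is a minimal sketch in plain Java, assuming the "key = value" layout shown in the AdminClientConfig output; the class name ConfigLogCheck and its methods are hypothetical helpers written for this illustration and are not part of Kafka or Spring.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: compares expected client settings with a logged "...Config values:" dump.
class ConfigLogCheck {

    // Parses "key = value" lines; lines without '=' (such as the header line) are skipped.
    public static Map<String, String> parse(String logDump) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String line : logDump.split("\\R")) {
            int idx = line.indexOf('=');
            if (idx > 0) {
                values.put(line.substring(0, idx).trim(), line.substring(idx + 1).trim());
            }
        }
        return values;
    }

    // Returns every expected key whose logged value differs, mapped to the logged value.
    public static Map<String, String> mismatches(Map<String, String> expected,
                                                 Map<String, String> logged) {
        Map<String, String> out = new LinkedHashMap<>();
        expected.forEach((key, want) -> {
            String got = logged.get(key);
            if (!want.equals(got)) {
                out.put(key, String.valueOf(got));
            }
        });
        return out;
    }

    public static void main(String[] args) {
        // A few lines copied from the dump in the question.
        String dump = String.join("\n",
                "bootstrap.servers = [kafkabrokerurl.com:9092]",
                "security.protocol = PLAINTEXT",
                "ssl.truststore.location = null",
                "ssl.truststore.type = JKS");

        Map<String, String> expected = new LinkedHashMap<>();
        expected.put("security.protocol", "SSL");
        expected.put("ssl.truststore.location", "/mnt/truststore.jks");
        expected.put("ssl.truststore.type", "JKS");

        // prints {security.protocol=PLAINTEXT, ssl.truststore.location=null}
        System.out.println(mismatches(expected, parse(dump)));
    }
}
```

Note that ssl.truststore.type matches only because JKS is also the value logged when nothing is set, so a match on that key alone does not prove the setting was picked up.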

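I have tried many different approaches that I found here and elsewhere.

I found an old issue and tried the approach described there, but the result was the same: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/129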

configuration: # not recognized at all
  "[security.protocol]": SSL
  "[ssl.truststore.location]": /mnt/truststore.jks
  "[ssl.truststore.type]": JKS
  "[ssl.keystore.location]": /mnt/keystore.jks
  "[ssl.keystore.type]": JKS
  "[ssl.enabled.protocols]": TLSv1.2

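I read that this configuration does not take effect when multiple binders are in use, so I also tried defining a named binder, but the application complains that it does not recognize it: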

spring.application.name: processingapp
spring.cloud:
  function.definition: stream1;stream2;stream3
  stream:
    bindings:
      stream1-in-0:
        destination: topic1
        binder: ssl
      stream1-out-0:
        destination: topic2
        binder: ssl
      stream2-in-0:
        destination: topic2
        binder: ssl
      stream2-in-1:
        destination: topic3
        binder: ssl
      stream2-out-0:
        destination: topic4
        binder: ssl
      stream3-in-0:
        destination: topic4
        binder: ssl
      stream3-out-0:
        destination: topic5
        binder: ssl
    binders:
      ssl:
        type: kafka
        environment:
          spring:
            cloud:
              stream:
                kafka:
                  binder:
                    brokers:
                    configuration:
                      security.protocol: SSL
                      ssl.truststore.location: /mnt/secrets/truststore.jks
                      ssl.truststore.type: JKS
                      ssl.keystore.location: /mnt/secrets/keystore.jks
                      ssl.keystore.type: JKS
                      ssl.enabled.protocols: TLSv1.2

Error:

2021-07-15 17:11:14.634 ERROR 5216 --- [           main] o.s.boot.SpringApplication               : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: Unknown binder configuration: kstream

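I have a @Configuration-annotated class in which my three streams are declared as a Function, a BiFunction, and a Function.

I hope someone can help me. Thanks.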

The streams element is missing from your property names; as written, you are configuring the Kafka MessageChannel binder instead.

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              security:
                protocol: SSL
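To make the difference between the two property paths concrete, here is a minimal sketch in plain Java. It is not Spring's actual binding code; it only imitates a lookup by string prefix, and the class BinderPrefixDemo and its underPrefix method are hypothetical names introduced for illustration. The two prefixes are the ones that appear in the question and in the YAML above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: models each binder reading the keys under its own prefix.
class BinderPrefixDemo {
    public static final String CHANNEL_BINDER_PREFIX =
            "spring.cloud.stream.kafka.binder.configuration.";
    public static final String STREAMS_BINDER_PREFIX =
            "spring.cloud.stream.kafka.streams.binder.configuration.";

    // Returns the entries under the given prefix, with the prefix stripped from each key.
    public static Map<String, String> underPrefix(Map<String, String> props, String prefix) {
        Map<String, String> out = new LinkedHashMap<>();
        props.forEach((key, value) -> {
            if (key.startsWith(prefix)) {
                out.put(key.substring(prefix.length()), value);
            }
        });
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        // As in the question: SSL under the MessageChannel binder, schema registry under streams.
        props.put(CHANNEL_BINDER_PREFIX + "security.protocol", "SSL");
        props.put(STREAMS_BINDER_PREFIX + "schema.registry.url", "https://schemaregistryurl.com");

        // prints {schema.registry.url=https://schemaregistryurl.com}
        System.out.println(underPrefix(props, STREAMS_BINDER_PREFIX));

        // As in the answer: the same key placed under the streams prefix.
        props.put(STREAMS_BINDER_PREFIX + "security.protocol", "SSL");

        // prints {schema.registry.url=https://schemaregistryurl.com, security.protocol=SSL}
        System.out.println(underPrefix(props, STREAMS_BINDER_PREFIX));
    }
}
```

This matches what the question observed: schema.registry.url, which sat under the streams path, was picked up, while the SSL keys under the other path were not. The remaining ssl.* keys from the question would presumably move under the same streams path, although the answer itself only shows security.protocol.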
