如何将示例kafka生产者应用程序连接到远程kafka服务器



https://www.javainuse.com/spring/spring-boot-apache-kafka-hello-world

我遵循这个例子构建了一个简单的Kafka生产者进行一些测试。然而,我的Kafka实例不是本地的,由我公司的基础设施团队管理,所以它在远程服务器上运行。我有它运行的地址和端口,但我不确定如何配置我的应用程序以连接到它。我需要使用application.properties文件来定义kafka地址池或类似的东西吗?

spring.kafka.bootstrap-servers=[kafka-server-1:port],[kafka-server-2:port]
spring.kafka.consumer.group-id=my-sample-group
spring.kafka.security.protocol=SSL

在我为消费者提供的教程中,我的application.properties中有这样的内容,但我不确定它对生产者来说是相同的还是应该不同。这里的第二个值似乎是特定于消费者的,基于IDE中的自动完成,我没有看到生产者有类似的值。

感谢提供的任何帮助

我已经更改了代码,并在https://docs.spring.io/spring-kafka/reference/html/#spring-引导生产者应用

现在,当我尝试连接时,输出只是在关于我的连接池中的2个kafka服务器的WARN消息之间交替:

2021-11-10 11:44:29.609 WARN 11839 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -2 (kafka1.mycompany.com/[some-ip-address]:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.

2021-11-10 11:44:29.798 WARN 11839 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -1 (kafka2.mycompany.com/[some-ip-address]:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.

一开始,它似乎输出了一些关于卡夫卡设置的信息。也许那里有一些信息,关于我的application.properties中可能需要哪些附加参数才能进行实际连接。

bootstrap.servers = [kafka1.mycompany.com:9092, kafka2.mycompany.com:9092]
client.dns.lookup = use_all_dns_ips
client.id = 
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

您遵循的教程已经非常过时了。我建议查看官方文档:https://docs.spring.io/spring-kafka/reference/html/#spring-启动生产者应用程序。根据您的设置,配置可能只是一个引导服务器。

spring.kafka.bootstrap-servers=kafka.your-company.com:9092

我想问Kafka-ops团队如何连接,因为jaas和ssl可能会使连接变得复杂。

Spring Boot Kafka配置设置可以在这里找到:https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#application-properties.integration.spring.kafka.admin.client-id

spring.kafka.bootstrap-servers对于消费者和生产者(以及管理客户端(都是相同的属性,是的。

我需要使用application.properties文件吗

SpringBoot应用程序不需要它

相关内容

  • 没有找到相关文章

最新更新