Camel Kafka component cannot read messages from a topic when SSL is enabled



The Camel Kafka component cannot read messages when SSL is enabled, and it does not give any error either. Below is my route.

Can anyone please help me figure out how to troubleshoot this kind of problem? There are no error/failure logs to go on either.

from("kafka:testtopic9?brokers=<domain-name>:9092"
+ "&groupId=test"
+ "&sslKeyPassword=12345"
+ "&sslKeystorePassword=12345"
+ "&securityProtocol=SASL_SSL"
+ "&sslTruststoreLocation=kafka.client.truststore.jks"
+ "&saslMechanism=PLAIN"
+ "&keyDeserializer=org.apache.kafka.common.serialization.IntegerDeserializer"
+ "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username="user1" password="user1")
.log("fetching data from broker :: ${body}")

In the log it only prints the consumer configuration values; there are no further logs such as subscribing to the topic:

<pre>
03-12-2020 12:56:06.871 [main] INFO  o.s.s.c.ThreadPoolTaskExecutor.initialize - Initializing ExecutorService 'applicationTaskExecutor'
03-12-2020 12:56:07.538 [main] INFO  o.a.c.i.c.DefaultTypeConverter.doStart - Type converters loaded (core: 195, classpath: 14)
03-12-2020 12:56:07.899 [main] INFO  o.a.coyote.http11.Http11NioProtocol.log - Starting ProtocolHandler ["http-nio-8080"]
03-12-2020 12:56:07.923 [main] INFO  o.s.b.w.e.tomcat.TomcatWebServer.start - Tomcat started on port(s): 8080 (http) with context path ''
03-12-2020 12:56:07.942 [main] INFO  o.a.c.spring.boot.RoutesCollector.loadXmlRoutes - Loading additional Camel XML routes from: classpath:camel/*.xml
03-12-2020 12:56:07.942 [main] INFO  o.a.c.spring.boot.RoutesCollector.loadXmlRests - Loading additional Camel XML rests from: classpath:camel-rest/*.xml
03-12-2020 12:56:07.951 [main] INFO  o.a.camel.spring.SpringCamelContext.start - Apache Camel 2.25.2 (CamelContext: camel-1) is starting
03-12-2020 12:56:07.952 [main] INFO  o.a.c.m.ManagedManagementStrategy.doStart - JMX is enabled
03-12-2020 12:56:08.104 [main] INFO  o.a.camel.spring.SpringCamelContext.doStartCamel - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
03-12-2020 12:56:08.135 [main] INFO  o.a.c.component.kafka.KafkaConsumer.doStart - Starting Kafka consumer on topic: testtopic9 with breakOnFirstError: false
03-12-2020 12:56:08.145 [main] INFO  o.a.camel.spring.SpringCamelContext.doStartOrResumeRouteConsumers - Route: route1 started and consuming from: kafka://testtopic9?brokers=<domain-name>%3A9092&groupId=test&keyDeserializer=org.apache.kafka.common.serialization.IntegerDeserializer&saslJaasConfig=xxxxxx&saslMechanism=PLAIN&securityProtocol=SASL_SSL&sslKeyPassword=xxxxxx&sslKeystorePassword=xxxxxx&sslTruststoreLocation=C%3A%5CUsers%5CSRJANA%5CDesktop%5CKafka%5Ckafka.client.truststore.jks&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer
03-12-2020 12:56:08.148 [main] INFO  o.a.camel.spring.SpringCamelContext.start - Total 1 routes, of which 1 are started
03-12-2020 12:56:08.149 [main] INFO  o.a.camel.spring.SpringCamelContext.start - Apache Camel 2.25.2 (CamelContext: camel-1) started in 0.198 seconds
03-12-2020 12:56:08.151 [main] INFO  c.c.cdc.CDCPostProcessorApplication.logStarted - Started CDCPostProcessorApplication in 3.409 seconds (JVM running for 4.561)
03-12-2020 12:56:08.160 [Camel (camel-1) thread #1 - KafkaConsumer[testtopic9]] INFO  o.a.k.c.consumer.ConsumerConfig.logAll - ConsumerConfig values: 
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [ <domain-name> ]
check.crcs = true
client.dns.lookup = default
client.id = 
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = e0a4fadb-5e12-49ab-87d5-3b124d3e1c76
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 50
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
</pre>

Are you sure something is actually broken here? If the SSL connection to the broker did not work, I would expect the route to fail. But the route has started.

Your consumer configuration has auto.offset.reset = latest. That means the consumer ignores all messages that already exist on the topic when it connects for the first time; it only sees records produced after it has joined.
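
If you want the consumer to also pick up records that were already on the topic, you can set the offset reset policy on the Camel Kafka endpoint. A minimal sketch, reusing the endpoint options from the question (the class name is arbitrary, and `earliest` only takes effect when the consumer group has no committed offset yet):

<pre>
import org.apache.camel.builder.RouteBuilder;

public class KafkaConsumerRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("kafka:testtopic9?brokers=<domain-name>:9092"
                + "&groupId=test"
                // default is 'latest'; 'earliest' makes a new consumer group start
                // from the beginning of each partition instead of skipping existing records
                + "&autoOffsetReset=earliest"
                + "&securityProtocol=SASL_SSL"
                + "&saslMechanism=PLAIN"
                + "&sslTruststoreLocation=kafka.client.truststore.jks"
                + "&sslKeystorePassword=12345"
                + "&sslKeyPassword=12345"
                + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"user1\" password=\"user1\";")
            .log("fetching data from broker :: ${body}");
    }
}
</pre>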

Did you send a message to the topic after the consumer had started and connected?
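
For a quick check, you could add a small producer route in the same application that publishes one test record shortly after startup, so a consumer with auto.offset.reset = latest has something new to receive. This is only a sketch: the class name, delay, and message body are made up, and the security options mirror the ones from the question.

<pre>
import org.apache.camel.builder.RouteBuilder;

public class TestMessageProducerRoute extends RouteBuilder {
    @Override
    public void configure() {
        // fire a single message ~10 seconds after startup, once the consumer
        // route has had time to connect and subscribe
        from("timer:sendTest?repeatCount=1&delay=10000")
            .setBody(constant("hello from producer"))
            .to("kafka:testtopic9?brokers=<domain-name>:9092"
                    + "&securityProtocol=SASL_SSL"
                    + "&saslMechanism=PLAIN"
                    + "&sslTruststoreLocation=kafka.client.truststore.jks"
                    + "&sslKeystorePassword=12345"
                    + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required "
                    + "username=\"user1\" password=\"user1\";");
    }
}
</pre>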
