Kafka-找不到有效的证书路径



我使用的是 Apache Kafka 的 Docker 环境。

我将整个环境配置为使用SSL,到目前为止,非常好。。。当我运行docker compose时,一切都会正常进行,没有错误。

问题是当我向连接容器发送POST以创建与MySQL的连接并在Kafka主题中复制时。

我收到了一条关于SSL的错误消息,但我不明白什么可能是错误,因为只有当我为MySQL 创建这个生产者配置时才会发生这种情况

docker ps-

CONTAINER ID   IMAGE                                              COMMAND                  CREATED         STATUS                            PORTS                                        NAMES
f4585e3fc2d4   chethanuk/kafka-connect:5.3.1                      "/etc/confluent/dock…"   6 minutes ago   Up 6 minutes (health: starting)   0.0.0.0:8083->8083/tcp, 9092/tcp             connect
4288ea7d0a3c   confluentinc/cp-schema-registry                    "/etc/confluent/dock…"   6 minutes ago   Up 6 minutes                      8081/tcp, 0.0.0.0:8181->8181/tcp             schema-registry
7d8571d525ff   confluentinc/cp-kafka:latest                       "/etc/confluent/dock…"   6 minutes ago   Up 6 minutes                      0.0.0.0:9092->9092/tcp                       broker
fb77012c0b7d   confluentinc/cp-zookeeper:latest                   "/etc/confluent/dock…"   6 minutes ago   Up 6 minutes                      2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp   zookeeper
08ae4c80b339   mysql:8                                            "docker-entrypoint.s…"   2 days ago      Up 30 hours                                                                    mysql

.env

# Password substituted as ${SSL_SECRET} for the keystore/truststore/key
# password settings in docker-compose.yml.
SSL_SECRET=datahub
# ZooKeeper service hostname/container name and client port.
ZK_HOST=zookeeper
ZK_PORT=2181
# Kafka broker hostname/container name and SSL listener port.
BROKER_HOST=broker
BROKER_PORT=9092
# Schema Registry hostname/container name and HTTPS listener port.
SR_HOST=schema-registry
SR_PORT=8181
# Kafka Connect hostname/container name and REST (HTTPS) port.
CON=connect
CON_PORT=8083
# NOTE(review): HOST is not referenced by the docker-compose.yml shown here.
HOST=localhost

docker-compose.yml

---
# Single-node Kafka stack (ZooKeeper, broker, Schema Registry, Connect) with
# SSL enabled on every listener. Host names, ports and the store password are
# substituted from the accompanying .env file.
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: ${ZK_HOST}
    hostname: ${ZK_HOST}
    ports:
      - "${ZK_PORT}:${ZK_PORT}"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: ${ZK_PORT}

  broker:
    image: confluentinc/cp-kafka:latest
    container_name: ${BROKER_HOST}
    hostname: ${BROKER_HOST}
    ports:
      - "${BROKER_PORT}:${BROKER_PORT}"
    depends_on:
      - ${ZK_HOST}
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: '${ZK_HOST}:${ZK_PORT}'
      KAFKA_ADVERTISED_LISTENERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
      # Store file names and the credentials file are resolved inside the
      # ./secrets directory mounted below.
      KAFKA_SSL_KEYSTORE_FILENAME: broker.keystore.jks
      KAFKA_SSL_KEYSTORE_CREDENTIALS: cert_creds
      KAFKA_SSL_KEY_CREDENTIALS: cert_creds
      KAFKA_SSL_TRUSTSTORE_FILENAME: broker.truststore.jks
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: cert_creds
      KAFKA_SSL_CLIENT_AUTH: 'none'
      KAFKA_SECURITY_PROTOCOL: SSL
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./secrets:/etc/kafka/secrets

  schema-registry:
    image: confluentinc/cp-schema-registry
    container_name: ${SR_HOST}
    hostname: ${SR_HOST}
    depends_on:
      - ${ZK_HOST}
      - ${BROKER_HOST}
    ports:
      - "${SR_PORT}:${SR_PORT}"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: ${SR_HOST}
      SCHEMA_REGISTRY_LISTENERS: 'https://0.0.0.0:${SR_PORT}'
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: '${ZK_HOST}:${ZK_PORT}'
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SSL
      # The KAFKASTORE_* settings cover the connection to the broker; the
      # un-prefixed SSL_* settings cover the registry's own HTTPS listener.
      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/schema-registry.keystore.jks
      SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/schema-registry.keystore.jks
      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
      SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEY_PASSWORD: ${SSL_SECRET}
      SCHEMA_REGISTRY_SSL_KEY_PASSWORD: ${SSL_SECRET}
      SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/schema-registry.truststore.jks
      SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/schema-registry.truststore.jks
      SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
      SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: https
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas2
      SCHEMA_REGISTRY_SSL_CLIENT_AUTH: 'true'
    volumes:
      - ./secrets:/etc/schema-registry/secrets

  connect:
    user: '0'
    image: chethanuk/kafka-connect:5.3.1
    hostname: '${CON}'
    container_name: ${CON}
    depends_on:
      - ${ZK_HOST}
      - ${BROKER_HOST}
      - ${SR_HOST}
    ports:
      - "${CON_PORT}:${CON_PORT}"
    environment:
      CONNECT_LISTENERS: 'https://0.0.0.0:${CON_PORT}'
      CONNECT_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS'
      CONNECT_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      CONNECT_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
      CONNECT_REST_ADVERTISED_HOST_NAME: ${CON}
      CONNECT_REST_PORT: ${CON_PORT}
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      # NOTE(review): this registry URL is https, but no
      # CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_SSL_* truststore/keystore
      # settings are defined for the Avro converter. That is the likely cause
      # of a "PKIX path building failed" error when registering schemas;
      # confirm which SSL options this image version supports.
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: https://${SR_HOST}:${SR_PORT}
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: '${ZK_HOST}:${ZK_PORT}'
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
      # Worker-level SSL settings.
      CONNECT_SSL_CLIENT_AUTH: 'true'
      CONNECT_SECURITY_PROTOCOL: SSL
      CONNECT_SSL_KEY_PASSWORD: ${SSL_SECRET}
      CONNECT_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/connect.truststore.jks
      CONNECT_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
      CONNECT_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/connect.keystore.jks
      CONNECT_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
      # Producer overrides (truststore only; no keystore is configured).
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SSL
      CONNECT_PRODUCER_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
      CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/producer.truststore.jks
      CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
      # Consumer overrides (truststore only; no keystore is configured).
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SSL
      CONNECT_CONSUMER_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
      CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/consumer.truststore.jks
      CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
    volumes:
      - ./secrets:/etc/kafka/secrets

create-cert.sh

#!/bin/bash
# Generates a self-signed CA and, for each component, a JKS keystore holding a
# CA-signed certificate plus a JKS truststore holding the CA certificate.
# Output goes to ./secrets; every store uses the password "datahub", which is
# also written to secrets/cert_creds.

# Abort on unset variables and on the first failing command.
set -o nounset -o errexit

printf "Deleting previous (if any)..."
rm -rf secrets
mkdir secrets
mkdir -p tmp
echo " OK!"

# Generate CA key
printf "Creating CA..."
openssl req -new -x509 -keyout tmp/datahub-ca.key -out tmp/datahub-ca.crt -days 365 -subj '/CN=ca.datahub/OU=test/O=datahub/L=paris/C=fr' -passin pass:datahub -passout pass:datahub >/dev/null 2>&1
echo " OK!"

for i in 'broker' 'producer' 'consumer' 'schema-registry' 'connect'
do
    printf "Creating cert and keystore of %s..." "$i"

    # Create keystores (one command; options continued with backslashes)
    keytool -genkey -noprompt \
        -alias "$i" \
        -dname "CN=$i, OU=test, O=datahub, L=paris, C=fr" \
        -keystore "secrets/$i.keystore.jks" \
        -keyalg RSA \
        -storepass datahub \
        -keypass datahub >/dev/null 2>&1

    # Create CSR, sign the key and import back into keystore
    keytool -keystore "secrets/$i.keystore.jks" -alias "$i" -certreq -file "tmp/$i.csr" -storepass datahub -keypass datahub >/dev/null 2>&1
    openssl x509 -req -CA tmp/datahub-ca.crt -CAkey tmp/datahub-ca.key -in "tmp/$i.csr" -out "tmp/$i-ca-signed.crt" -days 365 -CAcreateserial -passin pass:datahub >/dev/null 2>&1
    # The CA certificate is imported first so the signed certificate imported
    # next can chain to it.
    keytool -keystore "secrets/$i.keystore.jks" -alias CARoot -import -noprompt -file tmp/datahub-ca.crt -storepass datahub -keypass datahub >/dev/null 2>&1
    keytool -keystore "secrets/$i.keystore.jks" -alias "$i" -import -file "tmp/$i-ca-signed.crt" -storepass datahub -keypass datahub >/dev/null 2>&1

    # Create truststore and import the CA cert.
    keytool -keystore "secrets/$i.truststore.jks" -alias CARoot -import -noprompt -file tmp/datahub-ca.crt -storepass datahub -keypass datahub >/dev/null 2>&1
    echo " OK!"
done

# Credentials file holding the store password.
echo "datahub" > secrets/cert_creds
rm -rf tmp
echo "SUCCEEDED"

Postman:POST https://localhost:8083/connectors

{
"name":"MySQL-Demo",
"config":{
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"database.hostname":"<ip>",
"database.port":"3306",
"database.user":"root",
"database.password":"mysql",
"database.server.id":"1",
"database.server.name":"mysql",
"database.history.kafka.bootstrap.servers":"broker:9092",
"database.history.kafka.topic":"dbhistory.demo",
"database.history.producer.security.protocol":"SSL",
"database.history.producer.ssl.keystore.location":"/etc/kafka/secrets/connect.keystore.jks",
"database.history.producer.ssl.keystore.password":"datahub",
"database.history.producer.ssl.truststore.location":"/etc/kafka/secrets/connect.truststore.jks",
"database.history.producer.ssl.truststore.password":"datahub",
"database.history.producer.ssl.key.password":"datahub",
"database.history.consumer.security.protocol":"SSL",
"database.history.consumer.ssl.keystore.location":"/etc/kafka/secrets/connect.keystore.jks",
"database.history.consumer.ssl.keystore.password":"datahub",
"database.history.consumer.ssl.truststore.location":"/etc/kafka/secrets/connect.truststore.jks",
"database.history.consumer.ssl.truststore.password":"datahub",
"database.history.consumer.ssl.key.password":"datahub",
"include.schema.changes":"true",
"table.whitelist":"demo.test"
}
}

Connect 容器日志

[2021-05-22 18:58:27,342] INFO [Producer clientId=connector-producer-MySQL-Demo-0] Failed authentication with broker/192.168.16.3 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
[2021-05-22 18:58:27,343] ERROR [Producer clientId=connector-producer-MySQL-Demo-0] Connection to node -1 (broker/192.168.16.3:9092) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-05-22 18:58:27,674] INFO Reading structure of database 'demo' (io.debezium.connector.mysql.MySqlSnapshotChangeEventSource)
[2021-05-22 18:58:28,374] INFO [Producer clientId=connector-producer-MySQL-Demo-0] Failed authentication with broker/192.168.16.3 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
[2021-05-22 18:58:28,375] ERROR [Producer clientId=connector-producer-MySQL-Demo-0] Connection to node -1 (broker/192.168.16.3:9092) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-05-22 18:58:28,851] INFO Snapshot step 6 - Persisting schema history (io.debezium.relational.RelationalSnapshotChangeEventSource)
[2021-05-22 18:58:29,605] INFO [Producer clientId=connector-producer-MySQL-Demo-0] Failed authentication with broker/192.168.16.3 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
[2021-05-22 18:58:29,606] ERROR [Producer clientId=connector-producer-MySQL-Demo-0] Connection to node -1 (broker/192.168.16.3:9092) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-05-22 18:58:29,788] ERROR Failed to send HTTP request to endpoint: https://schema-registry:8181/subjects/mysql-value/versions (io.confluent.kafka.schemaregistry.client.rest.RestService)
javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.ssl.Alert.createSSLException(Alert.java:131)
at sun.security.ssl.TransportContext.fatal(TransportContext.java:324)
at sun.security.ssl.TransportContext.fatal(TransportContext.java:267)
at sun.security.ssl.TransportContext.fatal(TransportContext.java:262)
at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:645)
at sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:464)
at sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:360)
at sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:377)
at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:422)
at sun.security.ssl.TransportContext.dispatch(TransportContext.java:182)
at sun.security.ssl.SSLTransport.decode(SSLTransport.java:156)
at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1197)
at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1106)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:398)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:370)
at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1340)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1315)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:264)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:241)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:322)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:422)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:414)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:400)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:140)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:196)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:172)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:71)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:131)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
at sun.security.validator.Validator.validate(Validator.java:271)
at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:223)
at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:129)
at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:629)
... 42 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:451)
... 48 more
[2021-05-22 18:58:29,799] INFO WorkerSourceTask{id=MySQL-Demo-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-05-22 18:58:29,799] INFO WorkerSourceTask{id=MySQL-Demo-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-05-22 18:58:29,800] ERROR WorkerSourceTask{id=MySQL-Demo-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic mysql :
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:83)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.ssl.Alert.createSSLException(Alert.java:131)
at sun.security.ssl.TransportContext.fatal(TransportContext.java:324)
at sun.security.ssl.TransportContext.fatal(TransportContext.java:267)
at sun.security.ssl.TransportContext.fatal(TransportContext.java:262)
at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:645)
at sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:464)
at sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:360)
at sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:377)
at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:422)
at sun.security.ssl.TransportContext.dispatch(TransportContext.java:182)
at sun.security.ssl.SSLTransport.decode(SSLTransport.java:156)
at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1197)
at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1106)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:398)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:370)
at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1340)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1315)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:264)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:241)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:322)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:422)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:414)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:400)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:140)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:196)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:172)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:71)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:131)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
at sun.security.validator.Validator.validate(Validator.java:271)
at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:223)
at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:129)
at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:629)
... 42 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:451)
... 48 more
[2021-05-22 18:58:29,805] ERROR WorkerSourceTask{id=MySQL-Demo-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
AvroConverter需要更多的配置才能使用https

请注意,在安全环境中使用Avro时,需要添加*.converter.schema.registry.ssl.属性

例如

key.converter.schema.registry.ssl.truststore.location=<location>
key.converter.schema.registry.ssl.truststore.password=<truststore-password>
key.converter.schema.registry.ssl.keystore.location=<keystore-location>
key.converter.schema.registry.ssl.keystore.password=<keystore-password>
key.converter.schema.registry.ssl.key.password=<key-password>
value.converter.schema.registry.ssl.truststore.location=<location>
value.converter.schema.registry.ssl.truststore.password=<truststore-password>
value.converter.schema.registry.ssl.keystore.location=<keystore-location>
value.converter.schema.registry.ssl.keystore.password=<keystore-password>
value.converter.schema.registry.ssl.key.password=<key-password> 

https://docs.confluent.io/platform/current/schema-registry/connect.html#avro

您可能还需要考虑添加CONNECT_ADMIN_变量来设置AdminClient SSL属性

在我们的环境中(Kafka 3.2.1 和 Confluent Schema Registry 7.2.1),生产者和消费者都是基于 Java 的。我们在生产者和消费者的配置属性中添加/设置了以 schema.registry. 开头的属性,从而解决了该错误:

// Schema Registry client SSL settings: each key is the client namespace prefix
// followed by the SSL config name.
configProperties.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfiguration.SSL_TRUSTSTORE_LOCATION_CONFIG, SSL_TRUSTSTORE_LOCATION);
configProperties.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfiguration.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_PASSWORD);

该类已定义命名空间常量:SchemaRegistryClientConfig.CLIENT_NAMESPACE = "schema.registry."

这解决了问题。

相关内容