我在AWS MSK上有两个Kafka集群(在相同的环境和区域中(。我在目标集群上设置了KafkaConnect集群,并设置了一个镜像制造商连接器来运行。连接器的提交很好,没有错误。
当我尝试检查连接器的状态时,它会显示RUNNING:{"name":"mirror-maker-test-connector","connector":{"state":"RUNNING","worker_id":"<ip>:<port>"},"tasks":[task_list],"type":"source"}
我看到以下异常:
[2022-01-12 19:46:33,772] DEBUG [Producer clientId=connector-producer-mirror-maker-test-connector-0] Connection with b-2.<broker_ip> disconnected (org.apache.kafka.common.network.Selector)
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:120)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:551)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-01-12 19:46:33,773] DEBUG [Producer clientId=connector-producer-mirror-maker-test-connector-0] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
[2022-01-12 19:46:33,773] WARN [Producer clientId=connector-producer-mirror-maker-test-connector-0] Bootstrap broker b-2.<broker_ip>:9094 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
我可以从Kafka connect k8s pod中使用netcat
连接到指定的broker。
以前有人遇到过这个问题吗?
我开始工作了——在提交Mirror Maker连接器时,必须为使用者和生产者添加SSL属性。
"target.cluster.security.protocol": "SSL",
"target.cluster.ssl.truststore.location":"<certs_path>",
"target.cluster.ssl.truststore.password": "<password>"
"source.cluster.security.protocol": "SSL",
"source.cluster.ssl.truststore.location": "<certs_path>",
"source.cluster.ssl.truststore.password": "<password>"