我正在尝试在docker的帮助下使用kafka debezium(Kafka streaming(将表数据一个DB下沉到另一个数据库。 数据库流工作正常。但是流式数据下沉另一个MySQL数据库进程出现错误。
对于我的连接器接收器配置,如下所示。
{
"name": "mysql_sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "mysql-connect.kafka_test.employee",
"connection.url": "jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value",
"errors.tolerance": "all",
"errors.log.enable":"true",
"errors.log.include.messages":"true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"name": "mysql_sink"
}
}
但是我收到一个错误。
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx
io.confluent.connect.jdbc.util.CachedConnectionProvider.getValidConnection(CachedConnectionProvider.java:59)
io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:52)
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)nt... 10 morenCaused by: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx
java.sql.DriverManager.getConnection(DriverManager.java:689)
java.sql.DriverManager.getConnection(DriverManager.java:247)
io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:66)
io.confluent.connect.jdbc.util.CachedConnectionProvider.getValidConnection(CachedConnectionProvider.java:52)nt... 13 more
我正在使用码头工人。
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
command: [start-kafka.sh]
ports:
- "9092:9092"
links:
- zookeeper
environment:
KAFKA_LISTENERS: PLAINTEXT://:9092,
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
connect:
build:
context: debezium-jdbc
ports:
- "8083:8083"
links:
- kafka
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
CLASSPATH: /kafka/connect/kafka-connect-jdbc-5.3.1.jar
我尝试了很多我不知道为什么会出现此错误的事情,还有一件事我不了解 java。
提前谢谢。
您收到此错误是因为 JDBCSink(和 JDBCSource(连接器使用 JDBC(顾名思义(连接到数据库,并且您尚未使适用于 MySQL 的 JDBC 驱动程序可用于连接器。
解决此问题的最佳方法是将MySQL JDBC驱动程序复制到与kafka-connect-jdbc
相同的文件夹中(在Docker映像上为/usr/share/java/kafka-connect-jdbc/
(。
如果您使用的是 Docker Compose,那么您有三个选择。
-
使用已安装的驱动程序构建自定义 Docker 映像
-
在本地下载驱动程序
# Download to host machine mkdir local-jdbc-drivers cd local-jdbc-drivers curl https://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.18.tar.gz | tar xz
并将其挂载到容器中,进入 Kafka Connect JDBC 的路径:
volumes: - ${PWD}/local-jdbc-drivers:/usr/share/java/kafka-connect-jdbc/driver-jars/
-
在运行时安装它,如下所示:
command: - /bin/bash - -c - | # JDBC Drivers # ------------ # MySQL cd /usr/share/java/kafka-connect-jdbc/ curl https://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.18.tar.gz | tar xz # Now launch Kafka Connect sleep infinity & /etc/confluent/docker/run
有关更多详细信息,请参阅此博客。
No suitable driver found
。我正在使用kakfa(不是融合平台(,发现您可能遇到两个问题:
- JDBC URL 格式不正确
- 为您的 Kafka 选择的驱动程序不是正确的驱动程序。
我使用了最新的驱动程序mysql-connector-java-8.0.21
并收到不合适的驱动程序错误。但是,当我切换到版本 mysql-connector-java-5.1.49
(2020 年发布(时,一切都像魅力一样工作。
您可以从 maven 存储库获取驱动程序版本: https://mvnrepository.com/artifact/mysql/mysql-connector-java
将驱动程序复制到类路径,就我而言,如果下载了 kafka 并复制到kafka_2.12-2.3.1/libs/
目录中
我的问题实际上有点好笑。我的插件路径中有必要的 jar 文件,到目前为止一切都很好。但是我在不同的文件夹中有 3 个相同的 jar 文件。所以我使用以下方法搜索它们:
find /kafka/ -name ojdbc*.jar
我删除了其中的 2 个。重新启动服务后,一切开始正常工作。可能性很小,但您可能遇到同样的问题:p