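Hi, I am using Debezium to capture changes in MongoDB and push them into MySQL. I am following this example: https://github.com/debezium/debezium-examples/tree/master/unwrap-mongodb-smt. I am trying to replace the target Postgres database with a MySQL database, but I have not been able to get it working.
Here is my modified jdbc-sink.json, which uses a MySQL URL for the connection.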
{
"name" : "jdbc-sink",
"config" : {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max" : "1",
"topics" : "customers",
"connection.url" : "jdbc:mysql://localhost:3306/inventorydb?user=user&password=password",
"auto.create" : "true",
"auto.evolve" : "true",
"insert.mode" : "upsert",
"delete.enabled": "true",
"pk.fields" : "id",
"pk.mode": "record_key",
"transforms": "mongoflatten",
"transforms.mongoflatten.type" : "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.mongoflatten.drop.tombstones": "false"
}
}
But I get the following error when I run it:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
HTTP/1.1 500 Internal Server Error
Date: Wed, 06 Nov 2019 08:13:39 GMT
Content-Type: application/json
Content-Length: 3404
Server: Jetty(9.4.18.v20190429)
{"error_code":500,"message":"Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector, available connectors are: PluginDesc{klass=class io.debezium.connector.mongodb.MongoDbConnector, name='io.debezium.connector.mongodb.MongoDbConnector', version='1.0.0-SNAPSHOT', encodedVersion=1.0.0-SNAPSHOT, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-mongodb/'}, PluginDesc{klass=class io.debezium.connector.mysql.MySqlConnector, name='io.debezium.connector.mysql.MySqlConnector', version='1.0.0-SNAPSHOT', encodedVersion=1.0.0-SNAPSHOT, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-mysql/'}, PluginDesc{klass=class io.debezium.connector.oracle.OracleConnector, name='io.debezium.connector.oracle.OracleConnector', version='1.0.0-SNAPSHOT', encodedVersion=1.0.0-SNAPSHOT, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-oracle/'}, PluginDesc{klass=class io.debezium.connector.postgresql.PostgresConnector, name='io.debezium.connector.postgresql.PostgresConnector', version='1.0.0-SNAPSHOT', encodedVersion=1.0.0-SNAPSHOT, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-postgres/'}, PluginDesc{klass=class io.debezium.connector.sqlserver.SqlServerConnector, name='io.debezium.connector.sqlserver.SqlServerConnector', version='1.0.0-SNAPSHOT', encodedVersion=1.0.0-SNAPSHOT, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-sqlserver/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.3.0', encodedVersion=2.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.3.0', encodedVersion=2.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.3.0', encodedVersion=2.3.0, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.3.0', encodedVersion=2.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.3.0', encodedVersion=2.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.3.0', encodedVersion=2.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.3.0', encodedVersion=2.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.3.0', encodedVersion=2.3.0, type=source, typeName='source', location='classpath'}"}
I understand that io.confluent.connect.jdbc.JdbcSinkConnector cannot be found, but how do I provide it, and where should I put the JAR?
Thanks
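You have not provided the sink connector to Kafka Connect. The example is meant to be started with the command docker-compose up --build -d
which builds a new Connect image that includes the JDBC sink connector: https://github.com/debezium/debezium-examples/blob/master/unwrap-mongodb-smt/debezium-jdbc/Dockerfile#L10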
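Download the JAR with the command below and save it in the Kafka Connect plugin directory (the error output in the question shows plugins being loaded from /kafka/connect/), then restart Kafka Connect so that it picks up the new plugin: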
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.0.0/kafka-connect-jdbc-10.0.0.jar
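After restarting, it may help to confirm that the connector class is actually visible to Kafka Connect before posting jdbc-sink.json again. The sketch below assumes the Connect REST API is reachable on localhost:8083, as in the question, and relies on the standard GET /connector-plugins endpoint. The function name has_plugin is hypothetical and only for illustration; it does a plain-text match on the JSON rather than parsing it.

```shell
# Hypothetical helper (not part of Kafka Connect or Debezium).
# Reads the JSON returned by GET /connector-plugins on stdin and succeeds
# if the given connector class name appears in it as a quoted string.
has_plugin() {
  # -F: treat the class name literally, so the dots are not regex wildcards
  # -q: no output, only the exit status matters
  grep -qF "\"$1\""
}

# Usage (assumes Connect listens on localhost:8083, as in the question):
#   curl -s http://localhost:8083/connector-plugins \
#     | has_plugin io.confluent.connect.jdbc.JdbcSinkConnector \
#     && echo "JDBC sink connector is installed"
```

If the check fails, the JAR is probably not in a directory that Connect scans for plugins, or the worker has not been restarted since the JAR was added.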