Kafka JDBC sink connector: USE statement is not supported to switch between databases



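I am using the Kafka JDBC sink connector to sink data into Azure SQL Server. I tested the connector with one database and it worked fine, but when I added more databases I started seeing the following error: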

USE statement is not supported to switch between databases. Use a new connection to connect to a different database.

Configuration:

tasks.max: 1
topics: topic_name
connection.url: jdbc:sqlserver://server:port;database=dbname;user=dbuser
connection.user: dbuser
connection.password: dbpass
transforms: unwrap
transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones: false
auto.create: true
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: true
insert.mode: upsert
delete.enabled: true
pk.mode: record_key

Stack trace:

2020-12-10 11:56:36,990 ERROR WorkerSinkTask{id=NAME-sqlserver-jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: USE statement is not supported to switch between databases. Use a new connection to connect to a different database.
(org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-NAME-sqlserver-jdbc-sink-0]
org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: USE statement is not supported to switch between databases. Use a new connection to connect to a different database.
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: USE statement is not supported to switch between databases. Use a new connection to connect to a different database.

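I found the problem. At first I thought it was caused by having multiple databases on the database server, but it turned out that the topic names had the form prefix.dbo.table_name rather than just table_name. As a result, the connector was treating prefix.dbo as another database.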
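To illustrate why a dotted topic name can be read as pointing at another database, here is a minimal Java sketch of SQL Server's three-part naming (database.schema.table). It is not the connector's actual parsing code; the class name DottedNameSketch and the method describe are hypothetical and exist only for this example.

```java
public class DottedNameSketch {
    // Hypothetical helper: interprets a dotted identifier the way SQL Server's
    // multi-part naming does (database.schema.table). This is an illustration,
    // not the JDBC sink connector's real implementation.
    static String describe(String name) {
        String[] parts = name.split("\\.");
        switch (parts.length) {
            case 1:
                return "table=" + parts[0];
            case 2:
                return "schema=" + parts[0] + ", table=" + parts[1];
            case 3:
                return "database=" + parts[0] + ", schema=" + parts[1] + ", table=" + parts[2];
            default:
                return "unrecognized: " + name;
        }
    }

    public static void main(String[] args) {
        // A topic used directly as the table name carries its prefix along.
        System.out.println(describe("prefix.dbo.table_name"));
        // After the prefix is dropped, only the table name remains.
        System.out.println(describe("table_name"));
    }
}
```

Under this reading, the first part of prefix.dbo.table_name lands in the database position, which is consistent with the error about switching databases.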

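The solution is to add a RegexRouter transform (named dropPrefix in the configuration below) that strips the prefix from the topic name.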

For example, to save the data from the topics hello.dbo.table1 and hello.dbo.table2 into the tables table1 and table2 in the database, use the following configuration:

tasks.max: 1
topics: hello.dbo.table1, hello.dbo.table2
connection.url: jdbc:sqlserver://server:port;database=dbname;user=dbuser
connection.user: dbuser
connection.password: dbpass
transforms: dropPrefix,unwrap
transforms.dropPrefix.type: org.apache.kafka.connect.transforms.RegexRouter
transforms.dropPrefix.regex: hello.dbo.(.*)
transforms.dropPrefix.replacement: $1
transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones: false
auto.create: true
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: true
insert.mode: upsert
delete.enabled: true
pk.mode: record_key
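The renaming that the dropPrefix settings perform can be checked outside of Kafka Connect. The Java sketch below assumes the transform renames a topic only when the regex matches the whole topic name and leaves it unchanged otherwise; the class DropPrefixSketch and the method route are hypothetical names for this example, and the topic helloXdboYtable3 is made up to show an edge case.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DropPrefixSketch {
    // Hypothetical stand-in for the topic renaming step: if the regex matches
    // the entire topic name, apply the replacement; otherwise keep the topic.
    static String route(String regex, String replacement, String topic) {
        Matcher m = Pattern.compile(regex).matcher(topic);
        return m.matches() ? m.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        String regex = "hello.dbo.(.*)"; // value of transforms.dropPrefix.regex
        String replacement = "$1";       // value of transforms.dropPrefix.replacement

        System.out.println(route(regex, replacement, "hello.dbo.table1"));
        System.out.println(route(regex, replacement, "hello.dbo.table2"));
        // Topics that do not match keep their original name.
        System.out.println(route(regex, replacement, "other.dbo.table1"));
        // An unescaped '.' matches any character, so this made-up topic also matches.
        System.out.println(route(regex, replacement, "helloXdboYtable3"));
        // Escaping the dots restricts the match to literal dots.
        System.out.println(route("hello\\.dbo\\.(.*)", replacement, "helloXdboYtable3"));
    }
}
```

Because the dots in hello.dbo.(.*) are unescaped, the pattern is looser than it looks. With topic names like the two above that makes no difference, but escaping the dots may be worth considering if other topics share a similar shape.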

If this does not work with a single connector, you will need to create one connector per database.
