如何在kafka sink连接器中设置特定的表?



我使用的是confluent kafka连接器。我想将数据插入到接收器连接器中的特定表TB_TEST_KAFKA中。我已经创建了表。

auto.create = false .

我不知道接收器连接器的属性表名称键。我试图插入到特定的表,TB_TEST_KAFKA,

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
errors.log.include.messages=true
dialect.name=SqlServerDatabaseDialect
connection.password=####
tasks.max=1
topics=test-topic
auto.evolve=false
connection.user=kafkauser
auto.create=false
connection.url=jdbc:sqlserver://####:1433;databaseName=TEST
errors.log.enable=true
insert.mode=insert
db.name=TB_TEST_KAFKA

但错误

Caused by: org.apache.kafka.connect.errors.ConnectException: Table "TB_TEST_KAFKA" is missing and auto-creation is disabled
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:116)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:68)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:73)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more

有没有办法把它放到表中??

JDBC Sink连接器使用主题名作为它填充的表的命名基础。您可以使用RegExRouter自定义表的名称(将这些行添加到连接器属性中):

transforms=renameTopic
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.renameTopic.regex=test-topic
transforms.renameTopic.replacement=TB_TEST_KAFKA

您可以实现TableNamingStrategy和覆盖resolveTableName()方法。

table.naming.strategy

io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy

指定TableNamingStrategy实现的全限定类名,连接器使用该实现从传入的事件主题名中解析表名。

默认行为是:

Replace the ${topic} placeholder in the table.name.format configuration property with the event’s topic.
Sanitize the table name by replacing dots (.) with underscores (_).

最新更新