我使用的是confluent kafka连接器。我想将数据插入到接收器连接器中的特定表TB_TEST_KAFKA中。我已经创建了表。
auto.create = false .
我不知道接收器连接器的属性表名称键。我试图插入到特定的表,TB_TEST_KAFKA,
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
errors.log.include.messages=true
dialect.name=SqlServerDatabaseDialect
connection.password=####
tasks.max=1
topics=test-topic
auto.evolve=false
connection.user=kafkauser
auto.create=false
connection.url=jdbc:sqlserver://####:1433;databaseName=TEST
errors.log.enable=true
insert.mode=insert
db.name=TB_TEST_KAFKA
但错误
Caused by: org.apache.kafka.connect.errors.ConnectException: Table "TB_TEST_KAFKA" is missing and auto-creation is disabled
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:116)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:68)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:73)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more
有没有办法把它放到表中??
JDBC Sink连接器使用主题名作为它填充的表的命名基础。您可以使用RegExRouter
自定义表的名称(将这些行添加到连接器属性中):
transforms=renameTopic
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.renameTopic.regex=test-topic
transforms.renameTopic.replacement=TB_TEST_KAFKA
您可以实现TableNamingStrategy和覆盖resolveTableName()方法。
table.naming.strategy
io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy
指定TableNamingStrategy实现的全限定类名,连接器使用该实现从传入的事件主题名中解析表名。
默认行为是:
Replace the ${topic} placeholder in the table.name.format configuration property with the event’s topic.
Sanitize the table name by replacing dots (.) with underscores (_).