I need to migrate a SQL Server database from on-premises to Google Cloud using Confluent/Kafka.
I have a Debezium source connector:
{
"name": "mssql_src",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
...
...
...
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "source_dbname.dbo(.*)",
"transforms.Reroute.topic.replacement": "target_dbname$1"
}
}
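The Reroute transform does not work in combination with unwrap: I still get source_dbname.dbo.* topics instead of target_dbname.*
I need to insert the data into the target_dbname database. The JDBC sink connector has the following configuration: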
{
"name": "mssql_trg",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics.regex": "source_dbname.dbo.*",
"table.name.format": "${topic}",
"connection.url": "jdbc:sqlserver://xxx.xxx.xxx.xxx:1433;DatabaseName=rocketlawyer3",
"connection.user": "sqlserver",
"connection.password": "sqlserver",
"dialect.name": "SqlServerDatabaseDialect",
"insert.mode": "upsert",
"auto.create": true,
"auto.evolve": true,
"pk.mode": "record_value"
}
}
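Obviously, it fails, because all SQL operations reference the table as source_database_name.dbo.table_name
There are two questions here:
How can I use table.name.format (or some other option) to change the string source_database_name.dbo.table_name to just table_name?
How can I apply both transforms (Reroute and unwrap) in the source connector at the same time?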
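You need to chain the transforms. A JSON object can carry only one effective transforms entry, so list both aliases in it. Simply combine the unwrap and Reroute SMTs: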
{
"name": "mssql_src",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
...
...
...
"transforms": "unwrap, Reroute",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.Reroute.topic.replacement": "$3"
}
}
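The two ideas in this answer can be checked offline with a small Python sketch. It is only an illustration under stated assumptions, not Debezium or Kafka Connect code: it uses Python's json module to show what happens to a repeated transforms key (how the Connect REST API itself resolves duplicates is not confirmed here), and Python's re module to approximate the router, assuming the topic is rewritten only when the whole name matches. The function names and the table name customers are hypothetical, and Python writes the third group as \3 where the connector config uses $3.

```python
import json
import re

# The question's source config, reduced to its two conflicting keys.
QUESTION_CONFIG = '{"transforms": "unwrap", "transforms": "Reroute"}'

# Pattern from the answer: three dot-separated segments.
TOPIC_REGEX = r"([^.]+)\.([^.]+)\.([^.]+)"


def surviving_transforms(raw_json):
    """Return the transform aliases left after parsing with Python's json module."""
    config = json.loads(raw_json)  # a repeated key keeps only its last value
    return [alias.strip() for alias in config["transforms"].split(",")]


def reroute(topic, regex=TOPIC_REGEX, replacement=r"\3"):
    """Approximate the router: rewrite the topic only on a full-name match (assumption)."""
    match = re.fullmatch(regex, topic)
    return match.expand(replacement) if match else topic


if __name__ == "__main__":
    # Duplicate key: only the last alias is left after parsing.
    print(surviving_transforms(QUESTION_CONFIG))
    # Single key with both aliases: both survive, in order.
    print(surviving_transforms('{"transforms": "unwrap, Reroute"}'))
    # Hypothetical topic name; the last segment is kept.
    print(reroute("source_dbname.dbo.customers"))
```

Under these assumptions a topic such as source_dbname.dbo.customers becomes customers, which is the value ${topic} would then take in the sink's table.name.format, provided the sink subscribes to the rerouted topic names.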