Kafka Connect 如何使命名空间与数据库名称无关



My Environment

MySQL(5.7(:我们有多个模式,命名约定是{application_name}_env。

示例:假设我们有两个应用:app1 和 app2

开发环境:数据库名称将app1_dev,app2_dev

QA 环境:数据库名称将app1_qa,app2_qa。

脱贝齐(0.8.3(.该插件用于CDC MySQL日志。

连接器配置为:

{
"name": "connector-1",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"decimal.handling.mode": "double",
"snapshot.mode": "when_needed",
"table.whitelist":"{database_name}.account",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms" : "setSchema",
"transforms.setSchema.type" : "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.setSchema.schema.name" : "com.test.Account"
}

}

Spring Java 应用程序我正在使用 Kafka Consumer(@KafkaListener( 来读取 Debezium 事件的变化。

我提供了 avsc 文件并使用 gradle avro 插件来生成类。

来自 Dev env 的架构

{
"type":"record",
"name":"Accounts",
"namespace":"com.test",
"fields":[
{
"name":"before",
"type":[
"null",
{
"type":"record",
"name":"Value",
"namespace":"dbserver1.app1_dev.account",
"fields":[
{
"name":"id",
"type":"long"
}
],
"connect.name":"dbserver1.app1_dev.account.Value"
}
],
"default":null
},
{
"name":"after",
"type":[
"null",
"dbserver1.app1_dev.account.Value"
],
"default":null
},
{
"name":"source",
"type":{
"type":"record",
"name":"Source",
"namespace":"io.debezium.connector.mysql",
"fields":[
{
"name":"version",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"name",
"type":"string"
},
{
"name":"server_id",
"type":"long"
},
{
"name":"ts_sec",
"type":"long"
},
{
"name":"gtid",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"file",
"type":"string"
},
{
"name":"pos",
"type":"long"
},
{
"name":"row",
"type":"int"
},
{
"name":"snapshot",
"type":[
{
"type":"boolean",
"connect.default":false
},
"null"
],
"default":false
},
{
"name":"thread",
"type":[
"null",
"long"
],
"default":null
},
{
"name":"db",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"table",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"query",
"type":[
"null",
"string"
],
"default":null
}
],
"connect.name":"io.debezium.connector.mysql.Source"
}
},
{
"name":"op",
"type":"string"
},
{
"name":"ts_ms",
"type":[
"null",
"long"
],
"default":null
}
],
"connect.name":"com.test.Account"
}

问题:由于我的数据库架构是动态的,即它们以 env 后缀结尾。

在每个环境中生成的架构具有不同的命名空间。

开发:dev.app1_dev.帐户 质量保证:dev.app1_qa.帐户

由于命名空间不同,我无法在 QA 中反序列化我的开发代码。因此,如果使用在 Dev 中生成的架构,则代码在 QA 中不起作用。

我想确保命名空间在所有环境中都是一致的。

请使用org.apache.kafka.connect.transforms.SetSchemaMetadataSMT - 请参阅 https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java

相关内容

  • 没有找到相关文章

最新更新