我正在尝试设置Debezium MySQL源连接器。我的目标是每个数据库都有一个主题,所以我正在研究利用主题的可能性,这样一个主题可以包含不同的消息类型,它们的模式可以存储在Confluent schema Registry中。
根据这里的几个答案,我将键和值转换器主题名称策略设置为io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
。
要将来自同一模式的所有消息重新路由到相同的主题,我使用以下配置:
{
"name": "aws-db-connector",
"config": {
"group.id": "aws-db-group",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "secret-pw",
"database.server.id": "184054",
"database.server.name": "aws-db",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.aws-db",
"database.include.list": "db1,db2",
"transforms": "unwrap,Reroute",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "db,table,op,source.ts_ms",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "(.*\S)\.(.*\S)\.(.*\S)",
"transforms.Reroute.topic.replacement": "$2_schema",
"transforms.Reroute.key.field.name": "table",
"transforms.Reroute.key.field.regex": "(.*\S)\.(.*\S)\.(.*\S)",
"transforms.Reroute.key.field.replacement": "$3"
}
}
在我的docker-compose
文件中,我设置了:
- CONNECT_KEY_CONVERTER_KEY_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081
- CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081
对于Values,这是完美的工作。我可以看到我的模式注册表包含多个格式为<TopicName>-<RecordName>-Value
的主题,其中TopicName
是我要将此数据重路由到的主题的名称。RecordName
是"old"Debezium创建的主题名称,格式为server_name.database_name.table_name
.
对于Keys,不幸的是,这个策略没有像预期的那样工作,我只有一个模式主题:看起来RecordName
包含新的主题名称而不是原来的主题名称。如果一个字段名在不同的表中有不同的类型,这将导致冲突和不兼容错误。
是否有办法提供适当的RecordName
时,关键主题产生?
编辑-添加示例:
让我们假设我的数据库包含三个表,table1
,table2
和table3
。
Table1:
CREATE TABLE `table1` (
`id` INT NOT NULL AUTO_INCREMENT,
`name` TEXT,
PRIMARY KEY (`id`)
);
表:
CREATE TABLE `table2` (
`id` INT NOT NULL AUTO_INCREMENT,
`name` BINARY,
PRIMARY KEY (`id`)
);
Table3:
CREATE TABLE `table3` (
`id` BINARY NOT NULL,
`name` INT,
PRIMARY KEY (`id`)
);
使用上述配置运行Debezium,它会在模式注册表中创建以下Value主题:
- db1_schema.db1-aws-db.table1-Value
- db1_schema.db1-aws-db.table2-Value
和以下关键主题:
- db1_schema.db1_schema-Key
当轮到表3时,Debezium连接器失败,因为id
列在模式注册中心主题中以int
类型注册,并且与表3中的bytes
类型不兼容。因此我得到这个错误:
正在注册的模式与先前的模式不兼容;错误码:409
我所期望的是为键创建单独的主题:
- db1_schema.aws-db-db1.table1-Key
- db1_schema.aws-db-db1.table2-Key
- db1_schema.aws-db-db1.table3-Key
以这样一种方式,具有不同键模式的消息可以存储在同一个主题中。
这似乎是Debezium的默认工作方式。它将为每个主题只创建一个键模式,但会创建不同的值模式,因此路由到该主题的所有消息应该共享相同的键结构。
为了解决这个问题,应该使用RegexRouter
。在重路由之前应用InsertField
转换,还可以将原始主题名称添加到键中,并且可以从中提取表名称。
"transforms": "InsertField,Reroute",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Key",
"transforms.InsertField.topic.field": "table"
"transforms.Reroute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.Reroute.regex": "(.*\S)\.(.*\S)\.(.*\S)",
"transforms.Reroute.replacement": "$2_schema",