Key Converter的Kafka Connect和主题名称策略



我正在尝试设置Debezium MySQL源连接器。我的目标是每个数据库都有一个主题,所以我正在研究利用主题的可能性,这样一个主题可以包含不同的消息类型,它们的模式可以存储在Confluent schema Registry中。

根据这里的几个答案,我将键和值转换器主题名称策略设置为io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

要将来自同一模式的所有消息重新路由到相同的主题,我使用以下配置:

{
"name": "aws-db-connector",
"config": {
"group.id": "aws-db-group",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "secret-pw",
"database.server.id": "184054",
"database.server.name": "aws-db",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.aws-db",
"database.include.list": "db1,db2",
"transforms": "unwrap,Reroute",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "db,table,op,source.ts_ms",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "(.*\S)\.(.*\S)\.(.*\S)",
"transforms.Reroute.topic.replacement": "$2_schema",
"transforms.Reroute.key.field.name": "table",
"transforms.Reroute.key.field.regex": "(.*\S)\.(.*\S)\.(.*\S)",
"transforms.Reroute.key.field.replacement": "$3"
}
}

在我的docker-compose文件中,我设置了:

- CONNECT_KEY_CONVERTER_KEY_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081
- CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081

对于Values,这是完美的工作。我可以看到我的模式注册表包含多个格式为<TopicName>-<RecordName>-Value的主题,其中TopicName是我要将此数据重路由到的主题的名称。RecordName是"old"Debezium创建的主题名称,格式为server_name.database_name.table_name.

对于Keys,不幸的是,这个策略没有像预期的那样工作,我只有一个模式主题:看起来RecordName包含新的主题名称而不是原来的主题名称。如果一个字段名在不同的表中有不同的类型,这将导致冲突和不兼容错误。

是否有办法提供适当的RecordName时,关键主题产生?

编辑-添加示例:

让我们假设我的数据库包含三个表,table1,table2table3

Table1:

CREATE TABLE `table1` (
`id` INT NOT NULL AUTO_INCREMENT,
`name` TEXT,
PRIMARY KEY (`id`)
);

表:

CREATE TABLE `table2` (
`id` INT NOT NULL AUTO_INCREMENT,
`name` BINARY,
PRIMARY KEY (`id`)
);

Table3:

CREATE TABLE `table3` (
`id` BINARY NOT NULL,
`name` INT,
PRIMARY KEY (`id`)
);

使用上述配置运行Debezium,它会在模式注册表中创建以下Value主题:

  • db1_schema.db1-aws-db.table1-Value
  • db1_schema.db1-aws-db.table2-Value

和以下关键主题:

  • db1_schema.db1_schema-Key

当轮到表3时,Debezium连接器失败,因为id列在模式注册中心主题中以int类型注册,并且与表3中的bytes类型不兼容。因此我得到这个错误:

正在注册的模式与先前的模式不兼容;错误码:409

我所期望的是为键创建单独的主题:

  • db1_schema.aws-db-db1.table1-Key
  • db1_schema.aws-db-db1.table2-Key
  • db1_schema.aws-db-db1.table3-Key

以这样一种方式,具有不同键模式的消息可以存储在同一个主题中。

这似乎是Debezium的默认工作方式。它将为每个主题只创建一个键模式,但会创建不同的值模式,因此路由到该主题的所有消息应该共享相同的键结构。

为了解决这个问题,应该使用RegexRouter。在重路由之前应用InsertField转换,还可以将原始主题名称添加到键中,并且可以从中提取表名称。

"transforms": "InsertField,Reroute",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Key",
"transforms.InsertField.topic.field": "table"
"transforms.Reroute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.Reroute.regex": "(.*\S)\.(.*\S)\.(.*\S)",
"transforms.Reroute.replacement": "$2_schema",

最新更新