如何配置Debezium使用特定列作为Kafka消息密钥


默认情况下,Debezium使用表的主键作为消息键。例如,如果您有一个表格
create table users
(
id            bigint auto_increment primary key,
department_id bigint
);

带数据

+----+----------------+
| id | department_id  |
+----+----------------+
|  5 |              1 |
|  6 |              1 |
|  7 |              2 |
+----+----------------+

Debezium将生成以下Kafka消息:

Key: {"id": 5} Value: {"id": 5, "department_id": 1}
Key: {"id": 6} Value: {"id": 6, "department_id": 1}
Key: {"id": 7} Value: {"id": 7, "department_id": 2}

问题是如何将Debezium配置为使用department_id或任何其他列作为Kafka消息密钥?

message.key.columns参数。在连接器的配置中,您应该这样设置:

{
"name": "my-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.whitelist": "my_database",
...
"message.key.columns": "my_database.users:department_id"
}
}

所有关系型Debezium连接器都支持此参数。

您可以在这里找到更多信息:

https://debezium.io/blog/2019/09/26/debezium-0-10-0-cr2-released/https://debezium.io/documentation/reference/1.0/assemblies/cdc-mysql-connector/as_deploy-the-mysql-connector.html#mysql-连接器配置属性_debezium

最新更新