当ExtractField$Key转换添加到连接器配置时,逻辑删除将消失



我用ksqldb声明了以下连接器:

CREATE
SOURCE CONNECTOR `myconn` WITH (
"name" = 'myconn',
"connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
"tasks.max" = 1,
"database.hostname" = 'myconn-db',
"database.port" = '${dbPort}',
"database.user" = '${dbUsername}',
"database.password" = '${dbPassword}',
"database.history.kafka.topic" = 'myconn_db_history',
"database.history.kafka.bootstrap.servers" = '${bootstrapServer}',
"database.server.name" = 'myconn_db',
"database.allowPublicKeyRetrieval" = '${allowPublicKeyRetrieval}',
"table.include.list" = 'myconn.links,myconn.imports',
"message.key.columns" = 'myconn.links:id',
"tombstones.on.delete" = true,
"null.handling.mode" = 'keep',
"transforms" = 'unwrap',
"transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState',
"transforms.unwrap.drop.tombstones" = false,
"transforms.unwrap.delete.handling.mode" = 'none'
);

逻辑删除已成功发送,但消息中的密钥是Struct(id=00000)。为了通过00000更改密钥,我使用了ExtractField$Key变换:

CREATE
SOURCE CONNECTOR `myconn` WITH (
"name" = 'myconn',
"connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
"tasks.max... 
--- I omit all the rest for convenience ---
"transforms" = 'unwrap,extractKey',
---New lines added (next 3)
"transforms.extractKey.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
"transforms.extractKey.field" = 'id',
"include.schema.changes" = false
);

只添加最后三行,现在键可以了,但墓碑消失了;主题中没有墓碑。你知道原因吗

如您所见,白名单(table.include.list(中允许有多个表。第二个表有不同的id字段;不是"id",而是"import_id"。在内部,该字段似乎无法正确提取,并且(所有表的(tombstone被忽略。

我不确定这种行为的原因是什么(执行describe connector myconn时没有报告错误;类似于"找不到密钥id"的内容会很有用(,但我只是用正确的密钥处理每个主题就解决了这个问题。

这里有新的连接器定义:

CREATE
SOURCE CONNECTOR `myconn` WITH (
"name" = 'myconn',
"connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
"tasks.max" = 1,
--Database config--------------------------
"database.hostname" = 'myconn-db',
"database.port" = '${dbPort}',
"database.user" = '${dbUsername}',
"database.password" = '${dbPassword}',
"database.history.kafka.topic" = 'myconn_db_history',
"database.history.kafka.bootstrap.servers" = '${bootstrapServer}',
"database.server.name" = 'myconn_db',
"database.allowPublicKeyRetrieval" = '${allowPublicKeyRetrieval}',
"table.include.list" = 'myconn.links,myconn.imports',
--Connector behavior------------------------
"tombstones.on.delete" = true,
"null.handling.mode" = 'keep',
"include.schema.changes" = false,
--Predicates--------------------------------
"predicates" = 'TopicDoestHaveIdField,IsImportTopic',
"predicates.TopicDoestHaveIdField.type" = 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches',
"predicates.TopicDoestHaveIdField.pattern" = 'myconn_db.myconn.(imports)',
"predicates.IsImportTopic.type" = 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches',
"predicates.IsImportTopic.pattern" = 'myconn_db.myconn.imports',
--Transforms--------------------------------
"transforms" = 'unwrap,extractKey,extractImportKey',
"transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState',
"transforms.unwrap.drop.tombstones" = false,
"transforms.unwrap.delete.handling.mode" = 'none',
"transforms.extractKey.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
"transforms.extractKey.field" = 'id',
"transforms.extractKey.predicate" = 'TopicDoestHaveIdField',
"transforms.extractKey.negate" = true,
"transforms.extractImportKey.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
"transforms.extractImportKey.field" = 'import_id',
"transforms.extractImportKey.predicate" = 'IsImportTopic'
);

现在我有了主题中的tombstone,并且从表中正确地删除了行。

相关内容

最新更新