I am trying to create a Debezium MySQL connector with a transformation that extracts the key.
Before the key transformation:
create source connector mysql with(
"connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
"database.hostname" = 'mysql',
"tasks.max" = '1',
"database.port" = '3306',
"database.user" = 'debezium',
"database.password" = 'dbz',
"database.server.id" = '42',
"database.server.name" = 'before',
"table.whitelist" = 'deepprices.deepprices',
"database.history.kafka.bootstrap.servers" = 'kafka:29092',
"database.history.kafka.topic" = 'dbz.deepprices',
"include.schema.changes" = 'true',
"transforms" = 'unwrap',
"transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope');
The resulting topic looks like this:
> rowtime: 2020/05/20 16:47:23.354 Z, key: [St@5778462697648631933/8247607644536792125], value: {"id": "P195910", "price": "1511.64"}
When key.converter is set to JSON, the key becomes {"id": "P195910"}
So I want to extract the id from the key and make it a string key.
Expected result:
rowtime: 2020/05/20 16:47:23.354 Z,
key: 'P195910',
value: {"id": "P195910", "price": "1511.64"}
When I try the ExtractField or ValueToKey transforms, I get:
DataException: Field does not exist: id
This is the configuration I tried with ValueToKey:
create source connector mysql with(
"connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
"database.hostname" = 'mysql',
"tasks.max" = '1',
"database.port" = '3306',
"database.user" = 'debezium',
"database.password" = 'dbz',
"database.server.id" = '42',
"database.server.name" = 'after',
"table.whitelist" = 'deepprices.deepprices',
"database.history.kafka.bootstrap.servers" = 'kafka:29092',
"database.history.kafka.topic" = 'dbz.deepprices',
"include.schema.changes" = 'true',
"key.converter" = 'org.apache.kafka.connect.json.JsonConverter',
"key.converter.schemas.enable" = 'TRUE',
"value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
"value.converter.schemas.enable" = 'TRUE',
"transforms" = 'unwrap,createkey',
"transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope',
"transforms.createkey.type" = 'org.apache.kafka.connect.transforms.ValueToKey',
"transforms.createkey.fields" = 'id'
);
This produces the following error in my Kafka Connect log:
Caused by: org.apache.kafka.connect.errors.DataException: Field does not exist: id
at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:89)
at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:67)
Changing the transform type from UnwrapFromEnvelope to ExtractNewRecordState solved the problem on version 1.1.0 of the Debezium MySQL CDC connector.
"transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState'
Since you are using ksqlDB here, you need to set the source connector to write the key as a string:
key.converter=org.apache.kafka.connect.storage.StringConverter
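Putting the two suggestions together, the connector might look like the sketch below. It is untested and rests on a few assumptions. The connection settings are copied from the question. The ExtractField$Key step (the alias extractkey is made up here) is not mentioned above; it is added on the assumption that the single-field key struct produced by ValueToKey should be reduced to the bare id value before StringConverter writes it. Records that have no top-level id field, for example the schema change events enabled by include.schema.changes, may still trigger the same DataException, so that setting may need adjusting.

```sql
create source connector mysql with(
"connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
"database.hostname" = 'mysql',
"tasks.max" = '1',
"database.port" = '3306',
"database.user" = 'debezium',
"database.password" = 'dbz',
"database.server.id" = '42',
"database.server.name" = 'after',
"table.whitelist" = 'deepprices.deepprices',
"database.history.kafka.bootstrap.servers" = 'kafka:29092',
"database.history.kafka.topic" = 'dbz.deepprices',
"include.schema.changes" = 'true',
-- Write the key as a plain string so ksqlDB can read it
"key.converter" = 'org.apache.kafka.connect.storage.StringConverter',
"value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
"value.converter.schemas.enable" = 'TRUE',
-- Transforms run in the order listed
"transforms" = 'unwrap,createkey,extractkey',
-- 1. Flatten the Debezium change event envelope to the new row state
"transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState',
-- 2. Build a key struct from the value's id field
"transforms.createkey.type" = 'org.apache.kafka.connect.transforms.ValueToKey',
"transforms.createkey.fields" = 'id',
-- 3. Assumed extra step: reduce the key struct to the bare id value
"transforms.extractkey.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
"transforms.extractkey.field" = 'id'
);
```

If this behaves as intended, the key of each record on the table topic should be the id value itself, such as P195910, rather than a JSON object.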