我已经尝试为以下CDC JSON数据使用SMT配置ValueToKey和ExtractField$Key。但由于id字段是内部的,它给了我一个错误,因为字段无法识别。如何使其可访问内部字段?
"before": null,
"after": {
"id": 4,
"salary": 5000
},
"source": {
"version": "1.5.0.Final",
"connector": "mysql",
"name": "Try-",
"ts_ms": 1623834752000,
"snapshot": "false",
"db": "mysql_db",
"sequence": null,
"table": "EmpSalary",
"server_id": 1,
"gtid": null,
"file": "binlog.000004",
"pos": 374,
"row": 0,
"thread": null,
"query": null
},
"op": "c",
"ts_ms": 1623834752982,
"transaction": null
}
使用的配置:
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
key.converter.schemas.enable=false
value.converter.schemas.enable=false
在属性文件中进行这些转换和更改。我可以让它成为可能。
不幸的是,如果不使用不同的转换,访问嵌套字段是不可能的。
如果你想使用内置的,你需要提取后状态,然后才能访问其字段
transforms=extractAfterState,createKey,extractInt
# Add these
transforms.extractAfterState.type=io.debezium.transforms.ExtractNewRecordState
# since you cannot get the ID from null events
transforms.extractAfterState.drop.tombstones=true
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id