如何使用SMT在Kafka中选择CDC JSON的内部字段作为记录的密钥



我已经尝试为以下CDC JSON数据使用SMT配置ValueToKey和ExtractField$Key。但由于id字段是内部的,它给了我一个错误,因为字段无法识别。如何使其可访问内部字段?

"before": null,
"after": {
"id": 4,
"salary": 5000
},
"source": {
"version": "1.5.0.Final",
"connector": "mysql",
"name": "Try-",
"ts_ms": 1623834752000,
"snapshot": "false",
"db": "mysql_db",
"sequence": null,
"table": "EmpSalary",
"server_id": 1,
"gtid": null,
"file": "binlog.000004",
"pos": 374,
"row": 0,
"thread": null,
"query": null
},
"op": "c",
"ts_ms": 1623834752982,
"transaction": null
}

使用的配置:

transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id

key.converter.schemas.enable=false
value.converter.schemas.enable=false

在属性文件中进行这些转换和更改。我可以让它成为可能。

不幸的是,如果不使用不同的转换,访问嵌套字段是不可能的。

如果你想使用内置的,你需要提取后状态,然后才能访问其字段

transforms=extractAfterState,createKey,extractInt
# Add these
transforms.extractAfterState.type=io.debezium.transforms.ExtractNewRecordState
# since you cannot get the ID from null events
transforms.extractAfterState.drop.tombstones=true 
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id

相关内容

  • 没有找到相关文章

最新更新