我正在使用Debezium和kafka-connect。我需要提取";listid";并能够将其分配给kafka消息密钥。使用连接器文件中的配置,我无法提取该值。感谢为解决此问题提供的任何帮助。
{
"before": null,
"after": {
"listid": 19,
"billingid": "0",
"userid": "test",
连接器属性
key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
transforms=unwrap,extract
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.extractkey.type==org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractkey.field=listid
key.converter.schemas.enable=false
value.converter.schemas.enable=true
Soooo我正在继续实验,并想分享对这个问题的更好解释以及我所做的实验。
跟随Robin的博客https://www.confluent.fr/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/
1-从终端运行kafka-avro消费者
密钥
{"LIST_ID":10058}
值
{"before":null,"after":{"test.dbo.test.Value":{"LIST_ID":10058,"billingid
etc...
2-我正试图让密钥成为值10058
3-这是我的转换配置
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
transforms=createKey,extractInt,
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=LIST_ID
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=LIST_ID
我在针对mssql服务器的管道中使用debezium连接器。
看起来像是打字错误(extract
与extractkey
(:
transforms=unwrap,extractkey
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.extractkey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractkey.field=listid