我正在使用mongo源代码来收听mongo更改流并将所有事件放入kafka,但是我正在努力寻找一种从事件中提取"Real"键的方法。我尝试了转换,但没有奏效,给了我错误:
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [copying fields from value to key], found: java.lang.String
在 Mongo 源代码中,我找到了这一行
这基本上意味着它甚至没有一些键处理,相反,它寻找"_id"字段(这不是文档的ID,而是恢复令牌信息(
相反,我想将主题的键设置为"documentKey"。
下面是连接器获取的事件示例:
{
"_id": {
"_data": "DSAD45543FFWEHTEY004....."
},
"operationType": "replace",
"clusterTime": {
"$timestamp": {
"t": 1446707990,
"i": 1
}
},
"fullDocument": {
"_id": {
"$binary": "FxVFgHFRhrr/z+zUc/w==",
"$type": "03"
},
...
},
"ns": {
"db": "somedb",
"coll": "somecol"
},
"documentKey": {
"_id": {
"$binary": "FxVFgHFRhrr/z+zUc/w==",
"$type": "03"
}
}
}
我使用了以下配置:
"transforms":"createKey",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"documentKey"
我试过了:
org.apache.kafka.connect.json.JsonConverter
还有字符串转换器(虽然我认为这不能用字符串完成(
org.apache.kafka.connect.storage.StringConverter
有什么方法可以提取密钥吗? 请注意:架构已禁用。
这是因为 MongoDB Source Connector for Kafka 尚不支持它。它应支持从版本 1.3 开始的高级密钥选择。
https://jira.mongodb.org/browse/KAFKA-40
请注意:架构已禁用
在这种情况下,不能使用 ValueToKey 转换。但是,即使可以,该转换也不支持有效负载中的嵌套值,在您的情况下,该值类似于documentKey._id.$binary