如何告诉MongoSource(使用Kafka Connect)要序列化的密钥



我正在使用mongo源代码来收听mongo更改流并将所有事件放入kafka,但是我正在努力寻找一种从事件中提取"Real"键的方法。我尝试了转换,但没有奏效,给了我错误:

Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [copying fields from value to key], found: java.lang.String

在 Mongo 源代码中,我找到了这一行

这基本上意味着它甚至没有一些键处理,相反,它寻找"_id"字段(这不是文档的ID,而是恢复令牌信息(

相反,我想将主题的键设置为"documentKey"。

下面是连接器获取的事件示例:

{
"_id": {
"_data": "DSAD45543FFWEHTEY004....."
},
"operationType": "replace",
"clusterTime": {
"$timestamp": {
"t": 1446707990,
"i": 1
}
},
"fullDocument": {
"_id": {
"$binary": "FxVFgHFRhrr/z+zUc/w==",
"$type": "03"
},
...
},
"ns": {
"db": "somedb",
"coll": "somecol"
},
"documentKey": {
"_id": {
"$binary": "FxVFgHFRhrr/z+zUc/w==",
"$type": "03"
}
}
}

我使用了以下配置:

"transforms":"createKey",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"documentKey"

我试过了:

org.apache.kafka.connect.json.JsonConverter

还有字符串转换器(虽然我认为这不能用字符串完成(

org.apache.kafka.connect.storage.StringConverter

有什么方法可以提取密钥吗? 请注意:架构已禁用。

这是因为 MongoDB Source Connector for Kafka 尚不支持它。它应支持从版本 1.3 开始的高级密钥选择。

https://jira.mongodb.org/browse/KAFKA-40

请注意:架构已禁用

在这种情况下,不能使用 ValueToKey 转换。但是,即使可以,该转换也不支持有效负载中的嵌套值,在您的情况下,该值类似于documentKey._id.$binary

相关内容

  • 没有找到相关文章

最新更新