MongoSource connector for Kafka Connect produces a strange `_data` key



I am using Kafka Connect with the MongoDB source connector (MongoSource), configured as follows:

curl -X PUT http://localhost:8083/connectors/mongo-source2/config -H "Content-Type: application/json" -d '{
"name":"mongo-source2",
"tasks.max":1,
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"connection.uri":"mongodb://xxx:xxx@localhost:27017/mydb",
"database":"mydb",
"collection":"claimmappingrules.66667777-8888-9999-0000-666677770000",
"pipeline":"[{\"$addFields\": {\"something\": \"xxxx\"}}]",
"transforms":"dropTopicPrefix",
"transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex":".*",
"transforms.dropTopicPrefix.replacement":"my-topic"
}'

For some reason, when I consume the messages I get a strange key:

"_id": {
"_data": "825DFD2A53000000012B022C0100296E5A1004060C0FB7484A4990A7363EF5F662CF8D465A5F6964005A1003F9974744D06AFB498EF8D78370B0CD440004"
}

I have no idea where this comes from. The `_id` of my Mongo documents is a UUID, and when I consume a message I expect to see the `documentKey` field as the consumer record's key.

Here is an example of a message the connector publishes to Kafka:

{
"_id": {
"_data": "825DFD2A53000000012B022C0100296E5A1004060C0FB7484A4990A7363EF5F662CF8D465A5F6964005A1003F9974744D06AFB498EF8D78370B0CD440004"
},
"operationType": "replace",
"clusterTime": {
"$timestamp": {
"t": 1576872531,
"i": 1
}
},
"fullDocument": {
"_id": {
"$binary": "+ZdHRNBq+0mO+NeDcLDNRA==",
"$type": "03"
},
...
},
"ns": {
"db": "security",
"coll": "users"
},
"documentKey": {
"_id": {
"$binary": "+ZdHRNBq+0mO+NeDcLDNRA==",
"$type": "03"
}
}
}
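For reference, the `$binary` value above does decode back to a UUID. A quick sketch using only the standard library (subtype `03` is the legacy UUID binary subtype, whose byte order is driver-dependent, so the straight big-endian reading below is an assumption):

```python
import base64
import uuid

# The $binary payload from fullDocument._id / documentKey._id above.
raw = base64.b64decode("+ZdHRNBq+0mO+NeDcLDNRA==")

# Subtype 03 is the legacy UUID subtype; byte order varies by driver,
# so interpreting the 16 bytes directly is an assumption.
doc_id = uuid.UUID(bytes=raw)
print(doc_id)  # f9974744-d06a-fb49-8ef8-d78370b0cd44
```

Notably, that same hex sequence (`F9974744D06AFB498EF8D78370B0CD44`) appears inside the `_data` string of the key, which suggests the resume token embeds the document key.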

Documentation on schema configuration for the MongoDB Kafka connector is very limited. I know it is late to reply now, but I recently ran into the same problem and found a solution through trial and error.

I added these two settings to my mongodb-kafka-connect configuration:

"output.format.key": "schema",
"output.schema.key": "{\"name\":\"sampleId\",\"type\":\"record\",\"namespace\":\"com.mongoexchange.avro\",\"fields\":[{\"name\":\"documentKey._id\",\"type\":\"string\"}]}",

Even after this, though, I don't know whether using the change stream's resume token as the key for Kafka partition assignment ever made sense performance-wise, or what happens when the resume token expires after a long period of inactivity.

P.S. - The final version of my Kafka Connect configuration with MongoDB as the source is this:

{
"tasks.max": 1,
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"connection.uri": "mongodb://example-mongodb-0:27017,example-mongodb-1:27017,example-mongodb-2:27017/?replicaSet=replicaSet",
"database": "exampleDB",
"collection": "exampleCollection",
"output.format.key": "schema",
"output.schema.key": "{\"name\":\"ClassroomId\",\"type\":\"record\",\"namespace\":\"com.mongoexchange.avro\",\"fields\":[{\"name\":\"documentKey._id\",\"type\":\"string\"}]}",
"change.stream.full.document": "updateLookup",
"copy.existing": "true",
"topic.prefix": "mongodb"
}
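One pitfall with fields like `pipeline` and `output.schema.key` is that their values are themselves JSON documents embedded as strings, so every inner quote must be escaped. Rather than hand-escaping, one can build the payload programmatically; a sketch (field values copied from the config above, the `json.dumps` approach is just a suggestion):

```python
import json

# The key schema as a real Python dict, instead of a hand-escaped string.
key_schema = {
    "name": "ClassroomId",
    "type": "record",
    "namespace": "com.mongoexchange.avro",
    "fields": [{"name": "documentKey._id", "type": "string"}],
}

config = {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "output.format.key": "schema",
    # json.dumps serializes the dict to a string value, escaping inner quotes.
    "output.schema.key": json.dumps(key_schema),
}

payload = json.dumps(config)  # safe to send as the HTTP request body
# Round-trip check: the embedded schema survives as a parseable string.
assert json.loads(json.loads(payload)["output.schema.key"])["name"] == "ClassroomId"
```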
