在Mongodb中,对象是base64。我正在使用Debezium将这些文档流式传输到Kafka。如何在kafka中将ObjectId写成UUID?
Mongo示例文档:
{
"_id" : BinData(3,"8D/JiwMtkEKSfrfKsxUe+g=="),
"Version" : 5,
"CreatedAt" : ISODate("2021-09-22T00:24:43.939+03:00"),
"UpdatedAt" : ISODate("2021-09-22T00:32:53.096+03:00"),
"AbidikId" : BinData(3,"CVebG2sIf0OtxnUNZIl39g=="),
"GubidikId" : BinData(3,"U06d2Rk4nUG7Fz3iASM9LQ=="),
"IsActive" : true,
"BrandList" : [ "Sony2", "SUNY2" ],
"CategoryIdList" : [ ]
}
Kafka示例消息:
{
"_id": "8D/JiwMtkEKSfrfKsxUe+g==",
"Version": 5,
"CreatedAt": 1632259483939,
"UpdatedAt": 1632259973096,
"AbidikId": "CVebG2sIf0OtxnUNZIl39g==",
"GubidikId": "U06d2Rk4nUG7Fz3iASM9LQ==",
"IsActive": true,
"BrandList": [
"Sony2",
"SUNY2"
],
"CategoryIdList": []
}
我期待卡夫卡的消息是什么?
"_id" : "8bc93ff0-2d03-4290-927e-b7cab3151efa",
"AbidikId": "1b9b5709-086b-437f-adc6-750d648977f6",
"GubidikId": "d99d4e53-3819-419d-bb17-3de201233d2d"
我在Debezium找不到解决方案。我无法在MongoDB中更改ID。提前谢谢。
我们通过自定义smt解决了这个问题。
-
首先,我们从这个repo创建了java项目:https://github.com/confluentinc/kafka-connect-insert-uuid
-
自定义此项目中的数据,如"转换"或"编辑"。此外,我们还可以从debezium配置中获得我们想要的字段。
-
通过Maven导出jar。
-
做那个罐子里的docker图像。
-
在debezium配置中设置您的图像和字段:
"变换":"unwrap,Reroute,convertguid,insertKey";,"transforms.convertguid.type":"com.example.kafka.connect.smt.Base64ToCsuuid$Value";,
"transforms.convertguid.cuuid.field.names":"_id,examplefield1,examplefield2";,
-
重新创建debezium连接器。
最后,我们可以将mongo-id转换为UUID,同时流式传输到kafka。我希望它能帮助到需要它的人。