使用debezium将数据从MongoDB传输到kafka时,如何进行类型转换



在Mongodb中,对象是base64。我正在使用Debezium将这些文档流式传输到Kafka。如何在kafka中将ObjectId写成UUID?

Mongo示例文档:

{
"_id" : BinData(3,"8D/JiwMtkEKSfrfKsxUe+g=="),
"Version" : 5,
"CreatedAt" : ISODate("2021-09-22T00:24:43.939+03:00"),
"UpdatedAt" : ISODate("2021-09-22T00:32:53.096+03:00"),
"AbidikId" : BinData(3,"CVebG2sIf0OtxnUNZIl39g=="),
"GubidikId" : BinData(3,"U06d2Rk4nUG7Fz3iASM9LQ=="),
"IsActive" : true,
"BrandList" : [ "Sony2", "SUNY2" ],
"CategoryIdList" : [ ]
}

Kafka示例消息:

{
"_id": "8D/JiwMtkEKSfrfKsxUe+g==",
"Version": 5,
"CreatedAt": 1632259483939,
"UpdatedAt": 1632259973096,
"AbidikId": "CVebG2sIf0OtxnUNZIl39g==",
"GubidikId": "U06d2Rk4nUG7Fz3iASM9LQ==",
"IsActive": true,
"BrandList": [
"Sony2",
"SUNY2"
],
"CategoryIdList": []
}

我期待卡夫卡的消息是什么?

"_id" : "8bc93ff0-2d03-4290-927e-b7cab3151efa",
"AbidikId": "1b9b5709-086b-437f-adc6-750d648977f6",
"GubidikId": "d99d4e53-3819-419d-bb17-3de201233d2d"

我在Debezium找不到解决方案。我无法在MongoDB中更改ID。提前谢谢。

我们通过自定义smt解决了这个问题。

  1. 首先,我们从这个repo创建了java项目:https://github.com/confluentinc/kafka-connect-insert-uuid

  2. 自定义此项目中的数据,如"转换"或"编辑"。此外,我们还可以从debezium配置中获得我们想要的字段。

  3. 通过Maven导出jar。

  4. 做那个罐子里的docker图像。

  5. 在debezium配置中设置您的图像和字段:

    "变换":"unwrap,Reroute,convertguid,insertKey";,"transforms.convertguid.type":"com.example.kafka.connect.smt.Base64ToCsuuid$Value";,

    "transforms.convertguid.cuuid.field.names":&quot_id,examplefield1,examplefield2";,

  6. 重新创建debezium连接器。

最后,我们可以将mongo-id转换为UUID,同时流式传输到kafka。我希望它能帮助到需要它的人。

最新更新