在 jdbc-sink-connector 中使用 kafka SMT 将 avro 模式中的映射字段转换为字符串?



我有一个 avro 模式定义如下:

[    
{
"namespace": "com.fun.message",
"type": "record",
"name": "FileData",
"doc": "Avro Schema for FileData",
"fields": [
{"name": "id", "type": "string", "doc": "Unique file id" },
{"name": "absolutePath", "type": "string", "doc": "Absolute path of file" },
{"name": "fileName", "type": "string", "doc": "File name" },
{"name": "source", "type": "string", "doc": "unique identification of source" },
{"name": "metaData", "type": {"type": "map", "values": "string"}}
]
}
]

我想使用 jdbc-sink-connector 将此数据推送到 postgres,以便我可以将架构中的"metaData"字段(即映射类型(转换为字符串。我该怎么做?

您需要使用 SMT 和 AFAIK 目前没有完全满足您的要求的 SMT(ExtractField是一个Map.get操作,因此嵌套字段无法一次性提取(。你可以看看Debezium的io.debezium.transforms.UnwrapFromEnvelopeSMT,你可以修改它以提取嵌套字段。

UnwrapFromEnvelope用于CDC事件扁平化,以便从更复杂的结构中提取字段,例如Debezium形成的数据(我相信它与您的结构相似(。

最新更新