将Debezium事件模式解码为Go中有意义的数据结构



Debezium模式是一个字符串(JSON):

{
"type":"record",
"name":"Envelope",
"namespace":"datapipe.inventory.customers",
"fields":[
{
"name":"before",
"type":[
"null",
{
"type":"record",
"name":"Value",
"fields":[
{
"name":"id",
"type":"int"
},
{
"name":"first_name",
"type":"string"
},
{
"name":"last_name",
"type":"string"
},
{
"name":"email",
"type":"string"
}
],
"connect.name":"datapipe.inventory.customers.Value"
}
],
"default":null
},
{
"name":"after",
"type":[
"null",
"Value"
],
"default":null
},
{
"name":"source",
"type":{
"type":"record",
"name":"Source",
"namespace":"io.debezium.connector.mysql",
"fields":[
{
"name":"version",
"type":"string"
},
{
"name":"connector",
"type":"string"
},
{
"name":"name",
"type":"string"
},
{
"name":"ts_ms",
"type":"long"
},
{
"name":"snapshot",
"type":[
{
"type":"string",
"connect.version":1,
"connect.parameters":{
"allowed":"true,last,false"
},
"connect.default":"false",
"connect.name":"io.debezium.data.Enum"
},
"null"
],
"default":"false"
},
{
"name":"db",
"type":"string"
},
{
"name":"table",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"server_id",
"type":"long"
},
{
"name":"gtid",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"file",
"type":"string"
},
{
"name":"pos",
"type":"long"
},
{
"name":"row",
"type":"int"
},
{
"name":"thread",
"type":[
"null",
"long"
],
"default":null
},
{
"name":"query",
"type":[
"null",
"string"
],
"default":null
}
],
"connect.name":"io.debezium.connector.mysql.Source"
}
},
{
"name":"op",
"type":"string"
},
{
"name":"ts_ms",
"type":[
"null",
"long"
],
"default":null
},
{
"name":"transaction",
"type":[
"null",
{
"type":"record",
"name":"ConnectDefault",
"namespace":"io.confluent.connect.avro",
"fields":[
{
"name":"id",
"type":"string"
},
{
"name":"total_order",
"type":"long"
},
{
"name":"data_collection_order",
"type":"long"
}
]
}
],
"default":null
}
],
"connect.name":"datapipe.inventory.customers.Envelope"
}

我想把这个模式解码成一个有意义的数据结构,这样我就可以对这些数据执行操作。

使用map和for循环进行转换很慢,并且需要大量的操作,如下所示。

// Walk the top-level "fields" array of the decoded schema map m and print
// every branch of the union type declared by the "before" field.
for i, raw := range m["fields"].([]interface{}) {
	fmt.Println(i)
	field := raw.(map[string]interface{})
	// Read the "name" key directly instead of scanning every value of the
	// map, so that only the field actually named "before" is matched.
	if field["name"] != "before" {
		continue
	}
	for j, branch := range field["type"].([]interface{}) {
		// "\n" (not a literal "n") terminates each printed line.
		fmt.Printf("kk %v, vtyp %v\n", j, branch)
	}
}

Go中有现成的Debezium类型吗?请给出建议。我确实查阅了文档,但要手写嵌套层级这么多的类型非常令人困惑。

相关https://github.com/riferrei/srclient/issues/13

package main
import (
"fmt"
"encoding/json"
)
// DebeziumSchema is the top-level record of a schema document. Each member
// is filled from the JSON key named in its struct tag.
type DebeziumSchema struct {
	// Type is the value of the "type" key.
	Type string `json:"type"`
	// Name is the value of the "name" key.
	Name string `json:"name"`
	// Namespace is the value of the "namespace" key.
	Namespace string `json:"namespace"`
	// Fields holds the entries of the "fields" array.
	Fields []SchemaField `json:"fields"`
	// ConnectName is the value of the "connect.name" key.
	ConnectName string `json:"connect.name"`
}
// SchemaField is one entry of a record's "fields" array.
type SchemaField struct {
	// Name is the value of the "name" key.
	Name string `json:"name"`
	// Type keeps the "type" value exactly as encoding/json decodes it into an
	// empty interface, because the JSON value is not always a plain string.
	Type interface{} `json:"type"`
	// Default is the "default" value; it is left out when marshalling if nil.
	Default interface{} `json:"default,omitempty"`
}
func main() {
schema := `{"type":"record","name":"Envelope","namespace":"datapipe.inventory.customers","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":"int"},{"name":"first_name","type":"string"},{"name":"last_name","type":"string"},{"name":"email","type":"string"}],"connect.name":"datapipe.inventory.customers.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"table","type":["null","string"],"default":null},{"name":"server_id","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"thread","type":["null","long"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"datapipe.inventory.customers.Envelope"}`
var d DebeziumSchema
err := json.Unmarshal([]byte(schema), &d)
if err != nil {
panic(err)
}
columns := make(map[string]string)

for _, field := range d.Fields {
if field.Name != "before" {
continue
}

for _, k := range field.Type.([]interface{}) {
switch k.(type) {
case map[string]interface{}:
rk := k.(map[string]interface{})
for rk1, rk2 := range rk {
if rk1 != "fields" {
continue
}
for _, kkl := range rk2.([]interface{}) {
k22 := kkl.(map[string]interface{})
columns[k22["name"].(string)] = k22["type"].(string)
}
}

}
}

}

fmt.Println(columns)
}
map[email:string first_name:string id:int last_name:string]

通过这种方式,我能够提取出各个列(columns)。但我相信可以用更好的方式来实现。

https://play.golang.org/p/e0-F4Un4Mzv

相关内容

  • 没有找到相关文章

最新更新