使用goavro处理多种类型的解码数据



我想做什么

对于数据库中的每一次更改,我都试图将debezium事件转换为数据库值的CSV,以便加载到Redshift中。

例如,对于下面 id 为 110 的这条记录的更改,我想生成这样一行 CSV:110,vck,desc,221.1

mysql> select * from products;
+-----+-------------+---------------------------------------------------------+--------+
| id  | name        | description                                             | weight |
+-----+-------------+---------------------------------------------------------+--------+
| 110 | vck         | desc                                                    |  221.1 |
+-----+-------------+---------------------------------------------------------+--------+

这是我使用goavro的尝试。

程序

https://play.golang.org/p/A8Wd0sZPUEQ

package main
import (
"fmt"
"encoding/json"
)
// main decodes a sample Debezium change event and flattens the row's "after"
// image into a slice of column values, printed in table column order.
func main() {
	debeziumEvent := `{"before":null,"after":{"datapipe.inventory.products.Value":{"id":110,"name":"vck","description":{"string":"desc"},"weight":{"double":221.10000610351562}}},"source":{"query":null,"snapshot":{"string":"true"},"server_id":0,"gtid":null,"name":"datapipe","thread":null,"ts_ms":0,"file":"mysql-bin.000049","version":"1.2.1.Final","connector":"mysql","pos":154,"table":{"string":"products"},"row":0,"db":"inventory"},"op":"c","ts_ms":{"long":1597649700266},"transaction":null}`

	var data map[string]interface{}
	if err := json.Unmarshal([]byte(debeziumEvent), &data); err != nil {
		panic(err)
	}

	// "after" can be JSON null (as "before" is in this sample), in which case
	// a bare type assertion would panic with an opaque conversion message.
	after, ok := data["after"].(map[string]interface{})
	if !ok {
		panic("event has no \"after\" row image")
	}

	// Go randomizes map iteration order, so ranging over the decoded row would
	// shuffle the fields on every run. Read the columns in table order instead.
	columns := []string{"id", "name", "description", "weight"}
	csv := make([]interface{}, 0, len(columns))

	// In this sample "after" holds a single entry keyed by the record name.
	for _, record := range after {
		row, ok := record.(map[string]interface{})
		if !ok {
			panic(fmt.Sprintf("unexpected row image type %T", record))
		}
		for _, column := range columns {
			switch value := row[column].(type) {
			case map[string]interface{}:
				// Some columns are wrapped in a one-entry object such as
				// {"string":"desc"}; keep only the inner value.
				for _, inner := range value {
					csv = append(csv, inner)
				}
			case nil, bool, string, float64:
				// encoding/json decodes every JSON number as float64 (never
				// int) and a null column as nil.
				csv = append(csv, value)
			default:
				fmt.Printf("type %T not handled\n", value)
				panic("unhandled type")
			}
		}
	}

	fmt.Println(csv)
}

有没有更好的方法?按现在的写法,每种数据类型都要在 switch 里单独写一个 case 分支……

链接的GoAVRO问题:https://github.com/linkedin/goavro/issues/217

可以用 fmt.Sprintf 把任意 interface{} 值转换为字符串:str := fmt.Sprintf("%v", v)

这样可以把 case 分支减少到 2 个:

package main
import (
"fmt"
"encoding/json"
)
// main decodes a sample Debezium change event and flattens the row's "after"
// image into one CSV record (a slice of strings) in table column order.
func main() {
	debeziumEvent := `{"before":null,"after":{"datapipe.inventory.products.Value":{"id":110,"name":"vck","description":{"string":"desc"},"weight":{"double":221.10000610351562}}},"source":{"query":null,"snapshot":{"string":"true"},"server_id":0,"gtid":null,"name":"datapipe","thread":null,"ts_ms":0,"file":"mysql-bin.000049","version":"1.2.1.Final","connector":"mysql","pos":154,"table":{"string":"products"},"row":0,"db":"inventory"},"op":"c","ts_ms":{"long":1597649700266},"transaction":null}`

	var data map[string]interface{}
	if err := json.Unmarshal([]byte(debeziumEvent), &data); err != nil {
		panic(err)
	}

	// "after" can be JSON null (as "before" is in this sample), in which case
	// a bare type assertion would panic with an opaque conversion message.
	after, ok := data["after"].(map[string]interface{})
	if !ok {
		panic("event has no \"after\" row image")
	}

	// Go randomizes map iteration order, so ranging over the decoded row would
	// shuffle the fields on every run. Read the columns in table order instead.
	columns := []string{"id", "name", "description", "weight"}
	csv := make([]string, 0, len(columns))

	// In this sample "after" holds a single entry keyed by the record name.
	for _, record := range after {
		row, ok := record.(map[string]interface{})
		if !ok {
			panic(fmt.Sprintf("unexpected row image type %T", record))
		}
		for _, column := range columns {
			switch value := row[column].(type) {
			case map[string]interface{}:
				// Some columns are wrapped in a one-entry object such as
				// {"string":"desc"}; keep only the inner value.
				for _, inner := range value {
					csv = append(csv, formatValue(inner))
				}
			default:
				csv = append(csv, formatValue(value))
			}
		}
	}

	fmt.Println(csv)
}

// formatValue renders one decoded JSON value as the text of a CSV field.
func formatValue(v interface{}) string {
	switch x := v.(type) {
	case nil:
		// A null column becomes an empty field rather than the text "<nil>".
		return ""
	case float64:
		// encoding/json decodes every JSON number as float64, and %v switches
		// to exponent notation from 1e6 upwards (1234567 -> 1.234567e+06).
		// json.Marshal keeps plain decimal notation for such values.
		if b, err := json.Marshal(x); err == nil {
			return string(b)
		}
	}
	return fmt.Sprintf("%v", v)
}

相关内容

  • 没有找到相关文章

最新更新