使用数据流将数据从数据存储迁移到 Bigquery



我正在尝试编写一个数据流管道,以使用Python将数据从Google Datastore迁移到BigQuery。经过一些搜索,我想我需要做三个步骤:

1. ReadFromDatastore
2. Convert to Python dicts or Tablerows
3. WriteToBigQuery

现在,第一步和最后一步很简单,因为它们是函数本身。但是我很难找到一个好方法来做第二步。

我将 ReadFromDatastore 的输出写入文本文件,json 如下所示:

key {
partition_id {
project_id: "ProjectID"
}
path {
kind: "KindName"
id:9999
}
}
properties {
key: "property1"
value {
string_value: "property_value"
}
}
properties {
key: "property2"
value {
string_value: ""
}
}
properties {
key: "property3"
value {
boolean_value: false
}
}
properties {
key: "created"
value {
timestamp_value {
seconds: 4444
nanos: 2222
}
}
}
properties {
key: "created_by"
value {
string_value: "property_value"
}
}
properties {
key: "date_created"
value {
timestamp_value {
seconds: 4444
}
}
}
properties {
key: "property4"
value {
string_value: "property_value"
}
}
properties {
key: "property5"
value {
array_value {
values {
meaning: 00
string_value: "link"
exclude_from_indexes: true
}
}
}
}
properties {
key: "property6"
value {
null_value: NULL_VALUE
}
}
properties {
key: "property7"
value {
string_value: "property_value"
}
}
properties {
key: "property8"
value {
string_value: ""
}
}
properties {
key: "property9"
value {
timestamp_value {
seconds: 3333
nanos: 3333
}
}
}
properties {
key: "property10"
value {
meaning: 00
string_value: ""
exclude_from_indexes: true
}
}
properties {
key: "property11"
value {
boolean_value: false
}
}
properties {
key: "property12"
value {
array_value {
values {
key_value {
partition_id {
project_id: "project_id"
}
path {
kind: "Another_kind_name"
id: 4444
}
}
}
}
}
}
properties {
key: "property13"
value {
string_value: "property_value"
}
}
properties {
key: "version"
value {
integer_value: 4444
}
}
key {
partition_id {
project_id: "ProjectID"
}
path {
kind: "KindName"
id: 9999
}
}
.
.
.
.next_entity/row

我是否必须编写一个自定义函数来将 json 转换为 python 字典才能写入 BigQuery,或者是否有来自谷歌数据存储或 apache 的任何函数/库可供我使用?

我找到了一篇描述我正在尝试做什么的文章,但显示的代码是用 Java 编写的。

ReadFromDatastore转换的输出是Entity类型的协议缓冲区。

要将 protobuff 转换为 JSON,您可以查看此问题:python 中的 protobuf 到 json

你会做:

p | ReadFromDatastore(...) | beam.Map(my_proto_to_json_fn) | beam.WriteToBigQuery(...)

最新更新