How to write multiple nested JSON objects to a BigQuery table using Apache Beam (Python)



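I am writing a collection of complex JSON objects from Dataflow to a BigQuery table using Python. Building the table schema by hand, as shown below, is too cumbersome because my JSON objects are nested several levels deep.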

from apache_beam.io.gcp.internal.clients import bigquery
table_schema=bigquery.TableSchema()
id_schema = bigquery.TableFieldSchema()
id_schema.name = 'ID'
id_schema.type = 'integer'
id_schema.mode = 'nullable'
table_schema.fields.append(id_schema)
...

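So I tried the approach recommended for writing nested schemas to BigQuery from Dataflow (Python). First, I ran the following command in the Cloud Console to get the schema: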

bq --format=json show project:dataset.table > output_schema.json

Then I ran the following code to get the table schema:

table_schema = parse_table_schema_from_json(json.dumps(json.load(open("output_schema.json"))["schema"]))

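This works exactly as expected. The table was originally created from a Jupyter notebook, where I could use bigquery.LoadJobConfig with autodetect to write to BigQuery without providing a schema.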

Now, when I try to write to BigQuery with this schema from an Apache Beam pipeline, I get errors like these:

WARNING:apache_beam.io.gcp.bigquery:There were errors inserting to BigQuery. Will retry. Errors were [<InsertErrorsValueListEntry
errors: [<ErrorProto
debugInfo: ''
location: 'sectiontokens.documents'
message: 'Array specified for non-repeated field.'
reason: 'invalid'>]
index: 0>, <InsertErrorsValueListEntry
errors: [<ErrorProto
debugInfo: ''
location: 'sectiontokens.errors'
message: 'Array specified for non-repeated field.'
reason: 'invalid'>]
index: 1>, <InsertErrorsValueListEntry
errors: [<ErrorProto
debugInfo: ''
location: 'sectiontokens.documents'
message: 'Array specified for non-repeated field.'
reason: 'invalid'>]
index: 2>]

My table schema is:

table_schema = {
"fields": [
{"name": "ID", "type": "INTEGER", "mode": "NULLABLE"},
{"name": "SourceResourceID","type": "STRING","mode": "NULLABLE"},
{"name": "DocumentText","type": "STRING","mode": "NULLABLE"},
{"name": "DocumentName","type": "STRING","mode": "NULLABLE"},
{"name": "EncounterNumber","type": "FLOAT","mode": "NULLABLE"},
{"name": "EncounterResourceID","type": "STRING","mode": "NULLABLE"},
{"name": "DocumentId","type": "STRING","mode": "NULLABLE"},
{"name": "DocumentDate","type": "TIMESTAMP","mode": "NULLABLE"},
{"name": "SectionTitle","type": "STRING","mode": "NULLABLE"},
{"name": "SectionHeader","type": "STRING","mode": "NULLABLE"},
{"name": "SectionText","type": "STRING","mode": "NULLABLE"},
{"name": "SectionTokens","type": "RECORD","mode": "NULLABLE",
"fields": [
{"name": "documents","type": "RECORD","mode": "NULLABLE",
"fields": [
{"name": "list","type": "RECORD","mode": "REPEATED",
"fields": [
{"name": "item","type": "RECORD","mode": "NULLABLE",
"fields": [
{"name": "entities","type": "RECORD","mode": "NULLABLE",
"fields": [
{"name": "list","type": "RECORD","mode": "REPEATED",
"fields": [
{"name": "item","type": "RECORD","mode": "NULLABLE",
"fields": [
{"name": "category","type": "STRING","mode": "NULLABLE"},
{"name": "confidenceScore","type": "FLOAT","mode": "NULLABLE"},
{"name": "id","type": "STRING","mode": "NULLABLE"},
{"name": "isNegated","type": "BOOLEAN","mode": "NULLABLE"},
{"name": "length","type": "INTEGER","mode": "NULLABLE"},
{"name": "links","type": "RECORD","mode": "NULLABLE",
"fields": [
{"name": "list","type": "RECORD","mode": "REPEATED",
"fields": [
{"name": "item","type": "RECORD","mode": "NULLABLE",
"fields": [
{"name": "dataSource","type": "STRING","mode":"NULLABLE"},
{"name": "id","type": "STRING","mode": "NULLABLE"}
]
}
]
}
]
},
{"name": "offset","type": "INTEGER","mode": "NULLABLE"},
{"name": "text","type": "STRING","mode": "NULLABLE"}
]
}
]
}
]
},
{"name": "id","type": "STRING","mode": "NULLABLE"},
{"name": "relations","type": "RECORD","mode": "NULLABLE",
"fields": [
{"name": "list","type": "RECORD","mode": "REPEATED",
"fields": [
{"name": "item","type": "RECORD","mode": "NULLABLE",
"fields": [
{"name": "bidirectional","type": "BOOLEAN","mode": "NULLABLE"},
{"name": "relationType","type": "STRING","mode": "NULLABLE"},
{"name": "source","type": "STRING","mode": "NULLABLE"},
{"name": "target","type": "STRING","mode": "NULLABLE"}
]
}
]
}
]
}
]
}
]
}
]
},
{"name": "errors","type": "RECORD","mode": "NULLABLE",
"fields": [
{"name": "list","type": "RECORD","mode": "REPEATED",
"fields": [
{"name": "item","type": "RECORD","mode": "NULLABLE",
"fields": [
{"name": "error","type": "RECORD","mode": "NULLABLE",
"fields": [
{"name": "code","type": "STRING","mode": "NULLABLE"},
{"name": "innererror","type": "RECORD","mode": "NULLABLE",
"fields": [
{"name": "code","type": "STRING","mode": "NULLABLE"},
{"name": "message","type": "STRING","mode": "NULLABLE"}
]
},
{"name": "message","type": "STRING","mode": "NULLABLE"}
]
},
{"name": "id","type": "STRING","mode": "NULLABLE"}
]
}
]
}
]
},
{"name": "modelVersion","type": "STRING","mode": "NULLABLE"}
]
}
]
}

Here is some sample data:

{'ID': 123, 'SourceResourceID': 'Resource/3c81b4d2-3ee9-11eb-8bf6-0242ac100303', 'DocumentText': 'EXAM:  CT CHEST IC  nnnPROCEDURE DATE:  12/11/2020  n', 'DocumentName': 'CT CHEST IC', 'EncounterNumber': None, 'EncounterResourceID': 'Encounter/123', 'DocumentId': '123', 'DocumentDate': '2020-12-15 10:21:00 UTC', 'SectionTitle': 'physical_exam', 'SectionHeader': 'EXAM:', 'SectionText': 'EXAM:  CT CHEST IC  nnnPROCEDURE DATE:  12/11/2020  n nnn', 'SectionTokens': {'documents': [{'id': '1', 'entities': [{'id': '0', 'offset': 7, 'length': 11, 'text': 'CT CHEST IC', 'category': 'ExaminationName', 'confidenceScore': 0.98, 'isNegated': False}]}], 'errors': [], 'modelVersion': '2020-09-03'}}

Can anyone help me figure out what I am doing wrong? Thanks.

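In your schema, sectiontokens.documents and sectiontokens.errors are specified as type RECORD with mode NULLABLE, which means BigQuery expects each of those fields to hold a single record, but in your data those keys actually hold lists of objects.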
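To see where a row and a schema disagree in this way before the pipeline runs, a small local check can help. The following is only a sketch: find_array_mismatches is a hypothetical helper (not part of Apache Beam or BigQuery), and the schema and row are trimmed-down copies of the ones in the question.

```python
def find_array_mismatches(row, fields, prefix=""):
    """Return dotted paths where the row holds a list but the schema field is not REPEATED."""
    mismatches = []
    for field in fields:
        path = prefix + field["name"]
        value = row.get(field["name"])
        repeated = field.get("mode", "NULLABLE").upper() == "REPEATED"
        if isinstance(value, list) and not repeated:
            # This is the situation BigQuery rejects with
            # 'Array specified for non-repeated field.'
            mismatches.append(path)
            continue
        if field.get("type", "").upper() != "RECORD" or value is None:
            continue
        # Descend into nested records (every element for REPEATED fields).
        children = value if isinstance(value, list) else [value]
        for child in children:
            if isinstance(child, dict):
                nested = find_array_mismatches(child, field.get("fields", []), path + ".")
                for found in nested:
                    if found not in mismatches:
                        mismatches.append(found)
    return mismatches


# Trimmed-down schema from the question (illustrative subset only).
schema_fields = [
    {"name": "ID", "type": "INTEGER", "mode": "NULLABLE"},
    {"name": "SectionTokens", "type": "RECORD", "mode": "NULLABLE", "fields": [
        {"name": "documents", "type": "RECORD", "mode": "NULLABLE", "fields": [
            {"name": "list", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "item", "type": "RECORD", "mode": "NULLABLE", "fields": [
                    {"name": "id", "type": "STRING", "mode": "NULLABLE"},
                ]},
            ]},
        ]},
        {"name": "errors", "type": "RECORD", "mode": "NULLABLE", "fields": [
            {"name": "list", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "item", "type": "RECORD", "mode": "NULLABLE", "fields": [
                    {"name": "id", "type": "STRING", "mode": "NULLABLE"},
                ]},
            ]},
        ]},
        {"name": "modelVersion", "type": "STRING", "mode": "NULLABLE"},
    ]},
]

# Trimmed-down row shaped like the sample data in the question.
row = {
    "ID": 123,
    "SectionTokens": {
        "documents": [{"id": "1"}],
        "errors": [],
        "modelVersion": "2020-09-03",
    },
}

print(find_array_mismatches(row, schema_fields))
# ['SectionTokens.documents', 'SectionTokens.errors']
```

The two reported paths are the same two fields named in the location entries of your insert errors.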

If you want a column to accept a list of objects, it needs "mode": "REPEATED". See https://cloud.google.com/bigquery/docs/nested-repeated
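As a rough illustration of what that could look like for your schema, the sketch below rewrites a field that wraps its values in list/item records into a single REPEATED record, so that a plain Python list of dicts matches it. collapse_list_item is a hypothetical helper written for this answer, not a Beam or BigQuery function, and it assumes you are free to write to a table created with the rewritten schema (as far as I know, an existing column's mode cannot be changed from NULLABLE to REPEATED in place).

```python
def collapse_list_item(field):
    """Rewrite RECORD{list[REPEATED]{item{...}}} as a REPEATED RECORD{...}, recursively."""
    # Copy every key except the nested fields, which are rebuilt below.
    result = {key: value for key, value in field.items() if key != "fields"}
    children = field.get("fields", [])
    if (field.get("type") == "RECORD"
            and len(children) == 1
            and children[0]["name"] == "list"
            and children[0].get("mode") == "REPEATED"):
        inner = children[0].get("fields", [])
        if len(inner) == 1 and inner[0]["name"] == "item":
            # Drop the list/item wrappers and mark this field itself as repeated.
            result["mode"] = "REPEATED"
            children = inner[0].get("fields", [])
    if children:
        result["fields"] = [collapse_list_item(child) for child in children]
    return result


# The 'documents' field from the question, trimmed to a single leaf for brevity.
documents = {"name": "documents", "type": "RECORD", "mode": "NULLABLE", "fields": [
    {"name": "list", "type": "RECORD", "mode": "REPEATED", "fields": [
        {"name": "item", "type": "RECORD", "mode": "NULLABLE", "fields": [
            {"name": "id", "type": "STRING", "mode": "NULLABLE"},
        ]},
    ]},
]}

flat = collapse_list_item(documents)
print(flat["mode"])    # REPEATED
print(flat["fields"])  # [{'name': 'id', 'type': 'STRING', 'mode': 'NULLABLE'}]
```

With a field defined this way, a value such as 'documents': [{'id': '1'}] from your sample row has the shape the schema expects. The alternative is to keep the existing schema and reshape each row into the list/item layout instead.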
