如何将GCS Parquet数据写入BigQuery(批处理)



我们要解决的问题

每天一次将存储在GCP的CloudStorage中的Parquet类型数据写入BigQuery表(批处理)。

这个实现有问题。

问题到目前为止,我们一直在GCS中存储以下CSV文件,并运行一个作业,使用数据流"云存储上的文本文件到BigQuery">将其写入BigQuery模板。

电话平直的00000 00 0000−−··

如果你想使用Dataflow,没有Cloud Storage Parquet to BigQuery谷歌提供的模板可用。您可以通过本页的链接提交特性请求,以便创建这样的模板。我从这里检查了一下,到今天为止,没有关于它的现有FR。

如果你想自己编写一个Dataflow/Beam管道,你可以参考的最接近的例子是谷歌提供的模板Cloud Storage Parquet to BigtableCloud Storage Text to BigQuery(源代码是Java的),你必须使用Apache Beam Python SDK来编写一个管道来完成它。

或者,如果您遵循这个BigQuery文档从云存储加载Parquet数据,那么有这个代码片段可以在不使用Dataflow的情况下完成这项工作:

import io
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("name", "STRING"),
bigquery.SchemaField("post_abbr", "STRING"),
],
)
body = io.BytesIO(b"Washington,WA")
client.load_table_from_file(body, table_id, job_config=job_config).result()
previous_rows = client.get_table(table_id).num_rows
assert previous_rows > 0
job_config = bigquery.LoadJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
source_format=bigquery.SourceFormat.PARQUET,
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"
load_job = client.load_table_from_uri(
uri, table_id, job_config=job_config
)  # Make an API request.
load_job.result()  # Waits for the job to complete.
destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

最新更新