将查询结果保存在Cloud Storage中的BigQuery Table中



我想知道什么是存储谷歌BigQuery表查询结果到谷歌云存储的最佳方式。我的代码目前正在一些Jupyter Notebook(在Vertex AI Workbench中,与BigQuery数据源以及Cloud Storage目的地相同的项目)中运行,如下所示:

# CELL 1 OF 2
from google.cloud import bigquery
bqclient = bigquery.Client()
# The query string can vary:
query_string = """
SELECT *  
FROM `my_project-name.my_db.my_table` 
LIMIT 2000000
"""
dataframe = (
bqclient.query(query_string)
.result()
.to_dataframe(
create_bqstorage_client=True,
)
)
print("Dataframe shape: ", dataframe.shape)
# CELL 2 OF 2:
import pandas as pd
dataframe.to_csv('gs://my_bucket/test_file.csv', index=False)

此代码大约需要7.5分钟才能成功完成。

是否有更优的方法来实现上面所做的?(这意味着更快,但也许其他方面可以改进)。

附加说明:

  1. 我想通过Jupyter notebook运行它;(在Vertex AI Workbench中),因为有时必须进行一些数据预处理或特殊过滤,这无法通过SQL查询轻松完成。
  2. 对于代码的第一部分,我放弃了pandas。read_gbq,因为它给了我一些奇怪的EOF错误,当(实验性地)"存储为。csv并读取返回"时。
  3. 直观地说,我会把优化工作的重点放在代码的后半部分(CELL 2 OF 2),因为第一部分是从Google官方文档中借来的。我已经尝试过了,但它不工作,但在同一线程中,此选项工作正常。
  4. 这段代码很可能会被包含在一些Docker镜像中,所以"尽可能少的库"。

谢谢。

使用EXPORT DATA语句:


EXPORT DATA OPTIONS(
uri='gs://bucket/folder/*.csv',
format='CSV',
overwrite=true,
header=true,
field_delimiter=';') AS
SELECT *
FROM `my_project.my_db.my_table`
LIMIT 2000000
  • URI中的*允许将一个表导出为多个表。只有当导出的表大于1GB时才有关系(见这里)

直接在BQ控制台,通过命令行或在您的python脚本中执行。

from google.cloud import bigquery
client = bigquery.Client()
query_job = client.query(
"""
EXPORT DATA OPTIONS(
uri='gs://bucket/folder/*.csv',
format='CSV',
overwrite=true,
header=true,
field_delimiter=';') AS
SELECT *
FROM `my_project.my_db.my_table`
LIMIT 2000000
"""
)
results = query_job.result()  # Waits for job to complete.

关于LIMIT的说明:注意LIMIT不会减少非聚类表中的数据读取量。只有返回的东西。(看到)

经过一些实验,我想我已经找到了解决我原来的帖子。首先,更新后的代码:

import pandas as pd  # Just one library is imported this time
# This SQL query can vary, modify it to match your needs
query_string = """
SELECT *
FROM `my_project.my_db.my_table`
LIMIT 2000000
"""
# One liner to query BigQuery data.
downloaded_dataframe = pd.read_gbq(query_string, dialect='standard', use_bqstorage_api=True)
# Data processing (OPTIONAL, modify it to match your needs)
# I won't do anything this time, just upload the previously queried data
# Data store in GCS
downloaded_dataframe.to_csv('gs://my_bucket/uploaded_data.csv', index=False)

最后的注释:

  1. 我没有做过"深入的研究"。关于处理速度VS BigQuery表中存在的行数,但是我看到更新代码和原始查询的处理时间现在需要~6分钟;这就够了。这个答案可能有一些进一步改进的空间因此,但它比原来的情况要好。
  2. 我在原来的帖子中提到的EOF错误是:ParserError: Error tokenizing data. C error: EOF inside string starting at row 70198。最后我意识到这与pandas_gbq函数没有任何关系,而是与"我如何保存数据"有关。看,我是"实验性"存储。csv文件在顶点AI工作台本地存储,然后将其下载到我的本地设备,当试图从我的本地设备打开该数据时,我一直绊倒在这个错误,但没有得到相同时从云存储下载。csv数据…为什么?如果你下载。csv数据的速度非常快之后,它生成了。(即,几秒钟后),从顶点AI工作台本地存储,数据仍然是不完整的,但它不会给出任何错误或警告信息:它会简单地"让你开始下载"。出于这个原因,我认为将数据导出到云存储,然后从那里安全地下载是更安全的。这种行为在大文件上更为明显(例如,我自己生成的文件有~3.1GB大小)。

希望这对你有帮助。

谢谢。

在此链接中,您将找到完成此任务的方法:https://cloud.google.com/bigquery/docs/samples/bigquery-extract-table?hl=en

尽管如此,还是有一些地方需要注意。

  • 这只是一个摘录,但如果你要做一些转换,你可能会使用Dataflow或Composer,最后一个允许你使用SQL转换使用BQ作业来实现你想要的。另一方面,Dataflow使用python来处理代码并创建Job。

  • 你也可能想要照顾你的bq表性能,分区和集群它的细节https://cloud.google.com/bigquery/docs/clustered-tables;我还注意到您正在使用limit语句,这对您的性能无效,因为此BQ是列状的,因此您仍在扫描所有列和所有数据。

片段:

# from google.cloud import bigquery
# client = bigquery.Client()
# bucket_name = 'my-bucket'
project = "bigquery-public-data"
dataset_id = "samples"
table_id = "shakespeare"
destination_uri = "gs://{}/{}".format(bucket_name, "shakespeare.csv")
dataset_ref = bigquery.DatasetReference(project, dataset_id)
table_ref = dataset_ref.table(table_id)
extract_job = client.extract_table(
table_ref,
destination_uri,
# Location must match that of the source table.
location="US",
)  # API request
extract_job.result()  # Waits for job to complete.
print(
"Exported {}:{}.{} to {}".format(project, dataset_id, table_id, destination_uri)
)

希望这对你有帮助:)

相关内容

  • 没有找到相关文章

最新更新