TL;DR:asyncio
vsmulti-processing
vsthreading
vssome other solution
并行化循环,该循环从GCS读取文件,然后将这些数据一起附加到pandas数据帧中,然后写入BigQuery。。。
我想并行生成一个python函数,该函数从GCS目录中读取数十万个小的.json文件,然后将这些.jsons转换为panda数据帧,然后将panda数据框写入BigQuery表。
这是一个非并行版本的函数:
import gcsfs
import pandas as pd
from my.helpers import get_gcs_file_list
def load_gcs_to_bq(gcs_directory, bq_table):
# my own function to get list of filenames from GCS directory
files = get_gcs_file_list(directory=gcs_directory) #
# Create new table
output_df = pd.DataFrame()
fs = gcsfs.GCSFileSystem() # Google Cloud Storage (GCS) File System (FS)
counter = 0
for file in files:
# read files from GCS
with fs.open(file, 'r') as f:
gcs_data = json.loads(f.read())
data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
this_df = pd.DataFrame(data)
output_df = output_df.append(this_df)
# Write to BigQuery for every 5K rows of data
counter += 1
if (counter % 5000 == 0):
pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')
output_df = pd.DataFrame() # and reset the dataframe
# Write remaining rows to BigQuery
pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')
这个功能很简单:
- 抓取
['gcs_dir/file1.json', 'gcs_dir/file2.json', ...]
,GCS中的文件名列表 - 在每个文件名上循环,并且:
- 从GCS读取文件
- 将数据转换为熊猫DF
- 附加到主要熊猫DF
- 每5K个循环,写入BigQuery(因为DF越大,追加速度就越慢(
我必须在几个GCS目录上运行此功能,每个目录包含约500K个文件。由于读/写这么多小文件的瓶颈,单个目录的这个过程将需要大约24小时。。。如果我能让它更并行以加快速度,那就太好了,因为它看起来像是一个适合并行化的任务。
编辑:下面的解决方案很有帮助,但我对在python脚本中并行运行特别感兴趣。Pandas正在处理一些数据清理,使用bq load
会引发错误。asyncio和这个gcloud-aio存储似乎都对这项任务有用,可能是比线程或多处理更好的选择。。。
与其在python代码中添加并行处理,不如考虑多次并行调用python程序。对于在命令行上获取文件列表的程序来说,这是一个更容易实现的技巧。因此,为了这篇文章,让我们考虑更改程序中的一行:
您的线路:
# my own function to get list of filenames from GCS directory
files = get_gcs_file_list(directory=gcs_directory) #
新品:
files = sys.argv[1:] # ok, import sys, too
现在,您可以通过以下方式调用程序:
PROCESSES=100
get_gcs_file_list.py | xargs -P $PROCESSES your_program
xargs
现在将采用get_gcs_file_list.py
输出的文件名,并并行调用your_program
多达100次,在每行中尽可能多地拟合文件名。我认为文件名的数量限制在shell允许的最大命令大小之内。如果100个进程不足以处理所有文件,那么xargs将一次(又一次(调用your_program
,直到它从stdin读取的所有文件名都得到处理。xargs
确保同时运行的your_program
调用不超过100次。您可以根据主机可用的资源来改变进程的数量。
您可以直接使用bq
命令来代替此操作。
当您使用此命令时,加载将在谷歌的网络中进行,这比我们创建数据帧并加载到表的速度要快。
bq load
--autodetect
--source_format=NEWLINE_DELIMITED_JSON
mydataset.mytable
gs://mybucket/my_json_folder/*.json
更多信息-https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json#loading_json_data_into_a_new_table