Python将.json文件从GCS并行读取到pandas DF中



TL;DR:asynciovsmulti-processingvsthreadingvssome other solution并行化循环,该循环从GCS读取文件,然后将这些数据一起附加到pandas数据帧中,然后写入BigQuery。。。

我想并行生成一个python函数,该函数从GCS目录中读取数十万个小的.json文件,然后将这些.jsons转换为panda数据帧,然后将panda数据框写入BigQuery表。

这是一个非并行版本的函数:

import gcsfs
import pandas as pd
from my.helpers import get_gcs_file_list
def load_gcs_to_bq(gcs_directory, bq_table):
# my own function to get list of filenames from GCS directory
files = get_gcs_file_list(directory=gcs_directory) # 
# Create new table
output_df = pd.DataFrame()
fs = gcsfs.GCSFileSystem() # Google Cloud Storage (GCS) File System (FS)
counter = 0
for file in files:
# read files from GCS
with fs.open(file, 'r') as f:
gcs_data = json.loads(f.read())
data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
this_df = pd.DataFrame(data)
output_df = output_df.append(this_df)
# Write to BigQuery for every 5K rows of data
counter += 1
if (counter % 5000 == 0):
pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')
output_df = pd.DataFrame() # and reset the dataframe

# Write remaining rows to BigQuery
pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')

这个功能很简单:

  • 抓取['gcs_dir/file1.json', 'gcs_dir/file2.json', ...],GCS中的文件名列表
  • 在每个文件名上循环,并且:
    • 从GCS读取文件
    • 将数据转换为熊猫DF
    • 附加到主要熊猫DF
    • 每5K个循环,写入BigQuery(因为DF越大,追加速度就越慢(

我必须在几个GCS目录上运行此功能,每个目录包含约500K个文件。由于读/写这么多小文件的瓶颈,单个目录的这个过程将需要大约24小时。。。如果我能让它更并行以加快速度,那就太好了,因为它看起来像是一个适合并行化的任务。

编辑:下面的解决方案很有帮助,但我对在python脚本中并行运行特别感兴趣。Pandas正在处理一些数据清理,使用bq load会引发错误。asyncio和这个gcloud-aio存储似乎都对这项任务有用,可能是比线程或多处理更好的选择。。。

与其在python代码中添加并行处理,不如考虑多次并行调用python程序。对于在命令行上获取文件列表的程序来说,这是一个更容易实现的技巧。因此,为了这篇文章,让我们考虑更改程序中的一行:

您的线路:

# my own function to get list of filenames from GCS directory
files = get_gcs_file_list(directory=gcs_directory) # 

新品:

files = sys.argv[1:]  # ok, import sys, too

现在,您可以通过以下方式调用程序:

PROCESSES=100
get_gcs_file_list.py | xargs -P $PROCESSES your_program

xargs现在将采用get_gcs_file_list.py输出的文件名,并并行调用your_program多达100次,在每行中尽可能多地拟合文件名。我认为文件名的数量限制在shell允许的最大命令大小之内。如果100个进程不足以处理所有文件,那么xargs将一次(又一次(调用your_program,直到它从stdin读取的所有文件名都得到处理。xargs确保同时运行的your_program调用不超过100次。您可以根据主机可用的资源来改变进程的数量。

您可以直接使用bq命令来代替此操作。

bq命令行工具是BigQuery的一个基于Python的命令行工具。

当您使用此命令时,加载将在谷歌的网络中进行,这比我们创建数据帧并加载到表的速度要快。

bq load 
--autodetect 
--source_format=NEWLINE_DELIMITED_JSON 
mydataset.mytable 
gs://mybucket/my_json_folder/*.json

更多信息-https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json#loading_json_data_into_a_new_table

最新更新