Read S3 files from an inventory manifest and process them in parallel with Pandas



I have about 50k files to read from S3 using an inventory manifest. I have to read the contents of each (JSON) file into a dataframe and process the files (normalize them into database tables). Right now I have working code, and it takes about 15 hours to process the 50k files. I have to run this as a daily job. Is there a way to process more files in parallel, or a better approach to speed things up?

Updating the question with my code:

import json
import pandas as pd
import os
import gzip
import boto3
from datetime import datetime, timezone, timedelta

session = boto3.session.Session()
s3 = session.resource('s3')
client = session.client('s3')

# read the S3 inventory report, get the keys of files that were modified on sysdate-1
dt = (datetime.now(timezone.utc) + timedelta(days=-1)).strftime('%Y-%m-%d')
dtz = dt + 'T00-00Z'
print('reading inventory report for', dtz)
inventory_bucket = 'xxx'
manifest_key = 's3-bucket' + dtz + '/manifest.json'
manifest = json.load(s3.Object(inventory_bucket, manifest_key).get()['Body'])

df = pd.DataFrame()
for obj in manifest['files']:
    gzip_obj = s3.Object(bucket_name=inventory_bucket, key=obj['key'])
    print('csv obj:', gzip_obj)
    buffer = gzip.open(gzip_obj.get()["Body"], mode='rt')
    reader = pd.read_csv(buffer)
    reader.columns = ['id', 'key', 'modified_date']
    print('converting csv obj to dataframe')
    df = df.append(reader[(reader.modified_date > dt)])

source_keys = list(df['key'])
s3_bucket_source = 'yyy'

# download the files to a tmp folder
local = '/tmp/'
print("downloading from S3")
for k in source_keys:
    k_path = k.replace('/', '-')
    dest_pathname = os.path.join(local, k_path)
    if not os.path.exists(os.path.dirname(dest_pathname)):
        os.makedirs(os.path.dirname(dest_pathname))
    client.download_file(s3_bucket_source, k, dest_pathname)

# read the latest jsons from the tmp folder
path_to_json = os.path.expanduser('/tmp/')
json_files = [pos_json for pos_json in os.listdir(path_to_json) if pos_json.endswith('.json')]

# loop through the jsons and normalize each file's contents into df_table1, df_table2, df_table3
for index, js in enumerate(json_files):
    with open(os.path.join(path_to_json, js)) as json_file:
        print('processing file:', js)
        d = json.loads(json_file.read())
        v = d['key1'][0]['key2']
        if isinstance(v, list):
            for i, v2 in enumerate(v):
                # normalizeJSON is the custom function that splits the nested JSON into relational tables
                df_table1, df_table2, df_table3 = normalizeJSON(d, i, v2['id'])
        else:
            print('invalid json')

I use the S3 inventory report to get the list of the most recently modified files from the manifest, download those files to a tmp location, and read them one by one to do the processing I need.

You can use the multiprocessing module to download and process the JSON files in parallel. Your code consists of 3 blocks, and each of them can be parallelized. Here is an example of how to do it for the first one.

The first block:

df = pd.DataFrame()
for obj in manifest['files']:
    gzip_obj = s3.Object(bucket_name=inventory_bucket, key=obj['key'])
    print('csv obj:', gzip_obj)
    buffer = gzip.open(gzip_obj.get()["Body"], mode='rt')
    reader = pd.read_csv(buffer)
    reader.columns = ['id', 'key', 'modified_date']
    print('converting csv obj to dataframe')
    df = df.append(reader[(reader.modified_date > dt)])

becomes:

import multiprocessing

def get_reader(obj):
    gzip_obj = s3.Object(bucket_name=inventory_bucket, key=obj['key'])
    print('csv obj:', gzip_obj)
    buffer = gzip.open(gzip_obj.get()["Body"], mode='rt')
    reader = pd.read_csv(buffer)
    reader.columns = ['id', 'key', 'modified_date']
    print('converting csv obj to dataframe')
    return reader[(reader.modified_date > dt)]

num_of_workers = 4
df = pd.DataFrame()
with multiprocessing.Pool(num_of_workers) as p:
    results = p.map(get_reader, manifest['files'])
for result in results:
    df = df.append(result)
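One caveat: boto3 sessions and resources should not be shared across worker processes, so it is safer to create them inside the worker rather than relying on the module-level `s3` (whether this matters depends on your start method, e.g. fork vs. spawn). A minimal sketch of `get_reader` with that change, using the same names as above:

def get_reader(obj):
    # assumption: build a fresh session/resource per worker instead of
    # reusing the module-level `s3`, since boto3 sessions are not designed
    # to be shared between processes
    s3_worker = boto3.session.Session().resource('s3')
    gzip_obj = s3_worker.Object(bucket_name=inventory_bucket, key=obj['key'])
    with gzip.open(gzip_obj.get()["Body"], mode='rt') as buffer:
        reader = pd.read_csv(buffer)
    reader.columns = ['id', 'key', 'modified_date']
    return reader[reader.modified_date > dt]

Also note that on recent pandas versions `DataFrame.append` has been removed, so `df = pd.concat(results, ignore_index=True)` is the equivalent of the final append loop.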

You can do the same for the other two blocks; a sketch for the download step follows below.
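The download loop (the second block) is I/O-bound, so a thread pool is also a reasonable fit there. This is a minimal sketch, assuming the same `source_keys`, `s3_bucket_source`, `local`, and `client` variables from the question:

from concurrent.futures import ThreadPoolExecutor

def download_one(key):
    # boto3 clients (unlike resources) are generally thread-safe, so the
    # shared `client` can be reused; keys are flattened into file names
    # exactly as in the question's loop
    dest_pathname = os.path.join(local, key.replace('/', '-'))
    os.makedirs(os.path.dirname(dest_pathname), exist_ok=True)
    client.download_file(s3_bucket_source, key, dest_pathname)
    return dest_pathname

with ThreadPoolExecutor(max_workers=16) as executor:
    downloaded = list(executor.map(download_one, source_keys))

The JSON-processing loop (the third block) is CPU-bound once the files are on disk, so the same `multiprocessing.Pool` pattern shown for the first block applies: map a function that opens one file and calls `normalizeJSON` over `json_files`.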
