I have about 50k files to read from S3 using an inventory manifest. I have to read the contents of each (JSON) file into a dataframe and process the files (normalize them into database tables). Right now I have working code that takes about 15 hours to process the 50k files, and I have to run it as a daily job. Is there a way to process more files in parallel, or a better way to speed up the process?
Update with code:
import json
import pandas as pd
import os
import gzip
import boto3
from datetime import datetime, timezone, timedelta
session = boto3.session.Session()
s3 = session.resource('s3')
client = session.client('s3')
#read the S3 inventory report, get the keys of files that are modified on sysdate-1
dt=(datetime.now(timezone.utc) + timedelta(days=-1)).strftime('%Y-%m-%d')
dtz=dt+'T00-00Z'
print('reading inventory report for', dtz)
inventory_bucket = 'xxx'
manifest_key='s3-bucket'+dtz+'/manifest.json'
manifest = json.load(s3.Object(inventory_bucket, manifest_key).get()['Body'])
df=pd.DataFrame()
for obj in manifest['files']:
    gzip_obj = s3.Object(bucket_name=inventory_bucket, key=obj['key'])
    print('csv obj:', gzip_obj)
    buffer = gzip.open(gzip_obj.get()["Body"], mode='rt')
    reader = pd.read_csv(buffer)
    reader.columns=['id','key','modified_date']
    print('converting csv obj to dataframe')
    df=df.append(reader[(reader.modified_date>dt)])
source_keys=list(df['key'])
s3_bucket_source='yyy'
#download the files to a tmp folder
local='/tmp/'
print("downloading from S3")
for k in source_keys:
    k_path=k.replace('/', '-')
    dest_pathname = os.path.join(local, k_path)
    if not os.path.exists(os.path.dirname(dest_pathname)):
        os.makedirs(os.path.dirname(dest_pathname))
    client.download_file(s3_bucket_source, k, dest_pathname)
#read the latest jsons from tmp folder
path_to_json = os.path.expanduser('/tmp/')
json_files = [pos_json for pos_json in os.listdir(path_to_json) if pos_json.endswith('.json')]
#loop through the jsons and normalize each file contents into df_table1, df_table2, df_table3
for index, js in enumerate(json_files):
    with open(os.path.join(path_to_json, js)) as json_file:
        print('processing file:', js)
        d=json.loads(json_file.read())
        v=d['key1'][0]['key2']
        if isinstance(v, list):
            for i, v2 in enumerate(v):
                df_table1, df_table2, df_table3 = normalizeJSON(d,i,v2['id'])
                #normalizeJSON is the custom function that splits the nested JSON into relational tables
        else:
            print('invalid json')
I use the S3 inventory report to get the list of the latest modified files from the manifest, download those files to a tmp location, and read them one by one to perform the processing I need to do.
You can use the multiprocessing module to download the JSON files in parallel. Your code consists of 3 blocks, and you can execute each of them in parallel. Here is an example of how to do this for the first one.

The first block:
df=pd.DataFrame()
for obj in manifest['files']:
    gzip_obj = s3.Object(bucket_name=inventory_bucket, key=obj['key'])
    print('csv obj:', gzip_obj)
    buffer = gzip.open(gzip_obj.get()["Body"], mode='rt')
    reader = pd.read_csv(buffer)
    reader.columns=['id','key','modified_date']
    print('converting csv obj to dataframe')
    df=df.append(reader[(reader.modified_date>dt)])
becomes:
import multiprocessing

def get_reader(obj):
    gzip_obj = s3.Object(bucket_name=inventory_bucket, key=obj['key'])
    print('csv obj:', gzip_obj)
    buffer = gzip.open(gzip_obj.get()["Body"], mode='rt')
    reader = pd.read_csv(buffer)
    reader.columns=['id','key','modified_date']
    print('converting csv obj to dataframe')
    return reader[(reader.modified_date>dt)]

num_of_workers = 4
with multiprocessing.Pool(num_of_workers) as p:
    results = p.map(get_reader, manifest['files'])
# concatenate once instead of appending in a loop
# (DataFrame.append was removed in pandas 2.0)
df = pd.concat(results)
You can do the same for the other two blocks.
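
For instance, here is a minimal sketch of the same pattern applied to the download block. It reuses the client, s3_bucket_source, local and source_keys globals from the question, and download_one is a hypothetical helper wrapping the body of the original download loop:

def download_one(k):
    # same logic as the original download loop, applied to one key per call;
    # if you hit connection errors after forking, create the boto3 client
    # inside the worker instead of sharing the module-level one
    k_path = k.replace('/', '-')
    dest_pathname = os.path.join(local, k_path)
    # exist_ok avoids a race when several workers create the folder at once
    os.makedirs(os.path.dirname(dest_pathname), exist_ok=True)
    client.download_file(s3_bucket_source, k, dest_pathname)

with multiprocessing.Pool(num_of_workers) as p:
    p.map(download_one, source_keys)

Since downloading is I/O-bound rather than CPU-bound, a thread pool (multiprocessing.pool.ThreadPool, which has the same map interface) would work just as well here. The third block can be parallelized the same way, by mapping a function that processes a single JSON file over json_files.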