通过过滤pandas数据帧的差异标准生成多个ndjson文件[3000+]



我需要根据某些标准从pandas数据帧生成3000+ndjson文件。我试着运行下面的代码,它很有效,但需要很多时间才能完成。

def p_generate_files(result_df: pd.DataFrame, p_code: str) -> None:
print(result_df.shape)
tmp_df = result_df.filter(like=str(p_code), axis=0)
start_date = tmp_df.index.unique(level='date').min().to_pydatetime().strftime('%b').upper()
end_date = tmp_df.index.unique(level='date').max().to_pydatetime().strftime('%b').upper()
file_name_path = f'data/CR-{p_code}-{start_date}-{end_date}-2000.json'
tmp_df.reset_index(inplace=True)
tmp_df.to_json(
file_name_path,
orient="records",
index=True,
lines=True)
result_df.drop(labels= p_code, inplace = True)

我尝试了以下并行处理的实现,但似乎不起作用。我没有并发编程的经验。感谢您对加快处理速度的任何帮助。

p_generate_files = partial(generate_files,result_df=big_df)
with concurrent.futures.ProcessPoolExecutor() as executor:
executor.map(p_generate_files, p_codes)

尝试multiprocessing,您必须将输入设置为元组列表

import multiprocessing as mp
def generate_files(result_df: pd.DataFrame, codes: list) -> None:
# Your function
if __name__ == '__main__':
cores = mp.cpu_count()
args = [(df1, lst1), (df2, lst2), (df3, lst3) ...]
with mp.Pool(processes=cores) as pool:
results = pool.starmap(merge_names, args)

最新更新