在许多文件上并行/调度python函数调用



我有几十万个csv文件,我都想对它们应用相同的函数

def process_single_file(fname):
df = pd.read_csv(fname)
# Pandas and non-pandas processing
df.to_csv(f"./output/{fname}")

由于单独循环所有文件会花费太长时间,我的问题是调度和并行执行的最有效方法是什么——没有进程相互依赖。我开始尝试使用python的multiprocessing:

import multiprocessing
files = sorted(glob.glob("./input/*.csv"))
processes = []
for fname in files:
p = multiprocessing.Process(target=process_file, args=(fname,))
processes.append(p)
p.start()
for process in processes:
process.join()

然而,我的电脑似乎不喜欢这个过程,因为它很快就会使所有CPU过载,导致速度减慢和崩溃。是否有更有效的方法来减少所有CPU的工作负载并安排任务,例如使用Dask、某些Bash脚本或更改python?提前谢谢。

这实际上取决于瓶颈在哪里:你是花更多的时间读/写文件,还是做CPU处理

这个RealPython教程真的帮了我很多关于这些东西的学习,我只能推荐一本好的读物;(

正如教程中所解释的,如果I/O,多线程就足够了(可能比多处理更好(:

def process_all_files(files):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(process_single_file, files)

如果是CPU,多处理将允许您使用所有可用的核心:

def process_all_files(files):
with multiprocessing.Pool() as pool:
pool.map(process_single_file, files)

你可以试试Ray,它是一个非常高效的模块来并行任务

游泳池绝对是最好的选择。

`来自多处理导入池

def f(x(:返回x*x

如果name='main':pool=池(进程=4(`

检查以下后

使用多处理。具有最大同时进程数的进程

最新更新