我有几十万个csv文件,我都想对它们应用相同的函数
def process_single_file(fname):
df = pd.read_csv(fname)
# Pandas and non-pandas processing
df.to_csv(f"./output/{fname}")
由于单独循环所有文件会花费太长时间,我的问题是调度和并行执行的最有效方法是什么——没有进程相互依赖。我开始尝试使用python的multiprocessing
:
import multiprocessing
files = sorted(glob.glob("./input/*.csv"))
processes = []
for fname in files:
p = multiprocessing.Process(target=process_file, args=(fname,))
processes.append(p)
p.start()
for process in processes:
process.join()
然而,我的电脑似乎不喜欢这个过程,因为它很快就会使所有CPU过载,导致速度减慢和崩溃。是否有更有效的方法来减少所有CPU的工作负载并安排任务,例如使用Dask
、某些Bash
脚本或更改python
?提前谢谢。
这实际上取决于瓶颈在哪里:你是花更多的时间读/写文件,还是做CPU处理
这个RealPython教程真的帮了我很多关于这些东西的学习,我只能推荐一本好的读物;(
正如教程中所解释的,如果I/O,多线程就足够了(可能比多处理更好(:
def process_all_files(files):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(process_single_file, files)
如果是CPU,多处理将允许您使用所有可用的核心:
def process_all_files(files):
with multiprocessing.Pool() as pool:
pool.map(process_single_file, files)
你可以试试Ray,它是一个非常高效的模块来并行任务
游泳池绝对是最好的选择。
`来自多处理导入池
def f(x(:返回x*x
如果name='main':pool=池(进程=4(`
检查以下后
使用多处理。具有最大同时进程数的进程