请注意,此演示代码会生成几GB的数据。
一段时间以来,我一直在使用以下代码版本进行多处理。当池中每个进程的运行时间相似时,它运行良好,但如果一个进程需要更长的时间,我最终会有许多阻塞的进程等待一个进程,所以我试图让它异步运行 - 一次只为一个函数。
例如,如果我有 70 个内核并且需要运行一个函数 2000 次,我希望它异步运行,然后在调用下一个函数之前等待最后一个进程。目前,它只是批量提交我给它多少个内核的进程,每批都必须等待最长的进程。
如您所见,我尝试使用map_async但这显然是错误的语法。谁能帮我?
import os
p='PATH/test/'
def f1(tup):
x,y=tup
to_write = x*(y**5)
with open(p+x+str(y)+'.txt','w') as fout:
fout.write(to_write)
def f2(tup):
x,y=tup
print (os.path.exists(p+x+str(y)+'.txt'))
def call_func(f,nos,threads,call):
print (call)
for i in range(0, len(nos), threads):
print (i)
chunk = nos[i:i + threads]
tmp = [('args', no) for no in chunk]
pool.map(f, tmp)
#pool.map_async(f, tmp)
nos=[i for i in range(55)]
threads=8
if __name__ == '__main__':
with Pool(processes=threads) as pool:
call_func(f1,nos,threads,'f1')
call_func(f2,nos,threads,'f2')
>map
只会返回,map_async
只会在当前块的所有任务完成后调用回调。
因此,您只能一次将所有任务交给map
/map_async
,也可以使用apply_async
(最初称为threads
次(,其中callback
为下一个任务调用apply_async
。
如果调用的实际返回值无关紧要(或者至少它们的顺序无关紧要(,那么一次为其提供所有任务(或按需生成任务的迭代器/生成器(时,imap_unordered
可能是另一种有效的解决方案