我有一个多处理任务,最简单的形式如下:
def fun(x):
y = setup()
return y.f(x)
pool = mp.Pool(4)
pool.map(fun, my_list)
然而,setup()
是昂贵的,所以我只想在每个过程中做一次,而不是在my_list
中每个项目做一次。
我也不想pickley
并将其发送到每个进程中,在这种情况下,我要求在每个进程中分别进行设置。
因此,我可以做这样的事情来设置每个过程:
class MyProcess(mp.Process):
def __init__(self):
self.y = setup()
def fun(x):
return self.y.f(x)
workers = [MyProcess() for _ in range(4)]
我现在有什么办法可以像使用游泳池一样使用工人吗?即将某个工人的worker.fun
映射到my_list
中的每个项目?理想情况下,我想要这样的东西:
for result in workers.imap_unordered(MyProcess.fun, my_list):
# do something
我怀疑使用队列的解决方案也会起作用,但我不完全确定如何实现它。
Pool
已经支持在启动时自定义每个进程。定义创建y
并使其可访问的池进程的初始化程序:
def init_process():
global y # make y accessible to everything
y = setup() # ... and initialise it
def fun(x):
# use already initialised y
return y.f(x)
pool = mp.Pool(4, initializer=init_process)
pool.map(fun, my_list)
要在创建池中的进程时对其进行初始化,可以使用Pool
的initializer
和initargs
参数。
举例说明该方法:
import multiprocessing as mp
init_obj = {}
def setup(a):
global init_obj
init_obj = {"one": a}
def fun(x):
y = init_obj
print(y)
pool = mp.Pool(None, initializer=setup, initargs=(1,))
pool.map(fun, [0, 1, 2])
我觉得你仍然可以使用一个池:
def fun(x)
y = setup()
return [y.f(item) for item in x]
processes = 4
newlist = []
for i in range(processes):
mylist[i * len(mylist)//4: (i + 1) * len(mylist)//4]
newlist.append(mylist)
pool = mp.Pool(4)
pool.map(fun, new_list)
拆分列表会有开销,但这是我能想到的减少调用setup次数的最简单的解决方案。
注意:此版本代码中的x是一个值列表,而不是单个值。