多处理:使用进程列表而不是池来映射函数



我有一个多处理任务,最简单的形式如下:

def fun(x):
y = setup()
return y.f(x) 
pool = mp.Pool(4)
pool.map(fun, my_list)

然而,setup()是昂贵的,所以我只想在每个过程中做一次,而不是在my_list中每个项目做一次。

我也不想pickley并将其发送到每个进程中,在这种情况下,我要求在每个进程中分别进行设置。

因此,我可以做这样的事情来设置每个过程:

class MyProcess(mp.Process):
def __init__(self):
self.y = setup()
def fun(x):
return self.y.f(x)
workers = [MyProcess() for _ in range(4)]

我现在有什么办法可以像使用游泳池一样使用工人吗?即将某个工人的worker.fun映射到my_list中的每个项目?理想情况下,我想要这样的东西:

for result in workers.imap_unordered(MyProcess.fun, my_list):
# do something

我怀疑使用队列的解决方案也会起作用,但我不完全确定如何实现它。

Pool已经支持在启动时自定义每个进程。定义创建y并使其可访问的池进程的初始化程序:

def init_process():
global y     # make y accessible to everything
y = setup()  # ... and initialise it
def fun(x):
# use already initialised y
return y.f(x) 
pool = mp.Pool(4, initializer=init_process)
pool.map(fun, my_list)

要在创建池中的进程时对其进行初始化,可以使用Poolinitializerinitargs参数。

举例说明该方法:

import multiprocessing as mp
init_obj = {}

def setup(a):
global init_obj
init_obj = {"one": a}

def fun(x):
y = init_obj
print(y)

pool = mp.Pool(None, initializer=setup, initargs=(1,))
pool.map(fun, [0, 1, 2])

我觉得你仍然可以使用一个池:

def fun(x)
y = setup()
return [y.f(item) for item in x]
processes = 4
newlist = []
for i in range(processes):
mylist[i * len(mylist)//4: (i + 1) * len(mylist)//4]
newlist.append(mylist)
pool = mp.Pool(4)
pool.map(fun, new_list)

拆分列表会有开销,但这是我能想到的减少调用setup次数的最简单的解决方案。

注意:此版本代码中的x是一个值列表,而不是单个值。

最新更新