如何维护池中的全局进程以递归方式工作?



我想实现一个递归并行算法,我希望只创建一个池,每个时间步骤做一个作业,等待所有作业完成,然后再次调用进程输入以前的输出,然后在下一个时间步再次相同,依此类推。

我的问题是我已经实现了一个版本,每次步骤我都会创建并杀死池,但这非常慢,甚至比顺序版本慢。当我尝试实现一个在开始时只创建一次池的版本时,当我尝试调用 join() 时出现断言错误。

这是我的代码

def log_result(result):
tempx , tempb, u = result
X[:,u,np.newaxis], b[:,u,np.newaxis] = tempx , tempb

workers =  mp.Pool(processes = 4) 
for t in range(p,T):
count = 0 #==========This is only master's job=============
for l in range(p):
for k in range(4):
gn[count]=train[t-l-1,k]
count+=1
G = G*v +  gn @ gn.T#==================================
if __name__ == '__main__':
for i in range(4):
workers.apply_async(OULtraining, args=(train[t,i], X[:,i,np.newaxis], b[:,i,np.newaxis], i, gn), callback = log_result)

workers.join()   

X 和 b 是我想直接在主存储器上更新的矩阵。

这里出了什么问题,我得到了断言错误?

我可以使用池实现我想要的东西吗?

您不能加入未首先关闭的池,因为join()将等待工作进程终止,而不是等待作业完成(https://docs.python.org/3.6/library/multiprocessing.html 第 17.2.2.9 节

)。但是由于这将关闭池,这不是您想要的,因此您不能使用它。所以加入就出来了,你需要自己实现一个"等到所有作业完成"。

在没有繁忙循环的情况下执行此操作的一种方法是使用队列。您也可以使用有界信号量,但它们不适用于所有操作系统。

counter = 0
lock_queue = multiprocessing.Queue()
counter_lock = multiprocessing.Lock()
def log_result(result):
tempx , tempb, u = result
X[:,u,np.newaxis], b[:,u,np.newaxis] = tempx , tempb
with counter_lock:
counter += 1
if counter == 4:
counter = 0
lock_queue.put(42)

workers =  mp.Pool(processes = 4) 
for t in range(p,T):
count = 0 #==========This is only master's job=============
for l in range(p):
for k in range(4):
gn[count]=train[t-l-1,k]
count+=1
G = G*v +  gn @ gn.T#==================================
if __name__ == '__main__':
counter = 0
for i in range(4):
workers.apply_async(OULtraining, args=(train[t,i], X[:,i,np.newaxis], b[:,i,np.newaxis], i, gn), callback = log_result)

lock_queue.get(block=True)

这会在提交作业之前重置全局计数器。作业完成后,回调将递增全局计数器。当计数器命中 4(您的作业数)时,回调知道它已处理最后一个结果。然后在队列中发送虚拟消息。您的主程序正在等待Queue.get()显示某些内容。

这允许主程序阻止,直到所有作业完成,而无需关闭池。

如果将multiprocessing.Pool替换为concurrent.futures中的ProcessPoolExecutor,则可以跳过此部分并使用

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

阻止,直到所有提交的任务都完成。从功能的角度来看,它们之间没有区别。concurrent.futures方法短了几行,但结果完全相同。

最新更新