我想在多个子进程中并行执行一些任务,如果任务没有在某个延迟内完成,则超时。
第一种方法包括单独分叉和连接子进程,并根据全局超时计算剩余超时,如本答案所示。它对我来说效果很好。
我想在这里使用的第二种方法包括创建一个子进程池并等待全局超时,如本答案所示。
但是我对第二种方法有问题:在向子进程池提供具有multiprocessing.Event()
对象的任务后,等待它们的完成会引发此异常:
运行时错误:条件对象应仅通过继承在进程之间共享
以下是 Python 代码片段:
import multiprocessing.pool
import time
class Worker:
def __init__(self):
self.event = multiprocessing.Event() # commenting this removes the RuntimeError
def work(self, x):
time.sleep(1)
return x * 10
if __name__ == "__main__":
pool_size = 2
timeout = 5
with multiprocessing.pool.Pool(pool_size) as pool:
result = pool.map_async(Worker().work, [4, 5, 2, 7])
print(result.get(timeout)) # raises the RuntimeError
在multiprocessing
— 基于进程的并行性文档的"编程指南"部分,有这样一段:
比泡菜/不泡菜更好继承
当使用spawn或forkserver启动方法时,
multiprocessing
中的许多类型都需要是可挑选的,以便子进程可以使用它们。但是,通常应避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问在其他地方创建的共享资源的进程可以从祖先进程继承它。
因此,multiprocessing.Event()
导致了RuntimeError
,因为它不可选取,如以下 Python 代码片段所示:
import multiprocessing
import pickle
pickle.dumps(multiprocessing.Event())
这引发了相同的异常:
运行时错误:条件对象应仅通过继承在进程之间共享
解决方案是使用代理对象:
代理是一个对象,它指的是(大概)存在于不同进程中的共享对象。
因为:
代理对象的一个重要特征是它们是可选取的,因此它们可以在进程之间传递。
multiprocessing.Manager().Event()
创建一个共享threading.Event()
对象并为其返回代理,因此替换以下行:
self.event = multiprocessing.Event()
通过问题的 Python 代码片段中的以下行解决了问题:
self.event = multiprocessing.Manager().Event()