可以使用Pool.map检索工作程序的输出,但当一个工作程序失败时,将引发异常,无法再检索输出。因此,我的想法是将输出记录在流程同步队列中,以便检索所有成功工作者的输出。
以下片段似乎有效:
from multiprocessing import Pool, Manager
from functools import partial
def f(x, queue):
if x == 4:
raise Exception("Error")
queue.put_nowait(x)
if __name__ == '__main__':
queue = Manager().Queue()
pool = Pool(2)
try:
pool.map(partial(f, queue=queue), range(6))
pool.close()
pool.join()
except:
print("An error occurred")
while not queue.empty():
print("Output => " + str(queue.get()))
但我想知道在队列轮询阶段是否会出现竞争情况。我不确定当所有工作人员都完成后,队列进程是否一定会处于活动状态。从这个角度来看,你认为我的代码是正确的吗?
远至"如何正确地处理异常";,这是你的主要问题:
首先,在您的情况下,您将永远无法执行pool.close
和pool.join
。但是,在所有提交的任务返回结果或生成异常之前,pool.map
不会返回,因此您确实不需要调用这些函数来确保所有提交任务都已完成。如果不是工作函数f
将结果写入队列,那么只要任何任务导致异常,就永远无法使用map
返回任何结果。相反,您必须apply_async
个单独的任务,并为每个任务获得AsyncResult
个实例。
因此,我想说,在不必使用队列的情况下,处理工作函数中的异常的更好方法如下。但请注意,当您使用apply_async
时,任务一次提交一个任务,这可能会导致许多共享内存访问。只有当提交的任务数量非常大时,这才真正成为性能问题。在这种情况下,工作函数最好自己处理异常,并以某种方式返回错误指示,以允许使用map
或imap
,在其中可以指定chunksize
。
使用队列时,请注意写入托管队列会有相当大的开销。第二段代码展示了如何通过使用multiprocessing.Queue
实例来稍微减少开销,该实例不像托管队列那样使用代理。请注意输出顺序,它不是任务提交的顺序,而是任务完成的顺序——使用队列的另一个潜在缺点或优点(如果希望按顺序完成结果,可以使用apply_async
的回调函数)。即使使用原始代码,也不应该依赖于队列中结果的顺序。
from multiprocessing import Pool
def f(x):
if x == 4:
raise Exception("Error")
return x
if __name__ == '__main__':
pool = Pool(2)
results = [pool.apply_async(f, args=(x,)) for x in range(6)]
for x, result in enumerate(results): # result is AsyncResult instance:
try:
return_value = result.get()
except:
print(f'An error occurred for x = {x}')
else:
print(f'For x = {x} the return value is {return_value}')
打印:
For x = 0 the return value is 0
For x = 1 the return value is 1
For x = 2 the return value is 2
For x = 3 the return value is 3
An error occurred for x = 4
For x = 5 the return value is 5
OP的原始代码修改为使用multiprocessing.Queue
from multiprocessing import Pool, Queue
def init_pool(q):
global queue
queue = q
def f(x):
if x == 4:
raise Exception("Error")
queue.put_nowait(x)
if __name__ == '__main__':
queue = Queue()
pool = Pool(2, initializer=init_pool, initargs=(queue,))
try:
pool.map(f, range(6))
except:
print("An error occurred")
while not queue.empty():
print("Output => " + str(queue.get()))
打印:
An error occurred
Output => 0
Output => 2
Output => 3
Output => 1
Output => 5