multiprocessing
的文档对Pool.join()
进行了如下说明:
等待工作进程退出。在使用
join()
之前,必须调用close()
或terminate()
。
我知道Pool.close()
阻止任何其他任务提交到池中;并且Pool.join()
等待池完成,然后再继续父进程。
那么,为什么在我想重用我的池来执行多个任务然后最终close()
池的情况下,Pool.close()
之前不能调用Pool.join()
呢?例如:
pool = Pool()
pool.map(do1)
pool.join() # need to wait here for synchronization
.
.
.
pool.map(do2)
pool.join() # need to wait here again for synchronization
.
.
.
pool.map(do3)
pool.join() # need to wait here again for synchronization
pool.close()
# program ends
为什么必须"在使用join()
之前呼叫close()
或terminate()
"?
那么,为什么我不能在
Pool.close()
之前打电话给Pool.join()
因为join()
等待工人离开。不仅要完成分配给他们的任务,还要真正退出。如果你没有事先打电话给close()
,那么没有人告诉工人离开,他们处于待命状态,准备接受进一步的任务。
因此,在给join()
打电话之前没有打电话给close()
就会挂起——join()
会永远等待工人离开,没有人告诉他们这样做。出于这个原因,如果 yopu 尝试这样做,Python 会引发ValueError("pool is still running")
错误。
正如大卫·施瓦茨(David Schwartz(指出的那样,不要称join()
为"同步"——它不符合这个目的。
在这种情况下,您不需要在map()
后调用join()
,因为map()
调用块,直到所有结果都完成。
在close()
或terminate()
之前呼叫join()
不正确。因为join()
是阻塞调用并等待工作进程退出。因此,join()
后不能重复使用池。
只需重用池而不对其调用任何特殊函数。无需执行任何特殊操作即可继续将作业发送到池。如果你还没有完全完成它,就别管它,让它继续做它的事情。
为了使它变得显而易见,您可以使用以下代码向自己证明代码实际上是同步的:
import multiprocessing
import time
import datetime
def do_something(i):
with open(f'{i}.txt', 'w') as fp:
fp.write(str(i))
time.sleep(5)
def main():
pool = multiprocessing.Pool(5)
print('Starting process', datetime.datetime.now())
pool.map(do_something, range(5))
print('Waiting for sync', datetime.datetime.now())
pool.map(do_something, range(5, 10))
print('Waiting for final sync', datetime.datetime.now())
if __name__ == '__main__':
main()
你会观察到
等待同步
在第一个池完成其分配的任务之前不打印。Pool.join()
专门等待工作进程退出,而不是等待它们同步。无论如何,它们都不会在此配置中异步运行,并且可以正常同步。