Python multiprocessing starmap_async never terminates



I'm trying out the starmap_async method to test multiprocessing with multiple arguments. I ran the following in my Jupyter notebook:

import multiprocessing

def test_async(i, j, k, l):
    print(i, j, k, l)

with multiprocessing.Pool(processes=4) as pool:
    result = pool.starmap_async(test_async, [(1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6), (4, 5, 6, 7), (5, 6, 7, 8)])
    result.get()

This cell never finishes; it stays in the [*] (running) state forever. When I force-stop it, I get this error:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-102-70185b68aa7c> in <module>
4 with multiprocessing.Pool(processes=4) as pool:
5     result = pool.starmap_async(test_async, [(1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6), (4, 5, 6, 7)])
----> 6     result.get()
7     pool.join()
~/miniconda3/lib/python3.8/multiprocessing/pool.py in get(self, timeout)
763 
764     def get(self, timeout=None):
--> 765         self.wait(timeout)
766         if not self.ready():
767             raise TimeoutError
~/miniconda3/lib/python3.8/multiprocessing/pool.py in wait(self, timeout)
760 
761     def wait(self, timeout=None):
--> 762         self._event.wait(timeout)
763 
764     def get(self, timeout=None):
~/miniconda3/lib/python3.8/threading.py in wait(self, timeout)
556             signaled = self._flag
557             if not signaled:
--> 558                 signaled = self._cond.wait(timeout)
559             return signaled
560 
~/miniconda3/lib/python3.8/threading.py in wait(self, timeout)
300         try:    # restore state no matter what (e.g., KeyboardInterrupt)
301             if timeout is None:
--> 302                 waiter.acquire()
303                 gotit = True
304             else:
KeyboardInterrupt: 

Am I missing something?

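Update: I'm actually aware of the issue with .close() and .join(). I have tried every possible combination, including leaving out both calls entirely, and it makes no difference. I'm looking for a Jupyter-specific solution.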

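I think this is a duplicate of a question already asked on SO; I just can't find it. The cause is that with the spawn start method (the default on Windows, and on macOS since Python 3.8), each child process has to look up test_async by name, but a function defined in a notebook cell belongs to the notebook's interactive __main__ module, which the child processes cannot re-create. The solution is to put the function test_async in a module, for example worker.py, and then: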

import multiprocessing
from worker import test_async

# required for Windows:
if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        result = pool.starmap_async(test_async, [(1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6), (4, 5, 6, 7), (5, 6, 7, 8)])
        result.get()
        # pool.join() # cannot call this without first calling pool.close()

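For example, if worker.py is in the same directory as the notebook file containing this cell, it will be on the module search path, because the notebook's working directory (by default, the directory containing the notebook) is on the search path.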
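To see the module approach work end to end without a notebook, here is a minimal sketch under a few assumptions: the helper names install_worker_module and run_pool are hypothetical, a temporary directory added to sys.path stands in for "worker.py next to the notebook", and test_async returns the sum of its arguments instead of printing, so the result is visible in the parent process. It also forces the spawn start method with multiprocessing.get_context("spawn") to reproduce the Windows/macOS behaviour on any platform.

```python
import multiprocessing
import os
import sys
import tempfile
import textwrap

# Hypothetical stand-in for worker.py; returns instead of printing (an assumption).
WORKER_SOURCE = textwrap.dedent("""
    def test_async(i, j, k, l):
        return i + j + k + l
""")

def install_worker_module():
    # Hypothetical helper: write worker.py into a fresh directory and put that
    # directory on sys.path, like a worker.py sitting next to the notebook.
    directory = tempfile.mkdtemp()
    with open(os.path.join(directory, "worker.py"), "w") as f:
        f.write(WORKER_SOURCE)
    sys.path.insert(0, directory)
    return directory

def run_pool():
    install_worker_module()
    # Imported from a real module, so it pickles as worker.test_async.
    from worker import test_async
    # Spawned children receive the parent's sys.path, so they can import
    # worker by name even though nothing else about __main__ is shared.
    ctx = multiprocessing.get_context("spawn")
    args = [(1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6), (4, 5, 6, 7), (5, 6, 7, 8)]
    with ctx.Pool(processes=4) as pool:
        result = pool.starmap_async(test_async, args)
        return result.get(timeout=60)

if __name__ == "__main__":
    print(run_pool())  # [10, 14, 18, 22, 26]
```

The timeout on result.get() is optional, but it turns a silent hang like the one in the question into a TimeoutError.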

By the way, your code is equivalent to:

import multiprocessing
from worker import test_async

# required for Windows:
if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        pool.starmap(test_async, [(1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6), (4, 5, 6, 7), (5, 6, 7, 8)])

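You don't actually need pool.close() and pool.join(), because you are waiting for all submitted tasks to finish anyway. When the with block exits it calls pool.terminate(), which is harmless at that point because every result has already been collected.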
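A quick way to check the equivalence is to run both calls and compare their results. This sketch assumes the builtin max as a stand-in for test_async (builtins pickle by name, so no worker.py is needed); compare and ARGS are illustrative names.

```python
import multiprocessing

ARGS = [(1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6), (4, 5, 6, 7), (5, 6, 7, 8)]

def compare():
    # max stands in for test_async (an assumption for illustration).
    with multiprocessing.Pool(processes=4) as pool:
        blocking = pool.starmap(max, ARGS)        # blocks until all tasks finish
        pending = pool.starmap_async(max, ARGS)   # returns an AsyncResult at once
        non_blocking = pending.get(timeout=60)    # blocks here instead
    # Leaving the with block calls pool.terminate(); both results are already
    # collected, so no close()/join() is needed.
    return blocking, non_blocking

if __name__ == "__main__":
    print(compare())  # ([4, 5, 6, 7, 8], [4, 5, 6, 7, 8])
```

starmap_async only pays off when there is other work to do between submitting the tasks and calling get().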

Yes, you are missing something. Your "with" block must be inside

if __name__ == "__main__":

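The documentation warns about this. Remember that with the spawn start method (the default on Windows, and on macOS since Python 3.8), every new process starts from scratch and has to re-import the main file. Without the __name__ check, each new process would run the pool-creation code again and try to start processes of its own; Python detects this and raises a RuntimeError instead of recursing indefinitely.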
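The guarded layout looks like this when run as a .py script. It is a sketch: add_all is a hypothetical stand-in for test_async that returns a value instead of printing. In a notebook the function still has to come from an importable module, as described in the other answer, because the guard alone does not let child processes find a function defined in a cell.

```python
import multiprocessing

# Hypothetical example function, defined at module level so that a child
# process that re-imports this file (spawn start method) can find it by name.
def add_all(i, j, k, l):
    return i + j + k + l

def main():
    args = [(1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6), (4, 5, 6, 7), (5, 6, 7, 8)]
    with multiprocessing.Pool(processes=4) as pool:
        result = pool.starmap_async(add_all, args)
        return result.get(timeout=60)

# Only the original process runs this; a child that re-imports the file sees
# __name__ == "__mp_main__" and skips it, so it never creates its own pool.
if __name__ == "__main__":
    print(main())  # [10, 14, 18, 22, 26]
```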
