I'm trying out the starmap_async method to test multiprocessing with multiple arguments, and I ran the following in my Jupyter notebook:
import multiprocessing
def test_async(i, j, k, l):
    print(i, j, k, l)
with multiprocessing.Pool(processes=4) as pool:
    result = pool.starmap_async(test_async, [(1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6), (4, 5, 6, 7), (5, 6, 7, 8)])
    result.get()
This cell never terminates; it stays in the [*] state indefinitely. When I interrupt it, I get this error:
---------------------------------------------------------------------------
KeyboardInterrupt Traceback (most recent call last)
<ipython-input-102-70185b68aa7c> in <module>
4 with multiprocessing.Pool(processes=4) as pool:
5 result = pool.starmap_async(test_async, [(1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6), (4, 5, 6, 7)])
----> 6 result.get()
7 pool.join()
~/miniconda3/lib/python3.8/multiprocessing/pool.py in get(self, timeout)
763
764 def get(self, timeout=None):
--> 765 self.wait(timeout)
766 if not self.ready():
767 raise TimeoutError
~/miniconda3/lib/python3.8/multiprocessing/pool.py in wait(self, timeout)
760
761 def wait(self, timeout=None):
--> 762 self._event.wait(timeout)
763
764 def get(self, timeout=None):
~/miniconda3/lib/python3.8/threading.py in wait(self, timeout)
556 signaled = self._flag
557 if not signaled:
--> 558 signaled = self._cond.wait(timeout)
559 return signaled
560
~/miniconda3/lib/python3.8/threading.py in wait(self, timeout)
300 try: # restore state no matter what (e.g., KeyboardInterrupt)
301 if timeout is None:
--> 302 waiter.acquire()
303 gotit = True
304 else:
KeyboardInterrupt:
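Am I missing something?
Update: I do know about the .close() and .join() issue. I have tried every possible combination, including omitting both calls entirely, and it changes nothing. I am looking for a Jupyter-specific solution.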
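I think this is a duplicate of a question already asked on SO; I just can't find it. In any case, the solution is to put the function test_async in a module, for example worker.py, and then: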
import multiprocessing
from worker import test_async
# required for Windows:
if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        result = pool.starmap_async(test_async, [(1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6), (4, 5, 6, 7), (5, 6, 7, 8)])
        result.get()
        # pool.join() # cannot call this without first calling pool.close()
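If worker.py is in the same directory as the notebook file containing this cell, for example, then worker.py is by definition in a directory on the module search path.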
By the way, your code is equivalent to:
import multiprocessing
from worker import test_async
# required for Windows:
if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        pool.starmap(test_async, [(1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6), (4, 5, 6, 7), (5, 6, 7, 8)])
There is really no need for pool.close() and pool.join(), since you are waiting for all submitted tasks to complete anyway.
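To make the argument unpacking concrete, here is a small sketch using itertools.starmap from the standard library. This is a sequential analogue, not the pool itself: it only illustrates how each tuple is spread across the function's positional parameters. The function name total and its return value are assumptions made for illustration.

```python
from itertools import starmap

def total(i, j, k, l):
    # Hypothetical stand-in for test_async that returns a value
    # instead of printing, so the result can be inspected.
    return i + j + k + l

args = [(1, 2, 3, 4), (2, 3, 4, 5)]

# Each tuple is unpacked into (i, j, k, l), i.e. total(*args[0]), total(*args[1]).
results = list(starmap(total, args))
print(results)  # [10, 14]
```

pool.starmap applies the same unpacking but distributes the calls across worker processes and returns the results as a list once all of them have finished.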
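Yes, you are missing something. Your "with" block must be inside
if __name__ == "__main__":
The documentation warns about this. Remember that when you do multiprocessing with the spawn start method (the default on Windows and, since Python 3.8, on macOS), each new process starts from scratch and has to re-import the main file. Unless you have the __name__ check, each new process will try to create a pool of its own and spawn yet more processes.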
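Whether the main file is re-imported depends on the start method in use. As a rough sketch, the snippet below reports the current start method with multiprocessing.get_start_method(); the helper needs_main_guard is a hypothetical name introduced here, and it encodes the assumption that only "fork" copies the parent process instead of re-importing the main module.

```python
import multiprocessing

def needs_main_guard(start_method):
    # Hypothetical helper: "spawn" and "forkserver" children re-import the
    # main module, so unguarded pool creation would run again in each child.
    # With "fork" the child is a copy of the parent, so nothing is re-imported.
    return start_method != "fork"

if __name__ == "__main__":
    method = multiprocessing.get_start_method()
    print("start method:", method)
    print("guard strictly required:", needs_main_guard(method))
```

Keeping the guard is harmless under "fork" as well, so it is reasonable to include it regardless of platform.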