我目前正在尝试使用python多处理。我使用的库是multiprocess
(不是multiprocessing
(。
我有以下代码,它创建了许多计算作业,并通过映射操作运行它:
pool = multiprocess.Pool(4)
all_responses = pool.map_async(wrapper_singlerun, range(10000))
pool.join()
pool.close()
但是,每当我运行此代码片段时,都会收到以下错误:
pool.join()
File "/Users/davidal/miniconda3/lib/python3.6/site-packages/multiprocess/pool.py", line 509, in join
assert self._state in (CLOSE, TERMINATE)
AssertionError
你知道为什么会发生此错误吗?我以前用过pool.map_async
,但认为我需要有一个pool rendez-vous
命令。否则,我的电脑创造了类似叉子炸弹的东西,它创建了太多的线程(至少,我认为它是这样做的......
任何想法都值得赞赏!
问题是你在close
之前调用join
。
multiprocess
似乎缺少它的文档,但是,据我所知,它基本上是 stdlibmultiprocessing
的一个分支,猴子补丁dill
用于pickle
,所以multiprocessing
文档在这里应该是相关的。(另外,在评论中,您说您可以使用multiprocessing
重现该问题。
所以,Pool.join
说:
等待工作进程退出。在使用
join()
之前,必须调用close()
或terminate()
。
close
方法是关闭队列的发送端,以便无法添加新任务。join
方法是等待处理队列上的所有内容的方式。等待队列排空后再关闭它是行不通的。
但是你在join
之后打电话给close
,而不是之前。join
做的第一件事是assert
你已经调用了close
或terminate
,而你没有,因此断言失败。
因此,您可能只想切换这两个调用的顺序。
或者,或者,也许您对join
的用途感到困惑,并认为您需要先调用它,然后才能使用all_responses.get()
或.wait()
.如果是这样,则无需这样做;get
将阻止,直到结果可用,之后您不需要join
。这实际上更常见,尤其是对于map
和朋友(尽管文档中的示例是通过with Pool(…) as pool:
而不是手动调用池中的任何内容来实现的(。