我正在使用networkx
(网络分析库)进行一些缓慢的计算,并且我正在尝试使用Pool worker使其更快一些。计算是独立的,因此并行化它们应该相对简单。
def computeInfoPooled(G,num_list):
pool=Pool(processes=4)
def f(k):
curr_stat={}
curr_stat[k]=slow_function(k,G)
return curr_stat
result = pool.map(f,num_list)
return result
现在,我在控制台中运行以下命令:
computed_result=computeInfoPooled(G)
我希望这段代码创建4个进程,并在不同的进程中使用num_list的每个项(一个数字)调用f。如果num_list包含超过4个数字(在我的例子中大约是300),它将同时运行4个数字,并将其余的排队,直到其中一个合池工作完成。
当我运行我的代码时发生的事情是,许多python.exe正在生成(或分叉,不确定发生了什么),似乎它正在创建无限多个进程,所以我不得不拔掉我的机器。
你知道我做错了什么吗?我该怎么做?
在Windows上需要
if __name__ == '__main__':
computed_result = computeInfoPooled(G)
使您的脚本可导入,而无需启动fork炸弹。(请阅读文档中的"安全导入主模块"章节。
还要注意,在Windows上,您可能无法使用交互式解释器中的多处理模块。见文档顶部附近的警告:
这个包中的功能要求main模块是可由孩子们进口。这在编程指南中有介绍然而,这里值得指出。这意味着一些例子,如的多进程。池示例将无法在交互式解释器。(我强调。)
相反,将脚本保存到一个文件中,例如script.py
,并从命令行运行:
python script.py
此外,您需要pool.map
的参数是可pickle的。函数f
需要在模块级别定义(而不是在computeInfoPooled
中定义):
def f(k):
curr_stat = slow_function(k, G)
return k, curr_stat
def computeInfoPooled(G, num_list):
pool = Pool(processes=4)
result = pool.map(f, num_list)
return dict(result)
if __name__ == '__main__':
computed_result = computeInfoPooled(G)
顺便说一下,如果
f
返回一个字典,那么pool.map(f, ...)
将返回一个字典列表。我不确定这是您想要的,特别是因为每个字典只有一个键值对。
相反,如果您让f
返回一个(键,值)元组,那么pool.map(f, ...)
将返回一个元组列表,然后您可以使用dict(result)
将其转换为字典。
这在Windows编程指南的文档中有解释。
根据您的平台,每个进程可能必须启动一个全新的解释器和import
您的模块,以获得f
函数调用。(在Windows上,它总是必须这样做。)当import
是你的模块时,所有的顶级代码都将运行,其中包括computed_result=computeInfoPooled(G)
行,它创建了一个包含4个进程的全新池,等等。
解决这个问题的方法与处理任何其他情况相同,您希望同一文件既可以作为模块导入又可以作为脚本运行:
def computeInfoPooled(G,num_list):
pool=Pool(processes=4)
def f(k):
curr_stat={}
curr_stat[k]=slow_function(k,G)
return curr_stat
result = pool.map(f,num_list)
return result
if __name__ == '__main__':
computed_result=computeInfoPooled(G)
从你的编辑和你的评论中,你似乎期望从交互式解释器调用computeInfoPooled(G)
将解决这个问题。同样的链接文档部分详细解释了为什么这不起作用,并且在引言的最上方有一个大注释直接说:
包中的功能要求main模块可以被子模块导入。这在编程指南中有介绍,但在这里值得指出。这意味着一些例子,比如多处理。池示例不能在交互式解释器中工作。
如果你想理解为什么这是正确的,你需要阅读链接的文档(你还需要了解import
, pickle
和multiprocessing
是如何工作的)