这个问题最近被问了几次并解决了,但我有一个相当具体的例子......
我有一个多处理函数,昨天在完全隔离的情况下(在交互式笔记本中(工作得非常好,但是,我决定参数化,以便我可以将其作为更大管道的一部分调用,用于抽象/清洁笔记本,现在它只使用单个线程而不是 6。
import pandas as pd
import multiprocessing as mp
from multiprocessing import get_context
mp.set_start_method('forkserver')
def multiprocess_function(func, iterator, input_data):
result_list = []
def append_result(result):
result_list.append(result)
with get_context('fork').Pool(processes=6) as pool:
for i in iterator:
pool.apply_async(func, args = (i, input_data), callback = append_result)
pool.close()
pool.join()
return result_list
multiprocess_function(count_live, run_weeks, base_df)
我以前的代码版本以不同的方式执行,而不是返回/调用,我在函数底部使用以下代码(现在我已经参数化了,这根本不起作用 - 即使分配了参数(
if __name__ == '__main__':
multiprocess_function()
该函数执行良好,只是根据顶部的输出仅跨一个线程运行。
抱歉,如果这是非常简单的事情 - 我不是程序员,我是分析师:)
编辑:如果我在函数底部包含 =='main': 等if__name__并执行单元格,一切正常,但是,当我这样做时,我必须删除参数 - 也许只是与范围有关。如果我通过调用函数来执行,无论它是否参数化,它都只在单个线程上运行。
你有两个问题:
-
您没有使用导入防护。
-
您没有在导入保护中设置默认启动方法。
在他们两个之间,你最终告诉Python在叉子服务器中生成分叉服务器,这只会让你感到悲伤。将代码的结构更改为:
import pandas as pd
import multiprocessing as mp
from multiprocessing import get_context
def multiprocess_function(func, iterator, input_data):
result_list = []
with get_context('fork').Pool(processes=6) as pool:
for i in iterator:
pool.apply_async(func, args=(i, input_data), callback=result_list.append)
pool.close()
pool.join()
return result_list
if __name__ == '__main__':
mp.set_start_method('forkserver')
multiprocess_function(count_live, run_weeks, base_df)
由于您没有显示您从哪里获得count_live
、run_weeks
和base_df
,我只想说,对于编写的代码,它们应该在受保护的部分中定义(因为没有什么依赖于它们作为全局(。
还有其他改进要做(apply_async
的使用方式让我真的只是想列出pool.imap_unordered
的结果,没有显式循环(,但这正在解决会破坏使用spawn
或forkserver
启动方法的大问题。
使用"get_context('spawn'("而不是"get_context('fork'("也许可以解决你的问题