仅使用单个线程而不是多个线程进行多处理



这个问题最近被问了几次并解决了,但我有一个相当具体的例子......

我有一个多处理函数,昨天在完全隔离的情况下(在交互式笔记本中(工作得非常好,但是,我决定参数化,以便我可以将其作为更大管道的一部分调用,用于抽象/清洁笔记本,现在它只使用单个线程而不是 6。

import pandas as pd
import multiprocessing as mp
from multiprocessing import get_context
mp.set_start_method('forkserver')

def multiprocess_function(func, iterator, input_data):
result_list = []
def append_result(result):
result_list.append(result)
with get_context('fork').Pool(processes=6) as pool:
for i in iterator:
pool.apply_async(func, args = (i, input_data), callback = append_result)
pool.close()
pool.join()
return result_list
multiprocess_function(count_live, run_weeks, base_df)

我以前的代码版本以不同的方式执行,而不是返回/调用,我在函数底部使用以下代码(现在我已经参数化了,这根本不起作用 - 即使分配了参数(

if __name__ == '__main__':
multiprocess_function()

该函数执行良好,只是根据顶部的输出仅跨一个线程运行。

抱歉,如果这是非常简单的事情 - 我不是程序员,我是分析师:)

编辑:如果我在函数底部包含 =='main': 等if__name__并执行单元格,一切正常,但是,当我这样做时,我必须删除参数 - 也许只是与范围有关。如果我通过调用函数来执行,无论它是否参数化,它都只在单个线程上运行。

你有两个问题:

  1. 您没有使用导入防护。

  2. 您没有在导入保护中设置默认启动方法。

在他们两个之间,你最终告诉Python在叉子服务器中生成分叉服务器,这只会让你感到悲伤。将代码的结构更改为:

import pandas as pd
import multiprocessing as mp
from multiprocessing import get_context

def multiprocess_function(func, iterator, input_data):
result_list = []
with get_context('fork').Pool(processes=6) as pool:
for i in iterator:
pool.apply_async(func, args=(i, input_data), callback=result_list.append)
pool.close()
pool.join()
return result_list
if __name__ == '__main__':
mp.set_start_method('forkserver')
multiprocess_function(count_live, run_weeks, base_df)

由于您没有显示您从哪里获得count_liverun_weeksbase_df,我只想说,对于编写的代码,它们应该在受保护的部分中定义(因为没有什么依赖于它们作为全局(。

还有其他改进要做(apply_async的使用方式让我真的只是想列出pool.imap_unordered的结果,没有显式循环(,但这正在解决会破坏使用spawnforkserver启动方法的大问题。

使用"get_context('spawn'("而不是"get_context('fork'("也许可以解决你的问题

相关内容

  • 没有找到相关文章

最新更新