多进程只读共享内存



我已经找到了这个问题的一些答案,但是我仍然对整个多处理感到困惑。我试着并行化我的程序。为了简化它,我有两个类ProblemSubProblem。类Problem对类SubProblem的6个实例调用方法solve_,目前是串行求解。我相信并行解决这些问题会有所收获。

class Problem():
def __init__(self, data):
self.data = data
self.sub_pbs = {i: SubProblem(data) for i in range(range(6)}
def run(self):
dic_ = self.do_some_analysis()  # does some analysis with self.data
result = []
for k, sp in self.sub_pbs.items():
result.append(sp.solve_(dic_, k))
return result

SubProblem类如下:

class SubProblem:
def __init__(self,data):
self.data= self.retrieve_interesting_data(data)
def solve_(self, dic_ k):
solutions = []
# do some stuff and call many other functions and store the result in solutions 
return solutions 

我试图并行化我的代码(run函数在Problem类)的方式如下:

import concurrent.futures
def run(self):
dic_ = self.do_some_analysis()  # does some analysis with self.data
res = []
with concurrent.futures.ProcessPoolExecutor() as executor:
results = [executor.submit(sp.solve_,dic_, k) for k, sp in self.sub_pbs.items()]
res= [f.result for f in results]
return res

实际代码要复杂得多。用这种方法并行化之后,它比串行化要慢。我运行分析器,发现_thread的acquire()方法。锁对象占用了大量时间。我认为这可能是因为访问子问题/进程之间共享的数据。

为了运行solve_,子问题需要两种类型的数据:所有子问题都应该访问它的一些数据(一种全局数据,是子问题属性的一部分,但也作为solve_函数的参数传递),以及特定于每个子问题的其他一些数据,是子问题属性的一部分,也作为参数传递给solve函数。然而,所有这些数据将不会在任何子问题/进程中被修改。

现在我的问题是,我应该如何改变我的代码,以便需要被所有子问题/进程访问的数据不被每个进程复制?是否有任何关于如何有效地将这些数据传递给流程的提示?

使用线程(共享内存)可能比使用单独的进程更好,特别是在不修改共享数据的情况下。

可能就像把'ProcessPoolExecutor'改成'ThreadPoolExecutor'一样简单

这真的取决于你在问题和子问题中的分析是怎么做的。如果它使用numpy或pandas,它们会释放python GIL,可以从线程中获益,或者如果它主要等待I/o,

首先,您当前的SubProblem.solve方法只显示参数k(除了self);传递给它的dic_值没有参数

Multiprocessing有直接的、非多处理没有的开销,即创建进程的开销和将值从一个地址空间移动到另一个地址空间的开销。这一成本是值得的,但前提是"工人"。被调用的函数(在本例中是各种sp.solve方法)都是cpu密集型的,因此并行运行它们所获得的收益超过了前面提到的成本。因此,您的特定方法可能不适合多处理。

处理dic_最有效的方法是根本不复制它,也就是说,在共享内存中创建它。但要做到这一点,我需要更多地了解数据的结构而这种结构不能是任意的;只支持相当有限的共享内存类型,比如Array类型。也有"管理";类型(参见调用multiprocessing.Manager()返回的multiprocessing.managers.SyncManager)。但是,访问这些托管类型的成本可能很高。

但是你可以做一些事情来减少开销。首先,不要创建超出所需的池。您将提交6个任务,但是您的代码将根据您拥有的CPU内核数量创建一个默认池大小的池。如果您有12个内核,那么您将创建6个未使用的进程。

让我们考虑相反的问题。让我们假设您只有4个核心,因此您将创建一个包含4个进程的池。您将调用submit并传递dic_6次。如果dic_是一小部分数据,则确实不需要对其进行优化。但是,如果dic_从一个地址空间移动到另一个地址空间的成本很高,那么最好将此数据从主进程复制一次到池中的每个进程,并将其作为全局数据存储在每个进程的地址空间中。因此,您将保存该数据的2次移动。对于这个特定的程序来说,这并不是一个很大的节省,但是在您的工具箱中拥有它是一个非常宝贵的技术。

import concurrent.futures
from os import cpu_count
def init_pool(dic):
global dic_
dic_ = dic
class Problem():
def __init__(self, data):
self.data = data
self.sub_pbs = {i: SubProblem(data) for i in range(range(6))}

def run(self):
dic_ = self.do_some_analysis()  # does some analysis with self.data
# Don't create a pool larger than what you need:
pool_size = min(len(self.sub_pbs.items()), cpu_count())
with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size, initializer=init_pool, initargs=(dic_,)) as executor:
results = [executor.submit(sp.solve_, k) for k, sp in self.sub_pbs.items()]
res= [f.result() for f in results]
return res
class SubProblem:
def __init__(self, data):
self.data = self.retrieve_interesting_data(data)
def solve_(self, k):
# dic_ is now global data (presumably read/only)
global dic_
solutions = []
# do some stuff and call many other functions and store the result in solutions 
return solutions

相关内容

  • 没有找到相关文章

最新更新