我有一个算法,我试图并行化,因为串行运行时间很长。然而,需要并行化的函数是在类中。multiprocessing.Pool
似乎是最好和最快的方式来做到这一点,但有一个问题。它的目标函数不能是对象实例的函数。意义;您可以用以下方式声明Pool
:
import multiprocessing as mp
cpus = mp.cpu_count()
poolCount = cpus*2
pool = mp.Pool(processes = poolCount, maxtasksperchild = 2)
然后实际使用:
pool.map(self.TargetFunction, args)
但是这会抛出一个错误,因为对象实例不能被pickle,因为Pool
函数会将信息传递给它的所有子进程。但我有使用self.TargetFunction
所以我有了一个主意,我将创建一个名为parallel
的新Python文件,并简单地编写几个函数,而不将它们放在类中,并从我的原始类(我想并行化的函数)中调用这些函数
所以我试了这个:
import multiprocessing as mp
def MatrixHelper(args):
WM = args[0][0]
print(WM.CreateMatrixMp(*args))
return WM.CreateMatrixMp(*args)
def Start(sigmaI, sigmaX, numPixels, WM):
cpus = mp.cpu_count()
poolCount = cpus * 2
args = [(WM, sigmaI, sigmaX, i) for i in range(numPixels)]
print('Number of cpu's to process WM:%d'%cpus)
pool = mp.Pool(processes = poolCount, maxtasksperchild = 2)
tempData = pool.map(MatrixHelper, args)
return tempData
这些函数不是类的一部分,在Pool
和map
函数中使用MatrixHelper
可以正常工作。但当我这样做的时候,我意识到这是没有出路的。需要并行化的函数(CreateMatrixMp
)需要一个对象传递给它(它被声明为def CreateMatrixMp(self, sigmaI, sigmaX, i)
)
因为它不是在它的类中被调用的,所以它没有得到一个传递给它的self
。为了解决这个问题,我将Start
函数传递给调用对象本身。比如,我说parallel.Start(sigmaI, sigmaX, self.numPixels, self)
。对象self
然后变成WM
,这样我将能够最终调用所需的函数WM.CreateMatrixMp()
。
我确信这是一种非常草率的编码方式,但我只是想看看它是否有效。但是不,再次出现pickle错误,map
函数根本不能处理任何对象实例。
所以我的问题是,为什么它是这样设计的?它似乎毫无用处,在任何使用类的程序中似乎都完全不起作用。
我尝试使用Process
而不是Pool
,但这需要我最终写入的数组被共享,这需要进程相互等待。如果我不想让它被共享,那么我让每个进程写自己的小数组,并在最后做一个大的写操作。但是这两个都导致比串行执行时运行时间慢!python内置的multiprocessing
似乎完全没用!
有人能给我一些指导,如何实际节省时间与多处理,在我的目标函数是在一个类内的上下文中?我在这里读到过使用pathos.multiprocessing
的帖子,但我是在Windows上,我和很多人一起做这个项目,他们都有不同的设置。让每个人都试着安装它会很不方便。
我在尝试在类中使用多处理时遇到了类似的问题。我在网上找到了一个相对简单的解决方法。基本上,您使用类外部的函数来解包裹/解包您试图并行化的函数内部的方法。以下是我找到的两个网站,它们解释了如何做到这一点。
网站1 (joblib示例)
网站2(多处理模块示例)
对于两者,思路是这样做:
rom multiprocessing import Pool
import time
def unwrap_self_f(arg, **kwarg):
return C.f(*arg, **kwarg)
class C:
def f(self, name):
print 'hello %s,'%name
time.sleep(5)
print 'nice to meet you.'
def run(self):
pool = Pool(processes=2)
names = ('frank', 'justin', 'osi', 'thomas')
pool.map(unwrap_self_f, zip([self]*len(names), names))
if __name__ == '__main__':
c = C()
c.run()
multiprocessing
工作的本质是它产生接收参数以运行某个函数的子进程。为了传递这些参数,它需要它们是可传递的:非独占于主进程,例如套接字,文件描述符和其他低级的,与操作系统相关的东西。
这翻译成"需要能够pickle
或可序列化"。
在同一主题上,当你(可以)对一个问题有独立的划分时,并行处理效果最好。我可以告诉你,你想要共享某种输入/流/数据库源,但这可能会造成一个瓶颈,你必须在某个时候解决(至少,从"python脚本"方面,而不是"操作系统/数据库"方面)。幸运的是,你现在必须尽早解决它。
你可以重新编码你的类,在需要的时候生成/创建这些不可选择的资源,而不是在开始
def targetFunction(self, range_params):
if not self.ready():
self._init_source()
#rest of the code
你用另一种方式解决了这个问题(基于参数初始化对象)。是的,并行处理是有代价的。
您可以查看multiprocessing
编程指南,以获得关于此问题的更全面的见解。
这是一篇老文章,但当你搜索这个主题时,它仍然是最重要的结果之一。关于这个问题的一些有用信息可以在这个堆栈溢出中找到:python子类化multiprocessing。流程
我尝试了一些变通方法,尝试从类内部调用pool.starmap
到类中的另一个函数。使其成为静态方法或在外部调用函数都不起作用,并给出相同的错误。类实例不能被pickle,所以我们需要在启动multiprocessing后创建实例。
我最终做的是把我的类分成两个类。基本上,你正在调用multiprocessing的函数需要在你为它所属的类实例化一个新对象之后被调用。
像这样:
from multiprocessing import Pool
class B:
...
def process_feature(idx, feature):
# do stuff in the new process
pass
...
def multiprocess_feature(process_args):
b_instance = B()
return b_instance.process_feature(*process_args)
class A:
...
def process_stuff():
...
with Pool(processes=num_processes, maxtasksperchild=10) as pool:
results = pool.starmap(
multiprocess_feature,
[
(idx, feature)
for idx, feature in enumerate(features)
],
chunksize=100,
)
...
...
...