Python的多处理似乎几乎不可能在类中/使用任何类实例来完成.它的预期用途是什么?



我有一个算法,我试图并行化,因为串行运行时间很长。然而,需要并行化的函数是在类中。multiprocessing.Pool似乎是最好和最快的方式来做到这一点,但有一个问题。它的目标函数不能是对象实例的函数。意义;您可以用以下方式声明Pool:

import multiprocessing as mp
cpus = mp.cpu_count()
poolCount = cpus*2
pool = mp.Pool(processes = poolCount, maxtasksperchild = 2)

然后实际使用:

pool.map(self.TargetFunction, args)

但是这会抛出一个错误,因为对象实例不能被pickle,因为Pool函数会将信息传递给它的所有子进程。但我使用self.TargetFunction

所以我有了一个主意,我将创建一个名为parallel的新Python文件,并简单地编写几个函数,而不将它们放在类中,并从我的原始类(我想并行化的函数)中调用这些函数

所以我试了这个:

import multiprocessing as mp
def MatrixHelper(args):
    WM = args[0][0]
    print(WM.CreateMatrixMp(*args))
    return WM.CreateMatrixMp(*args)
def Start(sigmaI, sigmaX, numPixels, WM):
    cpus = mp.cpu_count()
    poolCount = cpus * 2
    args = [(WM, sigmaI, sigmaX, i) for i in range(numPixels)]
    print('Number of cpu's to process WM:%d'%cpus)
    pool = mp.Pool(processes = poolCount, maxtasksperchild = 2)
    tempData = pool.map(MatrixHelper, args)
    return tempData

这些函数不是类的一部分,在Poolmap函数中使用MatrixHelper可以正常工作。但当我这样做的时候,我意识到这是没有出路的。需要并行化的函数(CreateMatrixMp)需要一个对象传递给它(它被声明为def CreateMatrixMp(self, sigmaI, sigmaX, i))

因为它不是在它的类中被调用的,所以它没有得到一个传递给它的self。为了解决这个问题,我将Start函数传递给调用对象本身。比如,我说parallel.Start(sigmaI, sigmaX, self.numPixels, self)。对象self然后变成WM,这样我将能够最终调用所需的函数WM.CreateMatrixMp()

我确信这是一种非常草率的编码方式,但我只是想看看它是否有效。但是不,再次出现pickle错误,map函数根本不能处理任何对象实例。

所以我的问题是,为什么它是这样设计的?它似乎毫无用处,在任何使用类的程序中似乎都完全不起作用。

我尝试使用Process而不是Pool,但这需要我最终写入的数组被共享,这需要进程相互等待。如果我不想让它被共享,那么我让每个进程写自己的小数组,并在最后做一个大的写操作。但是这两个都导致比串行执行时运行时间慢!python内置的multiprocessing似乎完全没用!

有人能给我一些指导,如何实际节省时间与多处理,在我的目标函数是在一个类内的上下文中?我在这里读到过使用pathos.multiprocessing的帖子,但我是在Windows上,我和很多人一起做这个项目,他们都有不同的设置。让每个人都试着安装它会很不方便。

我在尝试在类中使用多处理时遇到了类似的问题。我在网上找到了一个相对简单的解决方法。基本上,您使用类外部的函数来解包裹/解包您试图并行化的函数内部的方法。以下是我找到的两个网站,它们解释了如何做到这一点。

网站1 (joblib示例)

网站2(多处理模块示例)

对于两者,思路是这样做:

rom multiprocessing import Pool
import time
 
def unwrap_self_f(arg, **kwarg):
    return C.f(*arg, **kwarg)
 
class C:
    def f(self, name):
        print 'hello %s,'%name
        time.sleep(5)
        print 'nice to meet you.'
     
    def run(self):
        pool = Pool(processes=2)
        names = ('frank', 'justin', 'osi', 'thomas')
        pool.map(unwrap_self_f, zip([self]*len(names), names))
 
if __name__ == '__main__':
    c = C()
    c.run()

multiprocessing工作的本质是它产生接收参数以运行某个函数的子进程。为了传递这些参数,它需要它们是可传递的:非独占于主进程,例如套接字,文件描述符和其他低级的,与操作系统相关的东西。

这翻译成"需要能够pickle可序列化"。

在同一主题上,当你(可以)对一个问题有独立的划分时,并行处理效果最好。我可以告诉你,你想要共享某种输入/流/数据库源,但这可能会造成一个瓶颈,你必须在某个时候解决(至少,从"python脚本"方面,而不是"操作系统/数据库"方面)。幸运的是,你现在必须尽早解决它。

你可以重新编码你的类,在需要的时候生成/创建这些不可选择的资源,而不是在开始

def targetFunction(self, range_params):
  if not self.ready():
    self._init_source()
  #rest of the code

你用另一种方式解决了这个问题(基于参数初始化对象)。是的,并行处理是有代价的。

您可以查看multiprocessing编程指南,以获得关于此问题的更全面的见解。

这是一篇老文章,但当你搜索这个主题时,它仍然是最重要的结果之一。关于这个问题的一些有用信息可以在这个堆栈溢出中找到:python子类化multiprocessing。流程

我尝试了一些变通方法,尝试从类内部调用pool.starmap到类中的另一个函数。使其成为静态方法或在外部调用函数都不起作用,并给出相同的错误。类实例不能被pickle,所以我们需要在启动multiprocessing后创建实例。

我最终做的是把我的类分成两个类。基本上,你正在调用multiprocessing的函数需要在你为它所属的类实例化一个新对象之后被调用。

像这样:

from multiprocessing import Pool
class B:
    ...
    def process_feature(idx, feature):
        # do stuff in the new process
        pass
    ...
def multiprocess_feature(process_args):
    b_instance = B()
    return b_instance.process_feature(*process_args)
class A:
    ...
    def process_stuff():
        ...
        with Pool(processes=num_processes, maxtasksperchild=10) as pool:
            results = pool.starmap(
                multiprocess_feature,
                [
                    (idx, feature)
                    for idx, feature in enumerate(features)
                ],
                chunksize=100,
            )
        ...
    ...
...

相关内容

  • 没有找到相关文章

最新更新