在类方法Python中调用多处理



最初,我有一个类来存储一些处理后的值,并在其他方法中重用这些值。

问题是当我尝试将类方法划分为多个进程以加快速度时,python 生成了进程,但它似乎不起作用(正如我在任务管理器中看到的那样,只有 1 个进程在运行)并且结果永远不会交付。

我做了几次搜索,发现pathos.multiprocessing可以做到这一点,但我想知道标准库是否可以解决这个问题?

from multiprocessing import Pool
class A():
def __init__(self, vl):
self.vl = vl
def cal(self, nb):
return nb * self.vl
def run(self, dt):
t = Pool(processes=4)
rs = t.map(self.cal, dt)
t.close()
return t
a = A(2)
a.run(list(range(10)))

您的代码失败,因为它无法pickle实例方法 (self.cal),这是 Python 在通过将多个进程映射到multiprocessing.Pool来生成多个进程时尝试执行的操作(嗯,有一种方法可以做到这一点,但它太复杂了,无论如何都不是很有用) - 由于没有共享内存访问,它必须"打包"数据并将其发送到生成的进程进行解包。如果您尝试腌制a实例,也会发生同样的情况。

multiprocessing包中唯一可用的共享内存访问鲜为人知multiprocessing.pool.ThreadPool因此,如果您真的想这样做:

from multiprocessing.pool import ThreadPool
class A():
def __init__(self, vl):
self.vl = vl
def cal(self, nb):
return nb * self.vl
def run(self, dt):
t = ThreadPool(processes=4)
rs = t.map(self.cal, dt)
t.close()
return rs
a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

但这不会给你并行化,因为它本质上映射到你的常规线程,这些线程可以访问共享内存。您应该传递类/静态方法(如果您需要调用它们)以及您希望它们使用的数据(在您的情况下self.vl)。如果您需要跨进程共享该数据,则必须使用一些共享内存抽象,例如multiprocessing.Value,当然会在此过程中应用互斥锁。

更新

我说你可以做到(有些模块或多或少都在做,例如检查pathos.multiprocessing),但我认为这不值得麻烦 - 当你到了必须欺骗你的系统做你想做的事的地步时,很可能你要么使用了错误的系统,要么你应该重新考虑你的设计。但为了提供信息,这里有一种方法可以在多处理设置中执行您想要的操作:

import sys
from multiprocessing import Pool
def parallel_call(params):  # a helper for calling 'remote' instances
cls = getattr(sys.modules[__name__], params[0])  # get our class type
instance = cls.__new__(cls)  # create a new instance without invoking __init__
instance.__dict__ = params[1]  # apply the passed state to the new instance
method = getattr(instance, params[2])  # get the requested method
args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
return method(*args)  # expand arguments, call our method and return the result
class A(object):
def __init__(self, vl):
self.vl = vl
def cal(self, nb):
return nb * self.vl
def run(self, dt):
t = Pool(processes=4)
rs = t.map(parallel_call, self.prepare_call("cal", dt))
t.close()
return rs
def prepare_call(self, name, args):  # creates a 'remote call' package for each argument
for arg in args:
yield [self.__class__.__name__, self.__dict__, name, arg]
if __name__ == "__main__":  # important protection for cross-platform use
a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

我认为它的工作原理是不言自明的,但简而言之,它将类的名称、它的当前状态(无信号、tho)、要调用的所需方法以及调用它的参数传递给parallel_call函数,该函数是为Pool中的每个进程调用的。Python 会自动腌制和取消所有这些数据,因此parallel_call需要做的就是重建原始对象,在其中找到所需的方法并使用提供的参数调用它。

这样,我们只传递数据而不尝试传递活动对象,这样 Python 就不会抱怨(好吧,在这种情况下,尝试向类参数添加对实例方法的引用,看看会发生什么),一切正常。

如果你想重重"魔力",你可以让它看起来和你的代码一模一样(创建你自己的Pool处理程序,从函数中获取名称并将名称发送到实际进程等),但这应该为你的例子提供足够的功能。

但是,在提高希望之前,请记住,这仅在共享"静态"实例(在多处理上下文中开始调用它后不会更改其初始状态的实例)时才有效。如果A.cal方法是更改vl属性的内部状态 - 它将仅影响它更改的实例(除非它在调用之间调用Pool的主实例中更改)。如果还想共享状态,可以升级parallel_call以在调用后拾取instance.__dict__并将其与方法调用结果一起返回,然后在调用端,您必须使用返回的数据更新本地__dict__以更改原始状态。这还不够 - 您实际上必须创建一个共享字典并处理所有互斥体员工,以使所有进程同时访问它(您可以使用multiprocessing.Manager)。

所以,正如我所说,麻烦多于它的价值......

问题:它似乎不起作用(正如我在任务管理器中看到的那样,只有 1 个进程在运行) 结果永远不会交付。

只看到 1 个进程Pool计算使用的进程数,如下所示:
您给出range(10)= 任务索引 0..9,因此Pool计算(10 / 4) * 4 = 8+1 = 9
启动第一个process后,没有更多的任务了。
使用range(32),您将看到4process正在运行。

您返回的是return t,而不是返回rs = pool.map(...的结果。


例如,这将起作用

def cal(self, nb):
import os
print('pid:{} cal({})'.format(os.getpid(), nb))
return nb * self.vl
def run(self,df):
with mp.Pool(processes=4) as pool:
return pool.map(self.cal, df)
if __name__ == '__main__':
a = A(2)
result = a.run(list(range(32)))
print(result)

用 Python 测试:3.4.2

相关内容

  • 没有找到相关文章

最新更新