多处理管理器不能传递给另一个进程



我需要将Manager实例传递给其他进程,因为我需要并行创建的代理对象实例,然后在单独的进程中再次重用。然而,似乎我不能传递一个Manager作为一个函数的参数,应该由另一个进程运行。参见示例:

from multiprocessing.managers import BaseManager
from multiprocessing import Pool
from functools import partial

class MyManager(BaseManager):
pass

class MyClass():
def __init__(self, i):
self.i = i

def my_fun(i, manager):
return manager.MyClass(i)

MyManager.register('MyClass', MyClass)
manager = MyManager()
manager.start()
f = partial(my_fun, manager=manager)
with Pool(4) as p:
res = [r.i for r in p.map(f, list(range(10)))]
print(res)

如果我运行上面的代码,将出现以下异常:

TypeError: Pickling an AuthenticationString object is disallowed for security reasons

有趣的是,在Pool.Processargs参数中传递Manager是有效的,但我仍然需要map的功能。

首先,为您的类自动生成的代理不支持访问属性。因此,如果希望访问托管类的i属性,则需要显式定义自己的代理类。例如,只定义一个方法get_i来返回该属性会更容易。我通常会在原始类的子类中定义get_i方法,这只是为了用作托管类实现而创建的。在下面的代码中,我定义了这样一个方法(尽管我没有费心创建一个特殊的子类)一个自定义代理类来向您展示如何做到这一点。

我只是没有办法将manager实例传递给另一个进程。我提出的解决方案(可能有更好的)是创建一个线程,它将通过multiprocessing.Pipe实例公开的连接接受请求。您将需要强制这些请求的单线程,不仅因为您不能让多个进程并发地向同一个连接发送,而且因为这是确保请求者返回的响应与其请求匹配的唯一方法。

这个想法是my_fun函数通过它的连接发送参数i,它想为它创建一个MyClass实例。运行在主进程中的守护线程create_MyClass函数(manager为其定义)接收这个参数,创建所需的类实例并将结果发送回来。本质上,create_MyClass的行为就像一个工厂方法。这种"方法"的方式 ,即通过管道创建的连接向运行在不同进程中的线程发送消息,实际上类似于对托管类的代理引用进行方法调用。
from multiprocessing.managers import BaseManager, NamespaceProxy
from multiprocessing import Pool, Pipe, Lock
from threading import Thread
class MyManager(BaseManager):
pass
class MyClass():
def __init__(self, i):
self.i = i
def get_i(self):
return self.i
class MyClassProxy(NamespaceProxy):
_exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'get_i')
def get_i(self):
return self._callmethod('get_i')

def init_pool(the_connection, the_lock):
global connection, lock
connection = the_connection
lock = the_lock
def my_fun(i):
with lock:
connection.send(i) # send argument
my_class = connection.recv() # get result
return my_class
def create_MyClass(connection):
while True:
i = connection.recv()
my_class = manager.MyClass(i)
connection.send(my_class)
if __name__ == '__main__':
MyManager.register('MyClass', MyClass, MyClassProxy)
manager = MyManager()
manager.start()
lock = Lock()
connection1, connection2 = Pipe(duplex=True)
# Give one of the bi-directional connections to the daemon thread:
Thread(target=create_MyClass, args=(connection1,), daemon=True).start()
# Initialize each process in the pool with the other bi-directional connection
# and a lock to ensure single-threading of the requests:
with Pool(4, initializer=init_pool, initargs=(connection2, lock)) as p:
res = [r.i for r in p.map(my_fun, list(range(10)))]
print(res)

打印:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

相关内容

最新更新