多处理管理器和自定义类



我不知道为什么,但每当我试图传递给共享对象共享自定义类对象的方法时,我都会遇到这个奇怪的错误。Python版本:3.6.3

代码:

from multiprocessing.managers import SyncManager
class MyManager(SyncManager): pass
class MyClass: pass
class Wrapper:
def set(self, ent):
self.ent = ent
MyManager.register('MyClass', MyClass)
MyManager.register('Wrapper', Wrapper)
if __name__ == '__main__':
manager = MyManager()
manager.start()
try:
obj = manager.MyClass()
lst = manager.list([1,2,3])
collection = manager.Wrapper()
collection.set(lst) # executed fine
collection.set(obj) # raises error
except Exception as e:
raise

错误:

---------------------------------------------------------------------------
Traceback (most recent call last):
File "D:Program FilesPython363libmultiprocessingmanagers.py", line 228, in serve_client
request = recv()
File "D:Program FilesPython363libmultiprocessingconnection.py", line 251, in recv
return _ForkingPickler.loads(buf.getbuffer())
File "D:Program FilesPython363libmultiprocessingmanagers.py", line 881, in RebuildProxy
return func(token, serializer, incref=incref, **kwds)
TypeError: AutoProxy() got an unexpected keyword argument 'manager_owned'
---------------------------------------------------------------------------

这里出了什么问题?

我也遇到了这个问题,如前所述,这是Pythonmultiprocessing中的一个错误(请参阅问题#30256),纠正这个问题的pull请求尚未合并。此后,拉取请求被另一个PR取代,该PR进行了相同的更改,但也添加了一个测试。

除了手动修补本地安装之外,您还有三个其他选项:

  • 您可以使用可调用的MakeProxyType()来指定您的代理类型,而不依赖于AutoProxy代理生成器
  • 您可以定义一个自定义代理类
  • 你可以用猴痘来修补这个错误

在解释了AutoProxy的作用后,我将在下面描述这些选项:

AutoProxy类有什么意义

多处理Manager模式通过将所有值放在同一个专用的"规范值服务器"进程中来访问共享值。所有其他进程(客户端)通过代理与服务器通信,然后代理与服务器来回传递消息。

然而,服务器确实需要知道什么方法对于对象类型是可接受的,这样客户端就可以使用相同的方法生成代理对象。这就是AutoProxy对象的用途。每当客户端需要注册类的新实例时,客户端创建的默认代理是AutoProxy,然后它会要求服务器告诉它可以使用什么方法。

一旦它有了方法名称,它就会调用MakeProxyType来构造一个新类,然后为该类创建一个要返回的实例。

所有这些都会推迟到您实际需要代理类型的实例时进行,因此原则上,如果您没有使用已注册的某些类,AutoProxy会节省一点内存。然而,它的内存非常少,缺点是这个过程必须在每个客户端过程中进行。

这些代理对象使用引用计数来跟踪服务器何时可以删除规范值。它是在AutoProxy中被破坏的部分可调用;当在服务器进程中而不是在客户端中创建代理对象时,会向代理类型传递一个新的参数以禁用引用计数,但AutoProxy类型没有更新以支持这一点。

那么,你该怎么解决这个问题呢?以下是这3个选项:

使用MakeProxyType()可调用

如前所述,AutoProxy实际上只是一个(通过服务器)获取该类型的公共方法的调用,以及对MakeProxyType()的调用。您可以在注册时自己拨打这些电话。

所以,不是

from multiprocessing.managers import SyncManager
SyncManager.register("YourType", YourType)

使用

from multiprocessing.managers import SyncManager, MakeProxyType, public_methods
#               arguments:    classname,  sequence of method names
YourTypeProxy = MakeProxyType("YourType", public_methods(YourType))
SyncManager.register("YourType", YourType, YourTypeProxy)

请随时在那里内联MakeProxyType()调用。

如果对SyncManager.register()使用exposed参数,则应该将这些名称传递给MakeProxyType

# SyncManager.register("YourType", YourType, exposed=("foo", "bar"))
# becomes
YourTypeProxy = MakeProxyType("YourType", ("foo", "bar"))
SyncManager.register("YourType", YourType, YourTypeProxy)

对于所有预先注册的类型,你也必须这样做:

from multiprocessing.managers import SyncManager, AutoProxy, MakeProxyType, public_methods
registry = SyncManager._registry
for typeid, (callable, exposed, method_to_typeid, proxytype) in registry.items():
if proxytype is not AutoProxy:
continue
create_method = hasattr(managers.SyncManager, typeid)
if exposed is None:
exposed = public_methods(callable) 
SyncManager.register(
typeid,
callable=callable,
exposed=exposed,
method_to_typeid=method_to_typeid,
proxytype=MakeProxyType(f"{typeid}Proxy", exposed),
create_method=create_method,
)

创建自定义代理

您可以而不是依赖多处理为您创建代理。你可以自己写。除了特殊的"托管值"服务器进程的之外,所有进程都使用代理,代理应该来回传递消息。当然,这不是已经注册的类型的选项,但我在这里提到它,因为对于您自己的类型,这提供了优化的机会。

请注意,对于所有需要返回到"规范"值实例的交互,您应该有方法,因此您需要使用属性来处理普通属性,或者根据需要添加__getattr____setattr____delattr__方法。

其优点是,您可以非常细粒度地控制哪些方法实际需要与服务器进程交换数据;在我的特定示例中,我的代理类缓存的信息是不可变的(一旦创建对象,值就永远不会改变),但经常使用。这包括一个标志值,用于控制其他方法是否会执行某些操作,因此代理可以只检查标志值,如果未设置,则不会与服务器进程对话。类似这样的东西:

class FooProxy(BaseProxy):
# what methods the proxy is allowed to access through calls
_exposed_ = ("__getattribute__", "expensive_method", "spam")
@property
def flag(self):
try:
v = self._flag
except AttributeError:
# ask for the value from the server, "realvalue.flag"
# use __getattribute__ because it's an attribute, not a property
v = self._flag = self._callmethod("__getattribute__", ("flag",))
return flag
def expensive_method(self, *args, **kwargs):
if self.flag:   # cached locally!
return self._callmethod("expensive_method", args, kwargs)
def spam(self, *args, **kwargs):
return self._callmethod("spam", args, kwargs)
SyncManager.register("Foo", Foo, FooProxy)

因为MakeProxyType()返回一个BaseProxy子类,所以您可以将该类与一个自定义子类组合,从而省去编写任何仅由return self._callmethod(...):组成的方法的麻烦

# a base class with the methods generated for us. The second argument
# doubles as the 'permitted' names, stored as _exposed_
FooProxyBase = MakeProxyType(
"FooProxyBase",
("__getattribute__", "expensive_method", "spam"),
)
class FooProxy(FooProxyBase):
@property
def flag(self):
try:
v = self._flag
except AttributeError:
# ask for the value from the server, "realvalue.flag"
# use __getattribute__ because it's an attribute, not a property
v = self._flag = self._callmethod("__getattribute__", ("flag",))
return flag
def expensive_method(self, *args, **kwargs):
if self.flag:   # cached locally!
return self._callmethod("expensive_method", args, kwargs)
def spam(self, *args, **kwargs):
return self._callmethod("spam", args, kwargs
SyncManager.register("Foo", Foo, FooProxy)

同样,这不会解决标准类型嵌套在其他代理值中的问题。

应用猴痘

我用它来修复AutoProxy可调用,当您运行Python版本时,这个应该自动避免修补,其中修复已经应用于源代码:

# Backport of https://github.com/python/cpython/pull/4819
# Improvements to the Manager / proxied shared values code
# broke handling of proxied objects without a custom proxy type,
# as the AutoProxy function was not updated.
#
# This code adds a wrapper to AutoProxy if it is missing the
# new argument.
import logging
from inspect import signature
from functools import wraps
from multiprocessing import managers

logger = logging.getLogger(__name__)
orig_AutoProxy = managers.AutoProxy

@wraps(managers.AutoProxy)
def AutoProxy(*args, incref=True, manager_owned=False, **kwargs):
# Create the autoproxy without the manager_owned flag, then
# update the flag on the generated instance. If the manager_owned flag
# is set, `incref` is disabled, so set it to False here for the same
# result.
autoproxy_incref = False if manager_owned else incref
proxy = orig_AutoProxy(*args, incref=autoproxy_incref, **kwargs)
proxy._owned_by_manager = manager_owned
return proxy

def apply():
if "manager_owned" in signature(managers.AutoProxy).parameters:
return
logger.debug("Patching multiprocessing.managers.AutoProxy to add manager_owned")
managers.AutoProxy = AutoProxy
# re-register any types already registered to SyncManager without a custom
# proxy type, as otherwise these would all be using the old unpatched AutoProxy
SyncManager = managers.SyncManager
registry = managers.SyncManager._registry
for typeid, (callable, exposed, method_to_typeid, proxytype) in registry.items():
if proxytype is not orig_AutoProxy:
continue
create_method = hasattr(managers.SyncManager, typeid)
SyncManager.register(
typeid,
callable=callable,
exposed=exposed,
method_to_typeid=method_to_typeid,
create_method=create_method,
)

导入以上内容并调用apply()函数来修复multiprocessing。在启动管理器服务器之前执行操作!

解决方案编辑多处理源代码

Sergey的原始答案要求您编辑多处理源代码,如下所示:

  1. 找到您的多处理程序包(我的是通过Anaconda安装的,在/anaconda3/lib/python3.6/multiprocessing中)
  2. 打开managers.py
  3. 将键参数manager_owned=True添加到AutoProxy函数中

原始自动代理:

def AutoProxy(token, serializer, manager=None, authkey=None,
exposed=None, incref=True):
...

已编辑的自动代理:

def AutoProxy(token, serializer, manager=None, authkey=None,
exposed=None, incref=True, manager_owned=True):
...

在运行时通过代码提供解决方案

我已经设法解决了意外的关键字参数TypeError异常,而没有直接编辑多处理的源代码,而是在我使用多处理的管理器的地方添加了以下几行代码:

import multiprocessing
# Backup original AutoProxy function
backup_autoproxy = multiprocessing.managers.AutoProxy
# Defining a new AutoProxy that handles unwanted key argument 'manager_owned'
def redefined_autoproxy(token, serializer, manager=None, authkey=None,
exposed=None, incref=True, manager_owned=True):
# Calling original AutoProxy without the unwanted key argument
return backup_autoproxy(token, serializer, manager, authkey,
exposed, incref)
# Updating AutoProxy definition in multiprocessing.managers package
multiprocessing.managers.AutoProxy = redefined_autoproxy

在此处找到临时解决方案。我已经设法通过在multiprocessing\managers.py中向AutoProxy的初始值设定项添加所需的关键字来修复它。不过,我不知道这个kwarg是否对此负责。

如果有人遇到这样的错误:PickleError:Can't pickle<类"multiprocessing.managers.xxxxx'">:多重访问上的属性查找xxxx。管理员失败在实现Martijn的优秀解决方案后,您可以尝试此补丁:

与自动生成的AutoProxy实例不同,MakeProxyType创建的代理类不在multiprocessing.managers命名空间中。因此,您需要通过setattr将其添加到命名空间中,如下所示:

import multiprocessing.managers as mms
from multiprocessing.managers import SyncManager, MakeProxyType, public_methods
TaskProxy = MakeProxyType('Task', public_methods(Task))
setattr(mms, 'Task', TaskProxy)
SyncManager.register('Task', Task, TaskProxy)

TaskProxy是您创建的代理类。您需要使用setattr将其添加到multiprocessing.managers命名空间中。那么它应该会起作用。