如何使用BaseManager正确设置嵌套代理变量



文档提到,您可以使用嵌套的代理变量设置Manager,但我找不到任何示例,也无法使其发挥作用。

我使用的是flask,它运行在一个init进程中,下面的代码片段来自这个进程。每个PID都试图启动管理器,如果它已经启动,它会连接并获取代理变量。

具体来说,我连接到Jira,提取版本列表,并存储它。由于这是一个昂贵的操作,我将这些结果存储在VersionsList中。我还有VersionsDict,它有一些变量,比如last fetch、fetching等。我希望这些需要代理变量的类中的每一个都只有一个大的代理变量,并在其中嵌套后续变量,但我似乎无法做到这一点。

示例代码我有:

from multiprocessing import Lock, set_start_method
from multiprocessing.managers import (AcquirerProxy, BaseManager, DictProxy,
ListProxy)
from os import getpid
class DataManager(BaseManager): pass
IP_ADDRESS = '127.0.0.1'
PORT = 50000
AUTHKEY = b'password'
"""
If this is not set, this won't work on MacOS. https://github.com/pytest-dev/pytest-flask/issues/104
"""
set_start_method("fork")
"""
This code will not run on Windows since `fork` only runs on Unix.
https://docs.python.org/3.8/library/multiprocessing.html#contexts-and-start-methods
"""
def StartManageServer():
"""
Create a server that will share proxy variables between PIDs
"""
VersionsList = []
VersionsDict = {'last_updated': None, 'versions': []}

DataManager.register("get_VersionDict", lambda: VersionsDict, DictProxy)
DataManager.register("get_VersionList", lambda: VersionsList, ListProxy)

try:
manager = DataManager(address=(IP_ADDRESS, PORT), authkey=AUTHKEY)
manager.get_server()  # Raises OSError if a server is already running
manager.start()
log.info(f"Starting DataManager server from pid {getpid()}")
except OSError:  # Already a server running
log.error(
f"DataManager server is already running, returning - PID: {getpid()}")
finally:
manager.connect()
return manager
class ManagedVariables:
manager = StartManageServer()
def _Logger(func):
def inner(cls):
print(f"PID {getpid()} is requesting {func.__name__}")
return func(cls)
return inner

@classmethod
@_Logger
def getVersions(cls):
return cls.manager.get_VersionDict()

@classmethod
@_Logger
def getVersionsList(cls):
return cls.manager.get_VersionList()

StartManageServer类启动服务器并注册代理变量。ManagedVariables连接到服务器,并根据请求分发代理变量。理想情况下,我试图找到一种方法,将版本列表代理变量放入"下的版本dict中;版本";。尝试执行此操作时,会发生以下回溯。

In [2]: v = ManagedVariables.getVersions()
In [3]: vl = ManagedVariables.getVersionsList()
In [5]: v['versions'] = vl
In [6]: v['versions']

---------------------------------------------------------------------------
AuthenticationError                       Traceback (most recent call last)
<ipython-input-6-6a932e5f735b> in <module>
----> 1 v['versions']
<string> in __getitem__(self, *args, **kwds)
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py in _callmethod(self, methodname, args, kwds)
807
808         conn.send((self._id, methodname, args, kwds))
--> 809         kind, result = conn.recv()
810
811         if kind == '#RETURN':
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py in recv(self)
254         self._check_readable()
255         buf = self._recv_bytes()
--> 256         return _ForkingPickler.loads(buf.getbuffer())
257
258     def poll(self, timeout=0.0):
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py in RebuildProxy(func, token, serializer, kwds)
931         not getattr(process.current_process(), '_inheriting', False)
932         )
--> 933     return func(token, serializer, incref=incref, **kwds)
934
935 #
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py in __init__(self, token, serializer, manager, authkey, exposed, incref, manager_owned)
781
782         if incref:
--> 783             self._incref()
784
785         util.register_after_fork(self, BaseProxy._after_fork)
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py in _incref(self)
835             return
836
--> 837         conn = self._Client(self._token.address, authkey=self._authkey)
838         dispatch(conn, None, 'incref', (self._id,))
839         util.debug('INCREF %r', self._token.id)
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py in Client(address, family, authkey)
511
512     if authkey is not None:
--> 513         answer_challenge(c, authkey)
514         deliver_challenge(c, authkey)
515
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py in answer_challenge(connection, authkey)
762     response = connection.recv_bytes(256)        # reject large message
763     if response != WELCOME:
--> 764         raise AuthenticationError('digest sent was rejected')
765
766 #
AuthenticationError: digest sent was rejected

我试着把这个列表放进字典里,但这也不起作用。如有任何协助,我们将不胜感激。

根据这里的讨论,这里的解决方案是在StartManageServer函数中包含此行:

current_process().authkey = AUTHKEY

顺便说一句,您可以在根本不将启动方法设置为fork的情况下使代码正常工作,您只需要确保向管理器注册的可调用项是可拾取的(因此不是lambda(。以下示例:

from multiprocessing import current_process
from multiprocessing.managers import (AcquirerProxy, BaseManager, DictProxy,
ListProxy)
from os import getpid

class DataManager(BaseManager): pass

IP_ADDRESS = '127.0.0.1'
PORT = 50000
AUTHKEY = b'password'

# Class which we will use as callable to register typeids with
class dummy:
def __init__(self, data):
self.data = data
def __call__(self):
return self.data
def StartManageServer():
"""
Create a server that will share proxy variables between PIDs
"""

VersionsList = dummy([])
VersionsDict = dummy({'last_updated': None, 'versions': []})
DataManager.register("get_VersionDict", VersionsDict, DictProxy)
DataManager.register("get_VersionList", VersionsList, ListProxy)
try:
manager = DataManager(address=(IP_ADDRESS, PORT), authkey=AUTHKEY)
manager.get_server()  # Raises OSError if a server is already running
manager.start()
except OSError:  # Already a server running
raise
finally:
current_process().authkey = AUTHKEY
# manager.connect()  # **This is only required when you are connecting to server started by another manager
return manager

class ManagedVariables:
manager = None
@classmethod
def init_manager(cls):
cls.manager = StartManageServer()
def _Logger(func):
def inner(cls):
print(f"PID {getpid()} is requesting {func.__name__}")
return func(cls)
return inner
@classmethod
@_Logger
def getVersions(cls):
return cls.manager.get_VersionDict()
@classmethod
@_Logger
def getVersionsList(cls):
return cls.manager.get_VersionList()
if __name__ == '__main__':
ManagedVariables.init_manager()
v = ManagedVariables.getVersions()
vl = ManagedVariables.getVersionsList()
v['versions'] = vl
print(v['versions'])

输出

PID 135244 is requesting getVersions
PID 135244 is requesting getVersionsList
[]

更新

我假设您在每个请求共享变量的进程中都调用StartManageServer。这将使每个流程都得到一个不同的变量,因为每次启动经理时都要创建一个新的变量。

执行此操作的传统方法是拥有一个服务器和多个客户端。服务器是启动管理器并创建共享变量的服务器,客户端连接到该管理器并请求变量。在您的情况下,server.py将是什么样子:

from multiprocessing.managers import (BaseManager, DictProxy,
ListProxy)

class DataManager(BaseManager): pass

IP_ADDRESS = '127.0.0.1'
PORT = 50000
AUTHKEY = b'password'
def StartManageServer():
"""
Create a server that will share proxy variables between PIDs
"""
VersionsList = []
VersionsDict = {'last_updated': None, 'versions': []}
# Since we will be starting server in current process, we can use unpicklable callables like lambda
DataManager.register("get_VersionDict", lambda: VersionsDict, DictProxy)
DataManager.register("get_VersionList", lambda: VersionsList, ListProxy)
try:
manager = DataManager(address=(IP_ADDRESS, PORT), authkey=AUTHKEY)
srv = manager.get_server()
srv.serve_forever()
except OSError:  # Already a server running
raise

if __name__ == '__main__':
print("Starting server")
StartManageServer()

现在,一旦您启动了服务器,您的客户端就不需要启动一个新的管理器,他们只需要连接到这个管理器。下面是client.py的样子(注意函数ConnectManageServer(:

from multiprocessing import current_process
from server import DataManager
from os import getpid

IP_ADDRESS = '127.0.0.1'
PORT = 50000
AUTHKEY = b'password'

def ConnectManageServer():
# Register the typeids already registered by server (do not register a callable here! It's already done server-side)
DataManager.register("get_VersionDict")
DataManager.register("get_VersionList")
# Connect to an existing manager server
manager = DataManager(address=(IP_ADDRESS, PORT), authkey=AUTHKEY)
manager.connect()
# Change authkey
current_process().authkey = AUTHKEY
return manager

class ManagedVariables:
manager = None
@classmethod
def init_manager(cls):
cls.manager = ConnectManageServer()
def _Logger(func):
def inner(cls):
print(f"PID {getpid()} is requesting {func.__name__}")
return func(cls)
return inner
@classmethod
@_Logger
def getVersions(cls):
return cls.manager.get_VersionDict()
@classmethod
@_Logger
def getVersionsList(cls):
return cls.manager.get_VersionList()
if __name__ == '__main__':
ManagedVariables.init_manager()
v = ManagedVariables.getVersions()
vl = ManagedVariables.getVersionsList()
vl.append(1)
v['versions'] = vl
print(v['versions'])

您可以拥有任意数量的这些客户端进程。您对共享字典和列表所做的任何更改都将自动传播到所有其他进程

相关内容

  • 没有找到相关文章

最新更新