如何在不传递引用的情况下使用Python中的SyncManager跨进程共享列表



在python中,多处理模块提供了可以在进程之间生成共享列表/dict的管理器。

但是,如果访问管理器的进程不是子进程,而是通过Manager.connect连接到管理器,那么我在使用这些共享对象时会遇到问题。

这里有一个非常基本的例子:我正在尝试创建一个可以由一组进程访问的共享列表。对于这个例子,我只是在两个终端窗口中两次启动相同的代码:

import os, time
from multiprocessing.managers import SyncManager

def main() -> None:
print(f"I am process {os.getpid()}")
print(f"Starting proxy server...")

manager = SyncManager(address=("127.0.0.1", 8000), authkey=b"noauth")
try:
manager.start() # will start the sync process if it doesn't exist
except:
manager.connect() # if it does already exist, connect to it instead

print(f"Proxy server started/connected")

# would like to generate a shared list that each process can access.
sharedList = manager.list() # this generates a new list, so each process gets their own, which is not what I want

sharedList.append(os.getpid())

time.sleep(20)

if __name__ == '__main__':
main()

关于使用远程管理器的Python文档似乎与我想要的类似,但没有关于如何共享Manager.listManager.dict的信息。

注意:我也非常乐意共享一个名称空间对象。

以下是我最终解决问题的方法。您需要手动生成一个拥有共享列表的管理器进程。

import multiprocessing
from multiprocessing import process
import os, time, sys
from multiprocessing.managers import SyncManager, ListProxy
from queue import Queue
class SharedStorage(SyncManager):
pass
def ManagerProcess():
sys.stdout = open(os.devnull, 'w') 
sys.stderr = open(os.devnull, 'w') 
l = list()
SharedStorage.register('get_list', lambda: l, ListProxy)
try: 
ss = SharedStorage(address=("127.0.0.1", 8000), authkey=b"noauth")
ss.get_server().serve_forever()
except OSError:
# failed to listen on port - already in use.
pass
def main() -> None:
print(f"I am process {os.getpid()}")
print(f"Starting proxy server...")
mainProcess = multiprocessing.Process(target=ManagerProcess, daemon=True)
mainProcess.start()
SharedStorage.register('get_list')
manager = SharedStorage(address=("127.0.0.1", 8000), authkey=b"noauth")
manager.connect()
print(f"Proxy server started/connected")
# required - see https://bugs.python.org/issue7503
multiprocessing.current_process().authkey = b"noauth"
# get reference to the shared list object
shared_list = manager.get_list()
shared_list.append(os.getpid())
for i in shared_list:
print(i)
time.sleep(20)
if __name__ == '__main__':
main()

这可以安全地运行几次,因为后期进程生成的管理器进程将在无法侦听端口后退出

最新更新