Python多进程远程管理器:使用BaseManager.start()而不是.server().serve_forev



我在这里遵循官方python文档中的示例:

我试图使它,以便我在localhost:50000上启动BaseManager,注册一个队列,然后从该队列读取一堆工人。如果我在官方python文档中使用有三个文件(一个服务器,一个放客户端,一个get客户端)的方法,我可以让它工作,但我不能让它在一个文件中全部工作,我通过multiprocessing.Process(target=...)生成客户端。

这是我的完整代码。问题是,当客户端尝试连接时,它们会得到一个ConnectionRefused(下面的堆栈跟踪)

from typing import Dict, Optional, Any, List
from multiprocessing.managers import BaseManager, SyncManager
import time
import multiprocessing as mp
import argparse
import queue
q = queue.Queue()
def parse_args() -> argparse.Namespace:
a = argparse.ArgumentParser()
a.add_argument("--n-workers", type=int, default=2)
return a.parse_args()
def run_queue_server(args: argparse.Namespace) -> None:
class QueueManager(BaseManager): pass
QueueManager.register("get_queue", lambda: q)
m = QueueManager(address=('', 50000), authkey=b'abracadabra')
m.start()
def _worker_process(worker_uid: str) -> None:
class QueueManager(BaseManager): pass
QueueManager.register("get_queue")
m = QueueManager(address=('', 50000), authkey=b'abracadabra')
# <-- This line fails with ConnectionRefused -->
m.connect()
queue: queue.Queue = m.get_queue()
def spawn_workers(args: argparse.Namespace) -> None:
time.sleep(2)
worker_procs = dict()
for i in range(args.n_workers):
print(f"Spawning worker process {i}..")
p = mp.Process(target=_worker_process, args=[str(i)])
p.start()
worker_procs[str(i)] = p
def main():
args = parse_args()
run_queue_server(args)
spawn_workers(args)
while True:
time.sleep(1)
if __name__ == '__main__':
main()

错误在这里

$ python minimal.py
Spawning worker process 0..
Spawning worker process 1..
Process Process-2:
Process Process-3:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "minimal.py", line 26, in _worker_process
m.connect()
File "/usr/lib/python3.8/multiprocessing/managers.py", line 548, in connect
conn = Client(self._address, authkey=self._authkey)
File "/usr/lib/python3.8/multiprocessing/connection.py", line 502, in Client
c = SocketClient(address)
File "/usr/lib/python3.8/multiprocessing/connection.py", line 629, in SocketClient
s.connect(address)
ConnectionRefusedError: [Errno 111] Connection refused
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "minimal.py", line 26, in _worker_process
m.connect()
File "/usr/lib/python3.8/multiprocessing/managers.py", line 548, in connect
conn = Client(self._address, authkey=self._authkey)
File "/usr/lib/python3.8/multiprocessing/connection.py", line 502, in Client
c = SocketClient(address)
File "/usr/lib/python3.8/multiprocessing/connection.py", line 629, in SocketClient
s.connect(address)
ConnectionRefusedError: [Errno 111] Connection refused
但是,如果我生成另一个以管理器创建步骤为目标的进程并运行m.get_server().serve_forever(),那么我不会得到连接拒绝错误,请参阅下面的工作代码
from typing import Dict, Optional, Any, List
from multiprocessing.managers import BaseManager, SyncManager
import time
import multiprocessing as mp
import argparse
import queue
q = queue.Queue()
def parse_args() -> argparse.Namespace:
a = argparse.ArgumentParser()
a.add_argument("--n-workers", type=int, default=2)
return a.parse_args()
def run_queue_server(args: argparse.Namespace) -> None:
class QueueManager(BaseManager): pass
QueueManager.register("get_queue", lambda: q)
m = QueueManager(address=('', 50000), authkey=b'abracadabra')
#m.start()
# This works!!
m.get_server().serve_forever()
def _worker_process(worker_uid: str) -> None:
class QueueManager(BaseManager): pass
QueueManager.register("get_queue")
m = QueueManager(address=('', 50000), authkey=b'abracadabra')
m.connect()
queue: queue.Queue = m.get_queue()
print(f"Gotten queue: {queue}")
def spawn_workers(args: argparse.Namespace) -> None:
time.sleep(2)
worker_procs = dict()
for i in range(args.n_workers):
print(f"Spawning worker process {i}..")
p = mp.Process(target=_worker_process, args=[str(i)])
p.start()
worker_procs[str(i)] = p
def main():
args = parse_args()
#run_queue_server(args)
# I don't want to run this in another process?
mp.Process(target=run_queue_server, args=(args,)).start()
spawn_workers(args)
while True:
time.sleep(1)
if __name__ == '__main__':
main()
问题是,我不想为了当经理而开始另一个过程。为什么就不能这样呢?

编辑-我是一个编程到深夜的白痴。问题是,我的run_queue_server时调用m.start()和返回是…然后失去对QueueManager的引用,我确信这会导致它被垃圾收集。

我所做的只是改变

def run_queue_server(args: argparse.Namespace) -> None:
class QueueManager(BaseManager): pass
QueueManager.register("get_queue", lambda: q)
m = QueueManager(address=('', 50000), authkey=b'abracadabra')
m.start()
return m

,并改变调用者接受返回值,一切工作…

相关内容

  • 没有找到相关文章

最新更新