多进程程序未终止



我有一个使用multiprocessing的程序。

以下是相关核心:

"""Multiprocessing worker."""
from argparse import Namespace
from datetime import datetime
from logging import INFO, Logger, getLogger
from multiprocessing import Process, Queue
from queue import Empty
from signal import SIGUSR1, SIGUSR2, signal
from typing import Any, Iterator, Sequence, Type
from setproctitle import setproctitle
from homeinfotools.exceptions import SSHConnectionError
from homeinfotools.logging import syslogger

__all__ = ['BaseWorker', 'multiprocess']

class BaseWorker:
    """Callable worker that takes systems from a queue and processes them.

    An instance is used as the target of a worker process. Subclasses
    supply the actual work by overriding ``run``.
    """

    __slots__ = ('index', 'systems', 'results', 'running', 'current_system')

    def __init__(self, index: int, systems: Queue, results: Queue):
        """Store the worker index and the input / output queues."""
        self.index = index
        self.systems = systems
        self.results = results
        # Cleared by the SIGUSR2 handler to stop after the current system.
        self.running = True
        # Most recently fetched system; None until the first one arrives.
        self.current_system = None

    def __call__(self, args: Namespace) -> None:
        """Process queued systems until the queue stays empty or an abort.

        Each result is put on the results queue as ``(system, result)``.
        """
        setproctitle(self.name)

        for signal_number in (SIGUSR1, SIGUSR2):
            signal(signal_number, self.signal)

        while self.running:
            try:
                system = self.systems.get(timeout=1)
            except Empty:
                # Nothing arrived within one second: treat the work as done.
                self.logger.info('Finished')
                return

            self.current_system = system
            self.results.put_nowait((system, self.process_system(system, args)))

        self.logger.info('Aborted')

    @property
    def info(self) -> str:
        """Return a short, human-readable state description."""
        current = self.current_system
        return 'idle' if current is None else f'Processing system #{current}'

    @property
    def logger(self) -> Logger:
        """Return the logger named after this worker, set to level INFO."""
        worker_logger = getLogger(self.name)
        worker_logger.setLevel(INFO)
        return worker_logger

    @property
    def name(self) -> str:
        """Return the worker's name, which is also its process title."""
        return f'hidsltools-worker-{self.index}'

    def signal(self, signal_number: int, _: Any) -> None:
        """Handle SIGUSR1 (log the state) and SIGUSR2 (request a stop)."""
        if signal_number == SIGUSR2:
            self.running = False
            return

        if signal_number == SIGUSR1:
            self.logger.info(self.info)
            return

        self.logger.error('Received invalid signal: %i', signal_number)

    def process_system(self, system: int, args: Namespace) -> dict:
        """Run the job for one system and return a timing / status record.

        The record holds ``start``, ``end`` and ``duration``, an ``online``
        flag, and ``result`` when ``run`` succeeded. A failed SSH connection
        is logged and reported as ``online = False``.
        """
        start = datetime.now()
        result = {'start': start.isoformat()}

        try:
            outcome = self.run(system, args)
        except SSHConnectionError:
            syslogger(system).error('Could not establish SSH connection.')
            result['online'] = False
        else:
            result['result'] = outcome
            result['online'] = True

        end = datetime.now()
        result['end'] = end.isoformat()
        result['duration'] = str(end - start)
        return result

    @staticmethod
    def run(system: int, args: Namespace) -> dict:
        """Perform the actual work for one system; subclasses override this."""
        raise NotImplementedError()

def multiprocess(
        worker_cls: Type[BaseWorker],
        systems: list[int],
        processes: int,
        args: Namespace
) -> dict:
    """Spawn workers, wait for them to exit, then collect their results.

    Returns a dict built from the ``(system, result)`` pairs found on the
    results queue.
    """
    pending = sequence_to_queue(systems)
    results = Queue()
    workers = list(spawn_workers(worker_cls, processes, pending, results, args))
    # NOTE(review): the workers are joined before the results queue is read.
    # The multiprocessing docs warn that joining a process which still has
    # unflushed queue items can deadlock.
    wait_for_processes(workers)
    return dict(iter_queue(results))

def sequence_to_queue(sequence: Sequence[Any]) -> Queue:
    """Build a queue sized to ``sequence`` and fill it in order."""
    filled = Queue(len(sequence))

    for element in sequence:
        filled.put(element)

    return filled

def spawn_workers(
        worker_cls: Type[BaseWorker],
        amount: int,
        systems: Queue,
        results: Queue,
        args: Namespace
) -> Iterator[Process]:
    """Start ``amount`` worker processes, yielding each once it is started.

    Every process runs its own ``worker_cls`` instance, which is called
    with ``args``.
    """
    for index in range(amount):
        process = Process(
            target=worker_cls(index, systems, results),
            args=(args,)
        )
        process.start()
        yield process

def wait_for_processes(processes: list[Process]) -> None:
    """Join every process; on Ctrl+C kill them all and re-raise."""
    try:
        for worker in processes:
            worker.join()
    except KeyboardInterrupt:
        # Do not leave children behind when the user interrupts the wait.
        for worker in processes:
            worker.kill()

        raise

def iter_queue(queue: Queue) -> Iterator[Any]:
    """Yield items from ``queue`` until ``queue.empty()`` reports True."""
    while True:
        if queue.empty():
            return

        yield queue.get()

你可以在这里找到完整的程序。

现在有一个问题,即使在所有子进程结束后,程序也不会终止,特别是当有许多子进程时:

$ sysrpc -Rp 8 {201..900}
[ERROR] sysrpc.201: Could not establish SSH connection.
[ERROR] sysrpc.222: Could not establish SSH connection.
[ERROR] sysrpc.208: Could not establish SSH connection.
[ERROR] sysrpc.209: Could not establish SSH connection.
[ERROR] sysrpc.210: Could not establish SSH connection.
[ERROR] sysrpc.217: Could not establish SSH connection.
[ERROR] sysrpc.223: Could not establish SSH connection.
[ERROR] sysrpc.224: Could not establish SSH connection.
[ERROR] sysrpc.225: Could not establish SSH connection.
[ERROR] sysrpc.226: Could not establish SSH connection.
[ERROR] sysrpc.228: Could not establish SSH connection.
[ERROR] sysrpc.229: Could not establish SSH connection.
[ERROR] sysrpc.230: Could not establish SSH connection.
[ERROR] sysrpc.231: Could not establish SSH connection.
[ERROR] sysrpc.232: Could not establish SSH connection.
[ERROR] sysrpc.233: Could not establish SSH connection.
[ERROR] sysrpc.234: Could not establish SSH connection.
[ERROR] sysrpc.247: Could not establish SSH connection.
[ERROR] sysrpc.235: Could not establish SSH connection.
[ERROR] sysrpc.256: Could not establish SSH connection.
[ERROR] sysrpc.258: Could not establish SSH connection.
[ERROR] sysrpc.241: Could not establish SSH connection.
[ERROR] sysrpc.246: Could not establish SSH connection.
[ERROR] sysrpc.278: Could not establish SSH connection.
[ERROR] sysrpc.262: Could not establish SSH connection.
[ERROR] sysrpc.287: Could not establish SSH connection.
[ERROR] sysrpc.276: Could not establish SSH connection.
[ERROR] sysrpc.299: Could not establish SSH connection.
[ERROR] sysrpc.301: Could not establish SSH connection.
[ERROR] sysrpc.263: Could not establish SSH connection.
[ERROR] sysrpc.268: Could not establish SSH connection.
[ERROR] sysrpc.315: Could not establish SSH connection.
[ERROR] sysrpc.274: Could not establish SSH connection.
[ERROR] sysrpc.320: Could not establish SSH connection.
[ERROR] sysrpc.318: Could not establish SSH connection.
[ERROR] sysrpc.302: Could not establish SSH connection.
[ERROR] sysrpc.330: Could not establish SSH connection.
[ERROR] sysrpc.311: Could not establish SSH connection.
[ERROR] sysrpc.312: Could not establish SSH connection.
[ERROR] sysrpc.335: Could not establish SSH connection.
[ERROR] sysrpc.294: Could not establish SSH connection.
[ERROR] sysrpc.358: Could not establish SSH connection.
[ERROR] sysrpc.360: Could not establish SSH connection.
[ERROR] sysrpc.362: Could not establish SSH connection.
[ERROR] sysrpc.308: Could not establish SSH connection.
[ERROR] sysrpc.336: Could not establish SSH connection.
[ERROR] sysrpc.380: Could not establish SSH connection.
[ERROR] sysrpc.386: Could not establish SSH connection.
[ERROR] sysrpc.349: Could not establish SSH connection.
[ERROR] sysrpc.357: Could not establish SSH connection.
[ERROR] sysrpc.417: Could not establish SSH connection.
[ERROR] sysrpc.422: Could not establish SSH connection.
[ERROR] sysrpc.435: Could not establish SSH connection.
[ERROR] sysrpc.408: Could not establish SSH connection.
[ERROR] sysrpc.412: Could not establish SSH connection.
[ERROR] sysrpc.420: Could not establish SSH connection.
[ERROR] sysrpc.436: Could not establish SSH connection.
[ERROR] sysrpc.390: Could not establish SSH connection.
[ERROR] sysrpc.439: Could not establish SSH connection.
[ERROR] sysrpc.440: Could not establish SSH connection.
[ERROR] sysrpc.441: Could not establish SSH connection.
[ERROR] sysrpc.443: Could not establish SSH connection.
[ERROR] sysrpc.446: Could not establish SSH connection.
[ERROR] sysrpc.433: Could not establish SSH connection.
[ERROR] sysrpc.448: Could not establish SSH connection.
[ERROR] sysrpc.449: Could not establish SSH connection.
[ERROR] sysrpc.450: Could not establish SSH connection.
[ERROR] sysrpc.451: Could not establish SSH connection.
[ERROR] sysrpc.452: Could not establish SSH connection.
[ERROR] sysrpc.453: Could not establish SSH connection.
[ERROR] sysrpc.454: Could not establish SSH connection.
[ERROR] sysrpc.455: Could not establish SSH connection.
[ERROR] sysrpc.456: Could not establish SSH connection.
[ERROR] sysrpc.457: Could not establish SSH connection.
[ERROR] sysrpc.464: Could not establish SSH connection.
[ERROR] sysrpc.458: Could not establish SSH connection.
[ERROR] sysrpc.459: Could not establish SSH connection.
[ERROR] sysrpc.460: Could not establish SSH connection.
[ERROR] sysrpc.461: Could not establish SSH connection.
[ERROR] sysrpc.462: Could not establish SSH connection.
[ERROR] sysrpc.463: Could not establish SSH connection.
[ERROR] sysrpc.465: Could not establish SSH connection.
[ERROR] sysrpc.469: Could not establish SSH connection.
[ERROR] sysrpc.471: Could not establish SSH connection.
[ERROR] sysrpc.473: Could not establish SSH connection.
[ERROR] sysrpc.474: Could not establish SSH connection.
[ERROR] sysrpc.479: Could not establish SSH connection.
[ERROR] sysrpc.475: Could not establish SSH connection.
[ERROR] sysrpc.476: Could not establish SSH connection.
[ERROR] sysrpc.477: Could not establish SSH connection.
[ERROR] sysrpc.480: Could not establish SSH connection.
[ERROR] sysrpc.481: Could not establish SSH connection.
[ERROR] sysrpc.482: Could not establish SSH connection.
[ERROR] sysrpc.483: Could not establish SSH connection.
[ERROR] sysrpc.484: Could not establish SSH connection.
[ERROR] sysrpc.489: Could not establish SSH connection.
[ERROR] sysrpc.491: Could not establish SSH connection.
[ERROR] sysrpc.492: Could not establish SSH connection.
[ERROR] sysrpc.497: Could not establish SSH connection.
[ERROR] sysrpc.528: Could not establish SSH connection.
[ERROR] sysrpc.498: Could not establish SSH connection.
[ERROR] sysrpc.532: Could not establish SSH connection.
[ERROR] sysrpc.533: Could not establish SSH connection.
[ERROR] sysrpc.505: Could not establish SSH connection.
[ERROR] sysrpc.541: Could not establish SSH connection.
[ERROR] sysrpc.544: Could not establish SSH connection.
[ERROR] sysrpc.546: Could not establish SSH connection.
[ERROR] sysrpc.511: Could not establish SSH connection.
[ERROR] sysrpc.521: Could not establish SSH connection.
[ERROR] sysrpc.569: Could not establish SSH connection.
[ERROR] sysrpc.573: Could not establish SSH connection.
[ERROR] sysrpc.579: Could not establish SSH connection.
[ERROR] sysrpc.586: Could not establish SSH connection.
[ERROR] sysrpc.543: Could not establish SSH connection.
[ERROR] sysrpc.559: Could not establish SSH connection.
[ERROR] sysrpc.563: Could not establish SSH connection.
[ERROR] sysrpc.572: Could not establish SSH connection.
[ERROR] sysrpc.602: Could not establish SSH connection.
[ERROR] sysrpc.606: Could not establish SSH connection.
[ERROR] sysrpc.581: Could not establish SSH connection.
[ERROR] sysrpc.614: Could not establish SSH connection.
[ERROR] sysrpc.584: Could not establish SSH connection.
[ERROR] sysrpc.616: Could not establish SSH connection.
[ERROR] sysrpc.618: Could not establish SSH connection.
[ERROR] sysrpc.588: Could not establish SSH connection.
[ERROR] sysrpc.632: Could not establish SSH connection.
[ERROR] sysrpc.642: Could not establish SSH connection.
[ERROR] sysrpc.646: Could not establish SSH connection.
[ERROR] sysrpc.626: Could not establish SSH connection.
[ERROR] sysrpc.627: Could not establish SSH connection.
[ERROR] sysrpc.657: Could not establish SSH connection.
[ERROR] sysrpc.647: Could not establish SSH connection.
[ERROR] sysrpc.650: Could not establish SSH connection.
[ERROR] sysrpc.611: Could not establish SSH connection.
[ERROR] sysrpc.623: Could not establish SSH connection.
[ERROR] sysrpc.668: Could not establish SSH connection.
[ERROR] sysrpc.669: Could not establish SSH connection.
[ERROR] sysrpc.670: Could not establish SSH connection.
[ERROR] sysrpc.656: Could not establish SSH connection.
[ERROR] sysrpc.663: Could not establish SSH connection.
[ERROR] sysrpc.687: Could not establish SSH connection.
[ERROR] sysrpc.688: Could not establish SSH connection.
[ERROR] sysrpc.689: Could not establish SSH connection.
[ERROR] sysrpc.690: Could not establish SSH connection.
[ERROR] sysrpc.691: Could not establish SSH connection.
[ERROR] sysrpc.693: Could not establish SSH connection.
[ERROR] sysrpc.702: Could not establish SSH connection.
[ERROR] sysrpc.694: Could not establish SSH connection.
[ERROR] sysrpc.710: Could not establish SSH connection.
[ERROR] sysrpc.715: Could not establish SSH connection.
[ERROR] sysrpc.717: Could not establish SSH connection.
[ERROR] sysrpc.719: Could not establish SSH connection.
[ERROR] sysrpc.696: Could not establish SSH connection.
[ERROR] sysrpc.697: Could not establish SSH connection.
[ERROR] sysrpc.705: Could not establish SSH connection.
[ERROR] sysrpc.742: Could not establish SSH connection.
[ERROR] sysrpc.743: Could not establish SSH connection.
[ERROR] sysrpc.744: Could not establish SSH connection.
[ERROR] sysrpc.747: Could not establish SSH connection.
[ERROR] sysrpc.748: Could not establish SSH connection.
[ERROR] sysrpc.722: Could not establish SSH connection.
[ERROR] sysrpc.695: Could not establish SSH connection.
[ERROR] sysrpc.773: Could not establish SSH connection.
[ERROR] sysrpc.776: Could not establish SSH connection.
[ERROR] sysrpc.783: Could not establish SSH connection.
[ERROR] sysrpc.798: Could not establish SSH connection.
[ERROR] sysrpc.803: Could not establish SSH connection.
[ERROR] sysrpc.812: Could not establish SSH connection.
[ERROR] sysrpc.818: Could not establish SSH connection.
[ERROR] sysrpc.819: Could not establish SSH connection.
[ERROR] sysrpc.822: Could not establish SSH connection.
[ERROR] sysrpc.821: Could not establish SSH connection.
[ERROR] sysrpc.830: Could not establish SSH connection.
[ERROR] sysrpc.831: Could not establish SSH connection.
[ERROR] sysrpc.835: Could not establish SSH connection.
[ERROR] sysrpc.789: Could not establish SSH connection.
[ERROR] sysrpc.790: Could not establish SSH connection.
[ERROR] sysrpc.841: Could not establish SSH connection.
[ERROR] sysrpc.826: Could not establish SSH connection.
[ERROR] sysrpc.842: Could not establish SSH connection.
[ERROR] sysrpc.848: Could not establish SSH connection.
[ERROR] sysrpc.849: Could not establish SSH connection.
[ERROR] sysrpc.850: Could not establish SSH connection.
[ERROR] sysrpc.851: Could not establish SSH connection.
[ERROR] sysrpc.853: Could not establish SSH connection.
[ERROR] sysrpc.854: Could not establish SSH connection.
[ERROR] sysrpc.860: Could not establish SSH connection.
[ERROR] sysrpc.866: Could not establish SSH connection.
[ERROR] sysrpc.867: Could not establish SSH connection.
[ERROR] sysrpc.868: Could not establish SSH connection.
[ERROR] sysrpc.872: Could not establish SSH connection.
[ERROR] sysrpc.856: Could not establish SSH connection.
[ERROR] sysrpc.883: Could not establish SSH connection.
[ERROR] sysrpc.881: Could not establish SSH connection.
[ERROR] sysrpc.877: Could not establish SSH connection.
[ERROR] sysrpc.876: Could not establish SSH connection.
[ERROR] sysrpc.882: Could not establish SSH connection.
[ERROR] sysrpc.887: Could not establish SSH connection.
[ERROR] sysrpc.884: Could not establish SSH connection.
[ERROR] sysrpc.888: Could not establish SSH connection.
[INFO] hidsltools-worker-2: Finished
[ERROR] sysrpc.885: Could not establish SSH connection.
[INFO] hidsltools-worker-0: Finished
[ERROR] sysrpc.896: Could not establish SSH connection.
[ERROR] sysrpc.886: Could not establish SSH connection.
[INFO] hidsltools-worker-5: Finished
[ERROR] sysrpc.897: Could not establish SSH connection.
[INFO] hidsltools-worker-1: Finished
[INFO] hidsltools-worker-4: Finished
[ERROR] sysrpc.898: Could not establish SSH connection.
[ERROR] sysrpc.899: Could not establish SSH connection.
[INFO] hidsltools-worker-7: Finished
[INFO] hidsltools-worker-6: Finished
[ERROR] sysrpc.900: Could not establish SSH connection.
[INFO] hidsltools-worker-3: Finished

可以看到,所有8个工作进程(worker)都已结束,但程序没有终止。即使我按回车键,主进程也不会终止:

...
[ERROR] sysrpc.900: Could not establish SSH connection.
[INFO] hidsltools-worker-3: Finished




只有当我按Ctrl+C时,主进程才会退出:

[INFO] hidsltools-worker-3: Finished



^CProcess Process-7:
1 ✗ neumann@ThinkCentre ~ $ 

为什么主进程没有终止,即使所有子进程都终止了?

如果您阅读multiprocessing.Queue的文档,特别是警告,您将看到:

警告

如上所述,如果一个子进程将项目放到队列中(并且它没有使用JoinableQueue.cancel_join_thread),那么该进程将不会终止,直到所有缓冲的项目都被刷新到管道中。

这意味着,如果你试图加入这个进程,你可能会得到一个死锁,除非你确定所有已经放在队列上的项目都被消耗了。同样,如果子进程不是守护进程,那么当父进程试图加入所有非守护进程的子进程时,它可能会挂起。

请注意,使用管理器创建的队列没有此问题。参考编程指南

具体来说,您先调用了wait_for_processes,它会尝试join子进程;然后才调用iter_queue,从子进程写入结果的队列中get数据。(顺便说一下,文档还明确指出,对multiprocessing.Queue实例调用queue.empty()并不可靠。)

您显然需要反转操作顺序,即主进程需要在尝试加入子进程之前将所有项目从队列中取出—这应该在不使用queue.empty的情况下完成。

一种解决方案是将multiprocessing.JoinableQueue用作子进程获取输入的队列,并在创建子进程之前把所有待处理的项放入该队列。子进程每从该队列取出一个项目、并把响应放入输出队列之后,就在输入队列上调用task_done方法,表示该项工作已处理完毕。然后,主进程只需调用输入队列的join方法,即可确保所有输入任务都已处理、且它们的响应都已放入输出队列。之后,您只需在结果队列上反复调用get_nowait,直到它抛出Empty异常。

以下是大致的(但未经测试的)思路。您可能需要添加额外的代码,来处理收到信号后子进程未处理完整个输入队列的情况:

"""Multiprocessing worker."""
from argparse import Namespace
from datetime import datetime
from logging import INFO, Logger, getLogger
from multiprocessing import Process, Queue, JoinableQueue
from queue import Empty
from signal import SIGUSR1, SIGUSR2, signal
from typing import Any, Iterator, Sequence, Type
from setproctitle import setproctitle
from homeinfotools.exceptions import SSHConnectionError
from homeinfotools.logging import syslogger

__all__ = ['BaseWorker', 'multiprocess']

class BaseWorker:
    """Callable worker that drains a pre-filled JoinableQueue of systems.

    An instance is used as the target of a worker process. Subclasses
    supply the actual work by overriding ``run``.
    """

    __slots__ = ('index', 'systems', 'results', 'running', 'current_system')

    def __init__(self, index: int, systems: JoinableQueue, results: Queue):
        """Store the worker index and the input / output queues."""
        self.index = index
        self.systems = systems
        self.results = results
        # Cleared by the SIGUSR2 handler to request an abort.
        self.running = True
        # Most recently fetched system; None until the first one arrives.
        self.current_system = None

    def __call__(self, args: Namespace) -> None:
        """Drain the input queue, processing items unless aborted.

        Every fetched item is acknowledged with ``task_done()``, even after
        an abort request or a failure while processing it. Otherwise the
        parent's ``join()`` on the input queue would never return. Logs
        'Finished' on a normal end and 'Aborted' if SIGUSR2 stopped the
        processing.
        """
        setproctitle(self.name)
        signal(SIGUSR1, self.signal)
        signal(SIGUSR2, self.signal)

        # We must consume all the input even when self.running is False,
        # else the main process's call to join() on the input queue will
        # not succeed.
        while True:
            try:
                # Items are already on the queue so use get_nowait().
                # NOTE(review): the multiprocessing docs mention a short
                # delay before a freshly put item is visible to
                # get_nowait(); confirm workers cannot start before the
                # parent's items are flushed.
                self.current_system = self.systems.get_nowait()
            except Empty:
                break

            try:
                if self.running:
                    # Then process the input for real:
                    result = self.process_system(self.current_system, args)
                    self.results.put_nowait((self.current_system, result))
            finally:
                # Show we are done with this item and any result has been
                # put, also when processing raised unexpectedly.
                self.systems.task_done()

        # The loop has a single exit, so this is reached on every normal
        # end and can distinguish an abort from a completed run.
        self.logger.info('Finished' if self.running else 'Aborted')

    @property
    def info(self) -> str:
        """Return a short, human-readable state description."""
        if self.current_system is None:
            return 'idle'

        return f'Processing system #{self.current_system}'

    @property
    def logger(self) -> Logger:
        """Return the logger named after this worker, set to level INFO."""
        logger = getLogger(self.name)
        logger.setLevel(INFO)
        return logger

    @property
    def name(self) -> str:
        """Return the worker's name, which is also its process title."""
        return f'hidsltools-worker-{self.index}'

    def signal(self, signal_number: int, _: Any) -> None:
        """Handle SIGUSR1 (log the state) and SIGUSR2 (request an abort)."""
        if signal_number == SIGUSR1:
            self.logger.info(self.info)
        elif signal_number == SIGUSR2:
            self.running = False
        else:
            self.logger.error('Received invalid signal: %i', signal_number)

    def process_system(self, system: int, args: Namespace) -> dict:
        """Run the job for one system and return a timing / status record.

        The record holds ``start``, ``end`` and ``duration``, an ``online``
        flag, and ``result`` when ``run`` succeeded. A failed SSH connection
        is logged and reported as ``online = False``.
        """
        result = {'start': (start := datetime.now()).isoformat()}

        try:
            result['result'] = self.run(system, args)
        except SSHConnectionError:
            syslogger(system).error('Could not establish SSH connection.')
            result['online'] = False
        else:
            result['online'] = True

        result['end'] = (end := datetime.now()).isoformat()
        result['duration'] = str(end - start)
        return result

    @staticmethod
    def run(system: int, args: Namespace) -> dict:
        """Perform the actual work for one system; subclasses override this."""
        raise NotImplementedError()

def multiprocess(
        worker_cls: Type[BaseWorker],
        systems: list[int],
        processes: int,
        args: Namespace
) -> dict:
    """Spawn workers, wait for the work to finish and collect the results.

    Returns a dict built from the ``(system, result)`` pairs the workers
    put on the results queue.

    Raises:
        KeyboardInterrupt: re-raised after killing the workers when the
            user interrupts the result collection or the final join.
    """
    # First put all items on the input queue:
    systems_queue = sequence_to_queue(systems)
    results = Queue()
    # Then start the children that will process the input queue:
    process_list = list(spawn_workers(
        worker_cls,
        processes,
        systems_queue,
        results,
        args
    ))
    # Wait for all work to be completed:
    systems_queue.join()
    collected = {}

    # A finished task does not mean its result has reached the pipe yet:
    # the multiprocessing docs note a delay between put() and the item
    # becoming visible to get_nowait(). So keep reading while any worker
    # is alive. A worker only exits once its buffered items are flushed.
    try:
        while any(process.is_alive() for process in process_list):
            try:
                system, result = results.get(timeout=0.1)
            except Empty:
                continue

            collected[system] = result
    except KeyboardInterrupt:
        for process in process_list:
            process.kill()

        raise

    # All workers have exited, so whatever is left is already in the pipe.
    collected.update(iter_queue(results))
    # Finally wait for the processes to complete:
    wait_for_processes(process_list)
    return collected

def sequence_to_queue(sequence: Sequence[Any]) -> JoinableQueue:
    """Build a joinable queue sized to ``sequence`` and fill it in order."""
    filled = JoinableQueue(len(sequence))

    for element in sequence:
        filled.put(element)

    return filled

def spawn_workers(
        worker_cls: Type[BaseWorker],
        amount: int,
        systems: JoinableQueue,
        results: Queue,
        args: Namespace
) -> Iterator[Process]:
    """Start ``amount`` worker processes, yielding each once it is started.

    Every process runs its own ``worker_cls`` instance, which is called
    with ``args``.
    """
    for index in range(amount):
        process = Process(
            target=worker_cls(index, systems, results),
            args=(args,)
        )
        process.start()
        yield process

def wait_for_processes(processes: list[Process]) -> None:
    """Join every process; on Ctrl+C kill them all and re-raise."""
    try:
        for worker in processes:
            worker.join()
    except KeyboardInterrupt:
        # Do not leave children behind when the user interrupts the wait.
        for worker in processes:
            worker.kill()

        raise

def iter_queue(queue: Queue) -> Iterator[Any]:
    """Yield items from ``queue`` until ``get_nowait()`` raises ``Empty``."""
    while True:
        try:
            item = queue.get_nowait()
        except Empty:
            return

        yield item

注意

在上面的代码中,我选择在创建子进程之前将所有输入项放入输入队列。这样子进程就可以在输入队列上调用get_nowait方法:当它抛出Empty异常时,子进程就知道所有输入都已处理完毕,可以返回,从而使该进程最终能够被join。或者,主进程也可以在启动子进程之后再把项放入输入队列。但在这种情况下,子进程必须在队列上发出阻塞的get调用,并且永远无法知道将来是否、以及何时还会有项目被放入队列。这样一来,子进程将永远不会终止(除非主进程放入N个哨兵项,例如None,表示这些是放入队列的最后一批项目,子进程在看到其中一个哨兵后即终止)。因此,这些子进程需要是守护进程,并且主进程永远不会join它们。


第二个解决方案是:每个子进程在检测到输入队列上没有更多项目时,向结果队列放入一个None项(或其他不会与实际响应值混淆的值),作为“此子进程将不再写入输出”的指示器。然后,主进程只需反复调用结果队列上的get,直到收到N个None项,其中N是处理输入队列的子进程数量。

顺便说一下,最好不要有一个名为system的局部变量,它恰好与标准Python库模块的名称相同。

最新更新