


"""Multiprocessing worker."""
from argparse import Namespace
from datetime import datetime
from logging import INFO, Logger, getLogger
from multiprocessing import Process, Queue
from queue import Empty
from signal import SIGUSR1, SIGUSR2, signal
from typing import Any, Iterator, Sequence, Type
from setproctitle import setproctitle
from homeinfotools.exceptions import SSHConnectionError
from homeinfotools.logging import syslogger

__all__ = ['BaseWorker', 'multiprocess']

class BaseWorker:
    """Queue-driven worker that processes one system at a time.

    An instance is meant to be the ``target`` of a worker process.
    Calling it pulls system IDs from ``systems`` and puts one
    ``(system, result)`` tuple per processed system onto ``results``.
    Subclasses provide the actual work by overriding :meth:`run`.

    While running, the worker reacts to two signals:

    * ``SIGUSR1`` -- log what the worker is currently doing.
    * ``SIGUSR2`` -- stop after the system currently being processed.
    """

    __slots__ = ('index', 'systems', 'results', 'running', 'current_system')

    def __init__(self, index: int, systems: Queue, results: Queue):
        """Store the worker's index and its input and output queues."""
        self.index = index
        self.systems = systems
        self.results = results
        self.running = True
        self.current_system = None

    def __call__(self, args: Namespace) -> None:
        """Process queued systems until the queue runs dry or on abort.

        The loop ends when no system could be fetched within one second
        or when ``running`` has been cleared by ``SIGUSR2``.
        """
        # NOTE(review): ``setproctitle`` is imported by this module but
        # not used in the visible code; presumably the process title
        # was set to ``self.name`` here -- confirm against upstream.
        signal(SIGUSR1, self.signal)
        signal(SIGUSR2, self.signal)

        while self.running:
            try:
                self.current_system = system = self.systems.get(timeout=1)
            except Empty:
                self.logger.info('Finished')
                return

            result = self.process_system(system, args)
            self.results.put_nowait((system, result))

    def info(self) -> str:
        """Return a short description of what the worker is doing."""
        if self.current_system is None:
            return 'idle'

        return f'Processing system #{self.current_system}'

    @property
    def logger(self) -> Logger:
        """Return the logger named after this worker."""
        logger = getLogger(self.name)
        # NOTE(review): reconstructed -- ``INFO`` is imported and the
        # worker's INFO messages are expected to be visible.
        logger.setLevel(INFO)
        return logger

    @property
    def name(self) -> str:
        """Return the worker's name, derived from its index."""
        return f'hidsltools-worker-{self.index}'

    def signal(self, signal_number: int, _: Any) -> None:
        """Handle a signal delivered to the worker process.

        The second argument is the stack frame passed to every signal
        handler; it is not used.
        """
        if signal_number == SIGUSR1:
            # NOTE(review): reconstructed branch body -- reporting the
            # state is the only use of info() in this class.
            self.logger.info(self.info())
        elif signal_number == SIGUSR2:
            # Only clears the flag: the main loop finishes the current
            # system before it checks ``running`` again.
            self.running = False
        else:
            self.logger.error('Received invalid signal: %i', signal_number)

    def process_system(self, system: int, args: Namespace) -> dict:
        """Run the job for one system and return a timed result record.

        The record always has the keys ``start``, ``end`` (ISO 8601
        timestamps), ``duration`` and ``online``.  ``result`` holds the
        return value of :meth:`run` and is only present when no
        ``SSHConnectionError`` was raised.  Any other exception raised
        by :meth:`run` propagates.
        """
        result = {'start': (start := datetime.now()).isoformat()}

        try:
            result['result'] = self.run(system, args)
        except SSHConnectionError:
            syslogger(system).error('Could not establish SSH connection.')
            result['online'] = False
        else:
            result['online'] = True

        result['end'] = (end := datetime.now()).isoformat()
        result['duration'] = str(end - start)
        return result

    @staticmethod
    def run(system: int, args: Namespace) -> dict:
        """Do the actual work for one system; subclasses must override."""
        raise NotImplementedError()

def multiprocess(
        worker_cls: Type[BaseWorker],
        systems: list[int],
        processes: int,
        args: Namespace
) -> dict:
    """Run ``processes`` workers over ``systems`` and collect the results.

    Returns a dict built from the ``(key, value)`` pairs the workers put
    onto the result queue.

    Results are read *while* the workers are alive.  A process that has
    put items on a ``multiprocessing.Queue`` does not exit until those
    items have been flushed to the underlying pipe, so joining the
    workers before reading the queue blocks forever once the pipe
    buffer is full.
    """
    systems_queue = sequence_to_queue(systems)
    results = Queue()
    workers = list(spawn_workers(
        worker_cls, processes, systems_queue, results, args
    ))
    collected = {}

    try:
        while any(worker.is_alive() for worker in workers):
            try:
                system, result = results.get(timeout=0.1)
            except Empty:
                # Nothing ready yet; re-check whether workers still run.
                continue

            collected[system] = result
    except KeyboardInterrupt:
        # Stop collecting and return what has been gathered so far;
        # wait_for_processes() below still owns waiting for the workers.
        pass

    wait_for_processes(workers)
    # Everything a worker queued has reached the pipe once it has
    # exited, so this picks up whatever arrived after the last get().
    collected.update(iter_queue(results))
    return collected

def sequence_to_queue(sequence: Sequence[Any]) -> Queue:
    """Return a queue pre-filled with the items of *sequence*, in order."""
    # The queue is sized to the input, so none of the puts below has to
    # wait for room.  A size of 0 (empty input) means "unbounded".
    queue = Queue(len(sequence))

    for item in sequence:
        queue.put(item)

    return queue

def spawn_workers(
        worker_cls: Type[BaseWorker],
        amount: int,
        systems: Queue,
        results: Queue,
        args: Namespace
) -> Iterator[Process]:
    """Start ``amount`` worker processes and yield each one.

    Every process runs its own ``worker_cls`` instance, created with a
    zero-based index and the shared ``systems`` and ``results`` queues,
    and called with ``args``.

    This is a generator: a process is only created and started when the
    iterator is advanced, so callers must consume it (e.g. ``list()``).
    """
    for index in range(amount):
        worker = worker_cls(index, systems, results)
        process = Process(target=worker, args=(args,))
        process.start()
        yield process

def wait_for_processes(processes: list[Process]) -> None:
    """Join all given processes; terminate them on ``KeyboardInterrupt``.

    The interrupt is handled here and not re-raised.
    """
    try:
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        # NOTE(review): the per-process action on interrupt is
        # reconstructed as terminate() -- confirm against upstream.
        # Terminating a process that is using a queue can leave that
        # queue unusable (see the multiprocessing documentation).
        for process in processes:
            process.terminate()

def iter_queue(queue: Queue) -> Iterator[Any]:
    """Yield items from *queue* until it reports being empty.

    ``empty()`` is only a snapshot: an item that another process has
    put but that has not been transferred yet is not seen, so call this
    once the producers are done if every item is needed.
    """
    while not queue.empty():
        yield queue.get()



$ sysrpc -Rp 8 {201..900}
[ERROR] sysrpc.201: Could not establish SSH connection.
[ERROR] sysrpc.222: Could not establish SSH connection.
[ERROR] sysrpc.208: Could not establish SSH connection.
[ERROR] sysrpc.209: Could not establish SSH connection.
[ERROR] sysrpc.210: Could not establish SSH connection.
[ERROR] sysrpc.217: Could not establish SSH connection.
[ERROR] sysrpc.223: Could not establish SSH connection.
[ERROR] sysrpc.224: Could not establish SSH connection.
[ERROR] sysrpc.225: Could not establish SSH connection.
[ERROR] sysrpc.226: Could not establish SSH connection.
[ERROR] sysrpc.228: Could not establish SSH connection.
[ERROR] sysrpc.229: Could not establish SSH connection.
[ERROR] sysrpc.230: Could not establish SSH connection.
[ERROR] sysrpc.231: Could not establish SSH connection.
[ERROR] sysrpc.232: Could not establish SSH connection.
[ERROR] sysrpc.233: Could not establish SSH connection.
[ERROR] sysrpc.234: Could not establish SSH connection.
[ERROR] sysrpc.247: Could not establish SSH connection.
[ERROR] sysrpc.235: Could not establish SSH connection.
[ERROR] sysrpc.256: Could not establish SSH connection.
[ERROR] sysrpc.258: Could not establish SSH connection.
[ERROR] sysrpc.241: Could not establish SSH connection.
[ERROR] sysrpc.246: Could not establish SSH connection.
[ERROR] sysrpc.278: Could not establish SSH connection.
[ERROR] sysrpc.262: Could not establish SSH connection.
[ERROR] sysrpc.287: Could not establish SSH connection.
[ERROR] sysrpc.276: Could not establish SSH connection.
[ERROR] sysrpc.299: Could not establish SSH connection.
[ERROR] sysrpc.301: Could not establish SSH connection.
[ERROR] sysrpc.263: Could not establish SSH connection.
[ERROR] sysrpc.268: Could not establish SSH connection.
[ERROR] sysrpc.315: Could not establish SSH connection.
[ERROR] sysrpc.274: Could not establish SSH connection.
[ERROR] sysrpc.320: Could not establish SSH connection.
[ERROR] sysrpc.318: Could not establish SSH connection.
[ERROR] sysrpc.302: Could not establish SSH connection.
[ERROR] sysrpc.330: Could not establish SSH connection.
[ERROR] sysrpc.311: Could not establish SSH connection.
[ERROR] sysrpc.312: Could not establish SSH connection.
[ERROR] sysrpc.335: Could not establish SSH connection.
[ERROR] sysrpc.294: Could not establish SSH connection.
[ERROR] sysrpc.358: Could not establish SSH connection.
[ERROR] sysrpc.360: Could not establish SSH connection.
[ERROR] sysrpc.362: Could not establish SSH connection.
[ERROR] sysrpc.308: Could not establish SSH connection.
[ERROR] sysrpc.336: Could not establish SSH connection.
[ERROR] sysrpc.380: Could not establish SSH connection.
[ERROR] sysrpc.386: Could not establish SSH connection.
[ERROR] sysrpc.349: Could not establish SSH connection.
[ERROR] sysrpc.357: Could not establish SSH connection.
[ERROR] sysrpc.417: Could not establish SSH connection.
[ERROR] sysrpc.422: Could not establish SSH connection.
[ERROR] sysrpc.435: Could not establish SSH connection.
[ERROR] sysrpc.408: Could not establish SSH connection.
[ERROR] sysrpc.412: Could not establish SSH connection.
[ERROR] sysrpc.420: Could not establish SSH connection.
[ERROR] sysrpc.436: Could not establish SSH connection.
[ERROR] sysrpc.390: Could not establish SSH connection.
[ERROR] sysrpc.439: Could not establish SSH connection.
[ERROR] sysrpc.440: Could not establish SSH connection.
[ERROR] sysrpc.441: Could not establish SSH connection.
[ERROR] sysrpc.443: Could not establish SSH connection.
[ERROR] sysrpc.446: Could not establish SSH connection.
[ERROR] sysrpc.433: Could not establish SSH connection.
[ERROR] sysrpc.448: Could not establish SSH connection.
[ERROR] sysrpc.449: Could not establish SSH connection.
[ERROR] sysrpc.450: Could not establish SSH connection.
[ERROR] sysrpc.451: Could not establish SSH connection.
[ERROR] sysrpc.452: Could not establish SSH connection.
[ERROR] sysrpc.453: Could not establish SSH connection.
[ERROR] sysrpc.454: Could not establish SSH connection.
[ERROR] sysrpc.455: Could not establish SSH connection.
[ERROR] sysrpc.456: Could not establish SSH connection.
[ERROR] sysrpc.457: Could not establish SSH connection.
[ERROR] sysrpc.464: Could not establish SSH connection.
[ERROR] sysrpc.458: Could not establish SSH connection.
[ERROR] sysrpc.459: Could not establish SSH connection.
[ERROR] sysrpc.460: Could not establish SSH connection.
[ERROR] sysrpc.461: Could not establish SSH connection.
[ERROR] sysrpc.462: Could not establish SSH connection.
[ERROR] sysrpc.463: Could not establish SSH connection.
[ERROR] sysrpc.465: Could not establish SSH connection.
[ERROR] sysrpc.469: Could not establish SSH connection.
[ERROR] sysrpc.471: Could not establish SSH connection.
[ERROR] sysrpc.473: Could not establish SSH connection.
[ERROR] sysrpc.474: Could not establish SSH connection.
[ERROR] sysrpc.479: Could not establish SSH connection.
[ERROR] sysrpc.475: Could not establish SSH connection.
[ERROR] sysrpc.476: Could not establish SSH connection.
[ERROR] sysrpc.477: Could not establish SSH connection.
[ERROR] sysrpc.480: Could not establish SSH connection.
[ERROR] sysrpc.481: Could not establish SSH connection.
[ERROR] sysrpc.482: Could not establish SSH connection.
[ERROR] sysrpc.483: Could not establish SSH connection.
[ERROR] sysrpc.484: Could not establish SSH connection.
[ERROR] sysrpc.489: Could not establish SSH connection.
[ERROR] sysrpc.491: Could not establish SSH connection.
[ERROR] sysrpc.492: Could not establish SSH connection.
[ERROR] sysrpc.497: Could not establish SSH connection.
[ERROR] sysrpc.528: Could not establish SSH connection.
[ERROR] sysrpc.498: Could not establish SSH connection.
[ERROR] sysrpc.532: Could not establish SSH connection.
[ERROR] sysrpc.533: Could not establish SSH connection.
[ERROR] sysrpc.505: Could not establish SSH connection.
[ERROR] sysrpc.541: Could not establish SSH connection.
[ERROR] sysrpc.544: Could not establish SSH connection.
[ERROR] sysrpc.546: Could not establish SSH connection.
[ERROR] sysrpc.511: Could not establish SSH connection.
[ERROR] sysrpc.521: Could not establish SSH connection.
[ERROR] sysrpc.569: Could not establish SSH connection.
[ERROR] sysrpc.573: Could not establish SSH connection.
[ERROR] sysrpc.579: Could not establish SSH connection.
[ERROR] sysrpc.586: Could not establish SSH connection.
[ERROR] sysrpc.543: Could not establish SSH connection.
[ERROR] sysrpc.559: Could not establish SSH connection.
[ERROR] sysrpc.563: Could not establish SSH connection.
[ERROR] sysrpc.572: Could not establish SSH connection.
[ERROR] sysrpc.602: Could not establish SSH connection.
[ERROR] sysrpc.606: Could not establish SSH connection.
[ERROR] sysrpc.581: Could not establish SSH connection.
[ERROR] sysrpc.614: Could not establish SSH connection.
[ERROR] sysrpc.584: Could not establish SSH connection.
[ERROR] sysrpc.616: Could not establish SSH connection.
[ERROR] sysrpc.618: Could not establish SSH connection.
[ERROR] sysrpc.588: Could not establish SSH connection.
[ERROR] sysrpc.632: Could not establish SSH connection.
[ERROR] sysrpc.642: Could not establish SSH connection.
[ERROR] sysrpc.646: Could not establish SSH connection.
[ERROR] sysrpc.626: Could not establish SSH connection.
[ERROR] sysrpc.627: Could not establish SSH connection.
[ERROR] sysrpc.657: Could not establish SSH connection.
[ERROR] sysrpc.647: Could not establish SSH connection.
[ERROR] sysrpc.650: Could not establish SSH connection.
[ERROR] sysrpc.611: Could not establish SSH connection.
[ERROR] sysrpc.623: Could not establish SSH connection.
[ERROR] sysrpc.668: Could not establish SSH connection.
[ERROR] sysrpc.669: Could not establish SSH connection.
[ERROR] sysrpc.670: Could not establish SSH connection.
[ERROR] sysrpc.656: Could not establish SSH connection.
[ERROR] sysrpc.663: Could not establish SSH connection.
[ERROR] sysrpc.687: Could not establish SSH connection.
[ERROR] sysrpc.688: Could not establish SSH connection.
[ERROR] sysrpc.689: Could not establish SSH connection.
[ERROR] sysrpc.690: Could not establish SSH connection.
[ERROR] sysrpc.691: Could not establish SSH connection.
[ERROR] sysrpc.693: Could not establish SSH connection.
[ERROR] sysrpc.702: Could not establish SSH connection.
[ERROR] sysrpc.694: Could not establish SSH connection.
[ERROR] sysrpc.710: Could not establish SSH connection.
[ERROR] sysrpc.715: Could not establish SSH connection.
[ERROR] sysrpc.717: Could not establish SSH connection.
[ERROR] sysrpc.719: Could not establish SSH connection.
[ERROR] sysrpc.696: Could not establish SSH connection.
[ERROR] sysrpc.697: Could not establish SSH connection.
[ERROR] sysrpc.705: Could not establish SSH connection.
[ERROR] sysrpc.742: Could not establish SSH connection.
[ERROR] sysrpc.743: Could not establish SSH connection.
[ERROR] sysrpc.744: Could not establish SSH connection.
[ERROR] sysrpc.747: Could not establish SSH connection.
[ERROR] sysrpc.748: Could not establish SSH connection.
[ERROR] sysrpc.722: Could not establish SSH connection.
[ERROR] sysrpc.695: Could not establish SSH connection.
[ERROR] sysrpc.773: Could not establish SSH connection.
[ERROR] sysrpc.776: Could not establish SSH connection.
[ERROR] sysrpc.783: Could not establish SSH connection.
[ERROR] sysrpc.798: Could not establish SSH connection.
[ERROR] sysrpc.803: Could not establish SSH connection.
[ERROR] sysrpc.812: Could not establish SSH connection.
[ERROR] sysrpc.818: Could not establish SSH connection.
[ERROR] sysrpc.819: Could not establish SSH connection.
[ERROR] sysrpc.822: Could not establish SSH connection.
[ERROR] sysrpc.821: Could not establish SSH connection.
[ERROR] sysrpc.830: Could not establish SSH connection.
[ERROR] sysrpc.831: Could not establish SSH connection.
[ERROR] sysrpc.835: Could not establish SSH connection.
[ERROR] sysrpc.789: Could not establish SSH connection.
[ERROR] sysrpc.790: Could not establish SSH connection.
[ERROR] sysrpc.841: Could not establish SSH connection.
[ERROR] sysrpc.826: Could not establish SSH connection.
[ERROR] sysrpc.842: Could not establish SSH connection.
[ERROR] sysrpc.848: Could not establish SSH connection.
[ERROR] sysrpc.849: Could not establish SSH connection.
[ERROR] sysrpc.850: Could not establish SSH connection.
[ERROR] sysrpc.851: Could not establish SSH connection.
[ERROR] sysrpc.853: Could not establish SSH connection.
[ERROR] sysrpc.854: Could not establish SSH connection.
[ERROR] sysrpc.860: Could not establish SSH connection.
[ERROR] sysrpc.866: Could not establish SSH connection.
[ERROR] sysrpc.867: Could not establish SSH connection.
[ERROR] sysrpc.868: Could not establish SSH connection.
[ERROR] sysrpc.872: Could not establish SSH connection.
[ERROR] sysrpc.856: Could not establish SSH connection.
[ERROR] sysrpc.883: Could not establish SSH connection.
[ERROR] sysrpc.881: Could not establish SSH connection.
[ERROR] sysrpc.877: Could not establish SSH connection.
[ERROR] sysrpc.876: Could not establish SSH connection.
[ERROR] sysrpc.882: Could not establish SSH connection.
[ERROR] sysrpc.887: Could not establish SSH connection.
[ERROR] sysrpc.884: Could not establish SSH connection.
[ERROR] sysrpc.888: Could not establish SSH connection.
[INFO] hidsltools-worker-2: Finished
[ERROR] sysrpc.885: Could not establish SSH connection.
[INFO] hidsltools-worker-0: Finished
[ERROR] sysrpc.896: Could not establish SSH connection.
[ERROR] sysrpc.886: Could not establish SSH connection.
[INFO] hidsltools-worker-5: Finished
[ERROR] sysrpc.897: Could not establish SSH connection.
[INFO] hidsltools-worker-1: Finished
[INFO] hidsltools-worker-4: Finished
[ERROR] sysrpc.898: Could not establish SSH connection.
[ERROR] sysrpc.899: Could not establish SSH connection.
[INFO] hidsltools-worker-7: Finished
[INFO] hidsltools-worker-6: Finished
[ERROR] sysrpc.900: Could not establish SSH connection.
[INFO] hidsltools-worker-3: Finished


[ERROR] sysrpc.900: Could not establish SSH connection.
[INFO] hidsltools-worker-3: Finished


[INFO] hidsltools-worker-3: Finished

^CProcess Process-7:
1 ✗ neumann@ThinkCentre ~ $ 











"""Multiprocessing worker."""
from argparse import Namespace
from datetime import datetime
from logging import INFO, Logger, getLogger
from multiprocessing import Process, Queue, JoinableQueue
from queue import Empty
from signal import SIGUSR1, SIGUSR2, signal
from typing import Any, Iterator, Sequence, Type
from setproctitle import setproctitle
from homeinfotools.exceptions import SSHConnectionError
from homeinfotools.logging import syslogger

__all__ = ['BaseWorker', 'multiprocess']

class BaseWorker:
    """Queue-driven worker that acknowledges every system it fetches.

    An instance is meant to be the ``target`` of a worker process.
    Calling it pulls system IDs from the joinable queue ``systems``,
    puts one ``(system, result)`` tuple per processed system onto
    ``results`` and calls ``task_done()`` for every fetched item, so a
    ``systems.join()`` elsewhere can return.  Subclasses provide the
    actual work by overriding :meth:`run`.

    While running, the worker reacts to two signals:

    * ``SIGUSR1`` -- log what the worker is currently doing.
    * ``SIGUSR2`` -- stop processing; remaining items are only
      acknowledged, not processed.
    """

    __slots__ = ('index', 'systems', 'results', 'running', 'current_system')

    def __init__(self, index: int, systems: JoinableQueue, results: Queue):
        """Store the worker's index and its input and output queues."""
        self.index = index
        self.systems = systems
        self.results = results
        self.running = True
        self.current_system = None

    def __call__(self, args: Namespace) -> None:
        """Consume the input queue until it stays empty for one second."""
        # NOTE(review): ``setproctitle`` is imported by this module but
        # not used in the visible code; presumably the process title
        # was set to ``self.name`` here -- confirm against upstream.
        signal(SIGUSR1, self.signal)
        signal(SIGUSR2, self.signal)

        # We must consume all the input even when self.running is False,
        # else a join() on the systems queue will never return.
        while True:
            try:
                # A timed get() instead of get_nowait(): items put by
                # another process can take a moment to become readable,
                # and a premature Empty would end this worker while
                # unacknowledged items remain on the queue.
                self.current_system = self.systems.get(timeout=1)
            except Empty:
                break

            if self.running:
                # Then process the input for real:
                result = self.process_system(self.current_system, args)
                self.results.put_nowait((self.current_system, result))

            # Show we are done with this item and any result has been put:
            self.systems.task_done()

        # NOTE(review): the body of the trailing ``if not self.running``
        # is missing from the source; the 'Aborted' message is a
        # reconstruction -- confirm.
        if not self.running:
            self.logger.info('Aborted')
        else:
            self.logger.info('Finished')

    def info(self) -> str:
        """Return a short description of what the worker is doing."""
        if self.current_system is None:
            return 'idle'

        return f'Processing system #{self.current_system}'

    @property
    def logger(self) -> Logger:
        """Return the logger named after this worker."""
        logger = getLogger(self.name)
        # NOTE(review): reconstructed -- ``INFO`` is imported and the
        # worker's INFO messages are expected to be visible.
        logger.setLevel(INFO)
        return logger

    @property
    def name(self) -> str:
        """Return the worker's name, derived from its index."""
        return f'hidsltools-worker-{self.index}'

    def signal(self, signal_number: int, _: Any) -> None:
        """Handle a signal delivered to the worker process.

        The second argument is the stack frame passed to every signal
        handler; it is not used.
        """
        if signal_number == SIGUSR1:
            # NOTE(review): reconstructed branch body -- reporting the
            # state is the only use of info() in this class.
            self.logger.info(self.info())
        elif signal_number == SIGUSR2:
            self.running = False
        else:
            self.logger.error('Received invalid signal: %i', signal_number)

    def process_system(self, system: int, args: Namespace) -> dict:
        """Run the job for one system and return a timed result record.

        The record always has the keys ``start``, ``end`` (ISO 8601
        timestamps), ``duration`` and ``online``.  ``result`` holds the
        return value of :meth:`run` and is only present when no
        ``SSHConnectionError`` was raised.  Any other exception raised
        by :meth:`run` propagates.
        """
        result = {'start': (start := datetime.now()).isoformat()}

        try:
            result['result'] = self.run(system, args)
        except SSHConnectionError:
            syslogger(system).error('Could not establish SSH connection.')
            result['online'] = False
        else:
            result['online'] = True

        result['end'] = (end := datetime.now()).isoformat()
        result['duration'] = str(end - start)
        return result

    @staticmethod
    def run(system: int, args: Namespace) -> dict:
        """Do the actual work for one system; subclasses must override."""
        raise NotImplementedError()

def multiprocess(
        worker_cls: Type[BaseWorker],
        systems: list[int],
        processes: int,
        args: Namespace
) -> dict:
    """Run ``processes`` workers over ``systems`` and collect the results.

    Returns a dict built from the ``(key, value)`` pairs the workers put
    onto the result queue.

    Blocks until every queued system has been acknowledged with
    ``task_done()``; the result queue is read *before* the workers are
    joined, because a process cannot exit while data it has queued is
    still waiting to be flushed to a full pipe.
    """
    # First put all items on the input queue:
    systems_queue = sequence_to_queue(systems)
    # Then start the children that will process the input queue:
    results = Queue()
    process_list = list(spawn_workers(
        worker_cls, processes, systems_queue, results, args
    ))
    # Wait for all work to be completed:
    systems_queue.join()
    # Now iterate the output queue:
    collected = dict(iter_queue(results))
    # Finally wait for the processes to complete:
    wait_for_processes(process_list)
    # A result can be acknowledged on the input queue before it has
    # been transferred to this process; everything is transferred once
    # the workers have exited, so read again to pick up stragglers.
    collected.update(iter_queue(results))
    return collected

def sequence_to_queue(sequence: Sequence[Any]) -> JoinableQueue:
    """Return a joinable queue pre-filled with the items of *sequence*.

    Every item counts as an unfinished task until a consumer calls
    ``task_done()`` for it.
    """
    # The queue is sized to the input, so none of the puts below has to
    # wait for room.  A size of 0 (empty input) means "unbounded".
    queue = JoinableQueue(len(sequence))

    for item in sequence:
        queue.put(item)

    return queue

def spawn_workers(
        worker_cls: Type[BaseWorker],
        amount: int,
        systems: JoinableQueue,
        results: Queue,
        args: Namespace
) -> Iterator[Process]:
    """Start ``amount`` worker processes and yield each one.

    Every process runs its own ``worker_cls`` instance, created with a
    zero-based index and the shared ``systems`` and ``results`` queues,
    and called with ``args``.

    This is a generator: a process is only created and started when the
    iterator is advanced, so callers must consume it (e.g. ``list()``).
    """
    for index in range(amount):
        worker = worker_cls(index, systems, results)
        process = Process(target=worker, args=(args,))
        process.start()
        yield process

def wait_for_processes(processes: list[Process]) -> None:
    """Join all given processes; terminate them on ``KeyboardInterrupt``.

    The interrupt is handled here and not re-raised.
    """
    try:
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        # NOTE(review): the per-process action on interrupt is
        # reconstructed as terminate() -- confirm against upstream.
        # Terminating a process that is using a queue can leave that
        # queue unusable (see the multiprocessing documentation).
        for process in processes:
            process.terminate()

def iter_queue(queue: Queue) -> Iterator[Any]:
    """Yield the items that can be taken from *queue* without waiting.

    Iteration stops at the first ``Empty``.  An item that another
    process has put but that has not been transferred yet is not seen,
    so a later call may yield more.
    """
    while True:
        # Only the fetch is guarded, so an Empty raised by the consumer
        # of this generator is never mistaken for an exhausted queue.
        try:
            item = queue.get_nowait()
        except Empty:
            return

        yield item




