如何在Python中分别启动和停止多处理进程



我使用专用的Python(3.8(库通过USB端口控制电机驱动器。

电机控制驱动器制造商(ODrive(提供的Python库允许单个Python进程控制一个或多个驱动器。

但是,我想运行3个进程,每个进程控制一个驱动器。

在研究了各种选项之后(我首先考虑了虚拟机、Docker容器和多线程(,我开始相信最简单的方法是使用multiprocessing

我的问题是,我需要一种方法来管理(即,独立启动、监控和停止(多个流程。其背后的实际原因是电机连接到不同的设置。例如,如果出现故障,每个设置都必须能够单独停止和重新启动,但其他正在运行的设置不应受到此操作的影响。

在阅读了互联网和Stack Overflow之后,我现在了解了如何创建处理的Pool,如何将进程与处理器核心相关联,如何启动进程池,以及排队/加入它们(我不需要后者(。

我不知道的是如何独立管理它们。如何在不影响其他进程执行的情况下分别启动/停止不同进程?是否有库来管理它们(甚至可能使用GUI(?

我可能会这样做:

import random
import time
from multiprocessing import Process, Queue

class MotorProcess:
def __init__(self, name, com_related_params):
self.name = name
# Made up some parameters relating to communication
self._params = com_related_params
self._command_queue = Queue()
self._status_queue = Queue()
self._process = None
def start(self):
if self._process and self._process.is_alive():
return
self._process = Process(target=self.run_processing,
args=(self._command_queue, self._status_queue,
self._params))
self._process.start()
@staticmethod
def run_processing(command_queue, status_queue, params):
while True:
# Check for commands
if not command_queue.empty():
msg = command_queue.get(block=True, timeout=0.05)
if msg == "stop motor":
status_queue.put("Stopping motor")
elif msg == "exit":
return
elif msg.startswith("move"):
status_queue.put("moving motor to blah")
# TODO: msg parsing and move motor
else:
status_queue.put("unknown command")
# Update status
# TODO: query motor status
status_queue.put(f"Motor is {random.randint(0, 100)}")
time.sleep(0.5)
def is_alive(self):
if self._process and self._process.is_alive():
return True
return False
def get_status(self):
if not self.is_alive():
return ["not running"]
# Empty the queue
recent = []
while not self._status_queue.empty():
recent.append(self._status_queue.get(False))
return recent
def stop_process(self):
if not self.is_alive():
return
self._command_queue.put("exit")
# Empty the stats queue otherwise it could potentially stop
# the process from closing.
while not self._status_queue.empty():
self._status_queue.get()
self._process.join()
def send_command(self, command):
self._command_queue.put(command)

if __name__ == "__main__":
processes = [MotorProcess("1", None), MotorProcess("2", None)]
while True:
cmd = input()
if cmd == "start 1":
processes[0].start()
elif cmd == "move 1 to 100":
processes[0].send_command("move to 100")
elif cmd == "exit 1":
processes[0].stop_process()
else:
for n, p in enumerate(processes):
print(f"motor {n + 1}", end="nt")
print("nt".join(p.get_status()))

未做好生产准备(例如,没有异常处理,没有正确的命令解析等(,但表明了这一想法。如果有任何问题就大喊:D

您可以手动创建多个multriprocessing.Process实例,如下所示:

def my_func(a, b):
pass
p = multiprocessing.Process(target=my_func, args=(100, 200)
p.start()

并使用多处理原语QueueEventCondition等进行管理。有关详细信息,请参阅官方文档:https://docs.python.org/3/library/multiprocessing.html

在以下示例中,多个进程分别启动和停止。Event用于确定何时停止进程。Queue用于从子进程传递到主进程的结果。

import multiprocessing
import queue
import random
import time

def worker_process(
process_id: int,
results_queue: multiprocessing.Queue,
to_stop: multiprocessing.Event,
):
print(f"Process {process_id} is started")
while not to_stop.is_set():
print(f"Process {process_id} is working")
time.sleep(0.5)
result = random.random()
results_queue.put((process_id, result))
print(f"Process {process_id} exited")

process_pool = []
result_queue = multiprocessing.Queue()
while True:
if random.random() < 0.3:
# staring a new process
process_id = random.randint(0, 10_000)
to_stop = multiprocessing.Event()
p = multiprocessing.Process(
target=worker_process, args=(process_id, result_queue, to_stop)
)
p.start()
process_pool.append((p, to_stop))
if random.random() < 0.2:
# closing a random process
if process_pool:
process, to_stop = process_pool.pop(
random.randint(0, len(process_pool) - 1)
)
to_stop.set()
process.join()
try:
p_id, result = result_queue.get_nowait()
print(f"Completed: process_id={p_id} result={result}")
except queue.Empty:
pass
time.sleep(1)

最新更新