实时多进程stdout监控



现在,我正在使用subprocess在后台运行一个长时间运行的作业。由于多种原因(PyInstaller+AWS CLI(,我不能再使用子流程了。

有没有一种简单的方法可以实现下面的目标?在多进程池(或其他什么(中运行一个长时间运行的python函数,并实时处理stdout/stderr?

import subprocess
process = subprocess.Popen(
["python", "long-job.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
)
while True:
out = process.stdout.read(2000).decode()
if not out:
err = process.stderr.read().decode()
else:
err = ""
if (out == "" or err == "") and process.poll() is not None:
break
live_stdout_process(out)

感谢

跨平台获取它很麻烦。。。。首先,非阻塞管道的windows实现对用户不友好或不可移植。

一种选择是让应用程序读取其命令行参数并有条件地执行一个文件,然后使用子流程,因为您将使用不同的参数启动自己。

但要保持多处理:

  1. 输出必须记录到队列而不是管道中
  2. 您需要孩子来执行python文件,这可以使用runpy__main__的形式执行该文件
  3. 这个runpy函数应该在多处理子函数下运行,这个子函数必须首先在初始化器中重定向它的stdout和stderr
  4. 当发生错误时,主应用程序必须捕获它。。。。但是,如果它忙于读取输出,它将无法等待错误,因此子线程必须启动多进程并等待错误
  5. 主进程必须创建队列,启动子线程并读取输出

将其整合在一起:

import multiprocessing
from multiprocessing import Queue
import sys
import concurrent.futures
import threading
import traceback
import runpy
import time
class StdoutQueueWrapper:
def __init__(self,queue:Queue):
self._queue = queue
def write(self,text):
self._queue.put(text)
def flush(self):
pass
def function_to_run():
# runpy.run_path("long-job.py",run_name="__main__")  # run long-job.py
print("hello")  # print something
raise ValueError  # error out
def initializer(stdout_queue: Queue,stderr_queue: Queue):
sys.stdout = StdoutQueueWrapper(stdout_queue)
sys.stderr = StdoutQueueWrapper(stderr_queue)
def thread_function(child_stdout_queue,child_stderr_queue):
with concurrent.futures.ProcessPoolExecutor(1, initializer=initializer,
initargs=(child_stdout_queue, child_stderr_queue)) as pool:
result = pool.submit(function_to_run)
try:
result.result()
except Exception as e:
child_stderr_queue.put(traceback.format_exc())

if __name__ == "__main__":
child_stdout_queue = multiprocessing.Queue()
child_stderr_queue = multiprocessing.Queue()
child_thread = threading.Thread(target=thread_function,args=(child_stdout_queue,child_stderr_queue),daemon=True)
child_thread.start()
while True:
while not child_stdout_queue.empty():
var = child_stdout_queue.get()
print(var,end='')
while not child_stderr_queue.empty():
var = child_stderr_queue.get()
print(var,end='')
if not child_thread.is_alive():
break
time.sleep(0.01)  # check output every 0.01 seconds

请注意,作为多进程运行的一个直接后果是,如果子进程遇到分段错误或一些不可恢复的错误,父进程也会死亡,如果预期会出现分段错误,那么在子进程下运行自己似乎是一个更好的选择。

最新更新