Read subprocess stdout and stderr concurrently



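I am trying to run a lengthy command from Python that writes to both stdout and stderr. I would like to poll the subprocess and write each stream to its own file.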

I tried the following, based on this answer to "Non-blocking read on a subprocess.PIPE in python":

import os
import shlex
import subprocess
from Queue import Queue, Empty  # Python 2 name; the module is "queue" on Python 3
from threading import Thread
def send_cmd(cmd, shell=False):
    """
    Send cmd to the shell
    """
    if not isinstance(cmd, list): cmd = shlex.split(cmd)
    params = {'args'   : cmd,
              'stdout' : subprocess.PIPE,
              'stderr' : subprocess.PIPE,
              'shell'  : shell}
    proc = subprocess.Popen(**params)
    return proc
def monitor_command(process, stdout_log=os.devnull, stderr_log=os.devnull):
    """
    Monitor the process that is running, and log it if desired
    """
    def enqueue_output(out, queue):
        for line in iter(out.readline, b''):
            queue.put(line)
    def setup_process(log_name, proc):
        FID = open(log_name, 'w')
        queue = Queue()
        thread = Thread(target=enqueue_output, args=(proc, queue))
        thread.daemon = True # Thread dies with program
        thread.start()
        return (queue, FID)
    def check_queues(queue_list, errors):
        for queue, FID in queue_list:
            try:
                line = queue.get_nowait()
                if 'error' in line.lower() or 'failed' in line.lower():
                    errors.append(line)
            except Empty:
                pass
            else:
                FID.write(line)
    errors = []
    queue_list = []
    for log, proc in [(stdout_log, process.stdout), (stderr_log, process.stderr)]:
        queue_list.append(setup_process(log, proc))
    while process.poll() is None:
        check_queues(queue_list, errors)
    while not queue_list[0][0].empty() or not queue_list[1][0].empty():
        check_queues(queue_list, errors)
    for queue, FID in queue_list:
        FID.close()
    return errors
process = send_cmd('long_program.exe')
errors  = monitor_command(process, stdout_log='stdout.log', stderr_log='stderr.log')

However, the stdout log file ends up empty and the stderr log file contains only a few lines, whereas both should be quite large.

What am I missing?

I did this once. Here is some old code I wrote:


import threading
import Queue  # Python 2 name; the module is "queue" on Python 3


class Process_Communicator():
    def join(self):
        self.te.join()
        self.to.join()
        self.running = False
        self.aggregator.join()
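        # self.ti is never created in __init__ (update() already drains
        # stdin_queue), so there is no stdin thread to join here.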
    def enqueue_in(self):
        while self.running and self.p.stdin is not None:
            while not self.stdin_queue.empty():
                s = self.stdin_queue.get()
                self.p.stdin.write(str(s) + '\n\r')
            pass
    def enqueue_output(self):
        if not self.p.stdout or self.p.stdout.closed:
            return
        out = self.p.stdout
        for line in iter(out.readline, b''):
            self.qo.put(line)
        #    out.flush()
    def enqueue_err(self):
        if not self.p.stderr or self.p.stderr.closed:
            return
        err = self.p.stderr
        for line in iter(err.readline, b''):
            self.qe.put(line)
    def aggregate(self):
        while (self.running):
            self.update()
        self.update()
    def update(self):
        line = ""
        try:
            while True:  # leaves the loop when get_nowait() raises Queue.Empty
                line = self.qe.get_nowait()  # or q.get(timeout=.1)
                self.unbblocked_err += line
        except Queue.Empty:
            pass
        line = ""
        try:
            while True:  # leaves the loop when get_nowait() raises Queue.Empty
                line = self.qo.get_nowait()  # or q.get(timeout=.1)
                self.unbblocked_out += line
        except Queue.Empty:
            pass
        while not self.stdin_queue.empty():
                s = self.stdin_queue.get()
                self.p.stdin.write(str(s))
    def get_stdout(self, clear=True):
        ret = self.unbblocked_out
        if clear:
            self.unbblocked_out = ""
        return ret
    def has_stdout(self):
        ret = self.get_stdout(False)
        if ret == '':
            return None
        else:
            return ret
    def get_stderr(self, clear=True):
        # Return the stderr buffer (the original returned the stdout buffer)
        ret = self.unbblocked_err
        if clear:
            self.unbblocked_err = ""
        return ret
    def has_stderr(self):
        ret = self.get_stderr(False)
        if ret == '':
            return None
        else:
            return ret
    def __init__(self, subp):
        '''This is a simple class that collects and aggregates the
        output from a subprocess so that you can more reliably use
        the class without having to block for subprocess.communicate.'''
        self.p = subp
        self.unbblocked_out = ""
        self.unbblocked_err = ""
        self.running = True
        self.qo = Queue.Queue()
        self.to = threading.Thread(name="out_read",
                                    target=self.enqueue_output,
                                    args=())
        self.to.daemon = True  # thread dies with the program
        self.to.start()
        self.qe = Queue.Queue()
        self.te = threading.Thread(name="err_read",
                                   target=self.enqueue_err,
                                   args=())
        self.te.daemon = True  # thread dies with the program
        self.te.start()
        self.stdin_queue = Queue.Queue()
        self.aggregator = threading.Thread(name="aggregate",
                                           target=self.aggregate,
                                           args=())
        self.aggregator.daemon = True  # thread dies with the program
        self.aggregator.start()
        pass
 

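You probably don't need the whole example, but feel free to cut and paste whatever you need. It also shows how I did the threading, which is the important part.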

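The code looks more complicated than the task requires. I don't see why you need to call process.poll() or queue.get_nowait() here. To deliver a subprocess' stdout/stderr to several sinks, you could start with teed_call(), which accepts arbitrary file-like objects: you could pass log files and special file-like objects that accumulate errors in their .write() methods.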
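To illustrate the idea of file-like sinks, here is a minimal Python 3 sketch. It is not the teed_call() function mentioned above; ErrorCollector, tee and run_with_sinks are hypothetical names made up for this example, and the keywords are the ones used in the question.

```python
import subprocess
import threading


class ErrorCollector(object):
    """Hypothetical write-only sink that keeps lines mentioning a keyword."""

    def __init__(self, keywords=(b"error", b"failed")):
        self.keywords = keywords
        self.errors = []

    def write(self, data):
        lowered = data.lower()
        if any(keyword in lowered for keyword in self.keywords):
            self.errors.append(data)


def tee(pipe, sinks):
    """Start a thread that copies each line of pipe to every sink."""
    def copy():
        with pipe:
            for line in iter(pipe.readline, b""):
                for sink in sinks:
                    sink.write(line)
    thread = threading.Thread(target=copy)
    thread.start()
    return thread


def run_with_sinks(cmd, stdout_sinks, stderr_sinks):
    """Run cmd, fan each stream out to its sinks, and return the exit code."""
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    threads = [tee(process.stdout, stdout_sinks),
               tee(process.stderr, stderr_sinks)]
    for thread in threads:
        thread.join()  # returns once the child has closed that stream
    return process.wait()
```

Any object with a .write() method that accepts bytes can serve as a sink, so a log file opened in 'wb' mode and an ErrorCollector can be passed side by side in the same list.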

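To fix your code with minimal changes, you should call .join() on the reader threads (even if process.poll() is not None, i.e. the subprocess has exited, there may still be some pending output; joining the reader threads ensures that all of the output is read).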
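A minimal Python 3 sketch of that advice might look like the following. It drops the queues entirely and lets each reader thread write straight to its log file; pump and run_and_log are hypothetical names chosen for this example, not part of the question's code.

```python
import subprocess
import threading


def pump(pipe, log_path, errors):
    """Copy every line from pipe to log_path; keep lines that look like errors."""
    with pipe, open(log_path, "wb") as log:
        for line in iter(pipe.readline, b""):
            log.write(line)
            lowered = line.lower()
            if b"error" in lowered or b"failed" in lowered:
                errors.append(line)


def run_and_log(cmd, stdout_log, stderr_log):
    """Run cmd, write each stream to its own file, return (exit code, errors)."""
    errors = []
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    readers = [
        threading.Thread(target=pump, args=(process.stdout, stdout_log, errors)),
        threading.Thread(target=pump, args=(process.stderr, stderr_log, errors)),
    ]
    for reader in readers:
        reader.start()
    returncode = process.wait()
    # The process has exited, but the readers may still hold unread output:
    # joining them guarantees both log files are complete before returning.
    for reader in readers:
        reader.join()
    return returncode, errors
```

It could be called as run_and_log(['long_program.exe'], 'stdout.log', 'stderr.log'). Because both pipes are drained by their own threads, waiting on the process should not stall on a full pipe buffer.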
