合并 Python 脚本的子进程的 stdout 和 stderr,同时保持它们的可区分性



我想直接一个python脚本的子进程' stdout和stdin到同一个文件。我不知道的是如何区分两种来源的线条?(例如,在来自stderr的行前加上感叹号。)

在我的特殊情况下,不需要实时监视子进程,执行Python脚本可以等待其执行结束。

tsk = subprocess.Popen(args,stdout=subprocess.PIPE,stderr=subprocess.STDOUT)

subprocess.STDOUT是一个特殊的标志,它告诉子进程将所有标准错误输出路由到标准输出,从而合并两个流。

顺便说一句,select在windows中没有poll()。子进程只使用文件句柄号,不调用文件输出对象的write方法。

捕获输出,执行如下操作:

logfile = open(logfilename, 'w')
while tsk.poll() is None:
    line = tsk.stdout.readline()
    logfile.write(line)

我发现自己最近不得不解决这个问题,并且花了一段时间才得到我认为在大多数情况下正确工作的东西,所以这里是!(它还有一个很好的副作用,通过python记录器处理输出,我注意到这是Stackoverflow上的另一个常见问题)。

代码如下:

import sys
import logging
import subprocess
from threading import Thread
logging.basicConfig(stream=sys.stdout,level=logging.INFO)
logging.addLevelName(logging.INFO+2,'STDERR')
logging.addLevelName(logging.INFO+1,'STDOUT')
logger = logging.getLogger('root')
pobj = subprocess.Popen(['python','-c','print 42;bargle'], 
    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def logstream(stream,loggercb):
    while True:
        out = stream.readline()
        if out:
            loggercb(out.rstrip())       
        else:
            break
stdout_thread = Thread(target=logstream,
    args=(pobj.stdout,lambda s: logger.log(logging.INFO+1,s)))
stderr_thread = Thread(target=logstream,
    args=(pobj.stderr,lambda s: logger.log(logging.INFO+2,s)))
stdout_thread.start()
stderr_thread.start()
while stdout_thread.isAlive() and stderr_thread.isAlive():
     pass

输出如下:

STDOUT:root:42
STDERR:root:Traceback (most recent call last):
STDERR:root:  File "<string>", line 1, in <module>
STDERR:root:NameError: name 'bargle' is not defined

您可以替换子进程调用来做任何您想做的事情,我只是选择用我知道会打印到标准输出和标准错误的命令运行python。关键的一点是在一个单独的线程中读取标准输入和标准输出。否则你可能会阻塞在读取一个,而有数据准备读取另一个。

如果您想要交错以获得与交互式运行进程大致相同的顺序,那么您需要做shell所做的并轮询stdin/stdout并按照它们轮询的顺序写入。

这里有一些代码,可以按照您的需要做一些事情-在本例中,将stdout/stderr发送到记录器info/error流。

tsk = subprocess.Popen(args,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
poll = select.poll()
poll.register(tsk.stdout,select.POLLIN | select.POLLHUP)
poll.register(tsk.stderr,select.POLLIN | select.POLLHUP)
pollc = 2
events = poll.poll()
while pollc > 0 and len(events) > 0:
  for event in events:
    (rfd,event) = event
    if event & select.POLLIN:
      if rfd == tsk.stdout.fileno():
        line = tsk.stdout.readline()
        if len(line) > 0:
          logger.info(line[:-1])
      if rfd == tsk.stderr.fileno():
        line = tsk.stderr.readline()
        if len(line) > 0:
          logger.error(line[:-1])
    if event & select.POLLHUP:
      poll.unregister(rfd)
      pollc = pollc - 1
    if pollc > 0: events = poll.poll()
tsk.wait()

目前,如果子进程不是接受-u标志的Python脚本,则所有其他答案都不处理子进程端的缓冲。请参阅pexpect文档中的"Q:为什么不直接使用管道(popen())?"

为了模拟一些基于C语言的程序(FILE*)的-u标志,可以尝试stdbuf

如果你忽略这一点,那么你的输出将不会正确交错,可能看起来像:

stderr
stderr
...large block of stdout including parts that are printed before stderr...

您可以用下面的客户机程序尝试它,注意使用/不使用-u标志的差异(['stdbuf', '-o', 'L', 'child_program']也修复了输出):

#!/usr/bin/env python
from __future__ import print_function
import random
import sys
import time
from datetime import datetime
def tprint(msg, file=sys.stdout):
    time.sleep(.1*random.random())
    print("%s %s" % (datetime.utcnow().strftime('%S.%f'), msg), file=file)
tprint("stdout1 before stderr")
tprint("stdout2 before stderr")
for x in range(5):
    tprint('stderr%d' % x, file=sys.stderr)
tprint("stdout3 after stderr")

在Linux上,你可以使用pty来获得与子进程交互运行时相同的行为,例如,这里有一个修改的@T。Rojan的回答是:

import logging, os, select, subprocess, sys, pty
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
master_fd, slave_fd = pty.openpty()
p = subprocess.Popen(args,stdout=slave_fd, stderr=subprocess.PIPE, close_fds=True)
with os.fdopen(master_fd) as stdout:
    poll = select.poll()
    poll.register(stdout, select.POLLIN)
    poll.register(p.stderr,select.POLLIN | select.POLLHUP)
    def cleanup(_done=[]):
        if _done: return
        _done.append(1)
        poll.unregister(p.stderr)
        p.stderr.close()
        poll.unregister(stdout)
        assert p.poll() is not None
    read_write = {stdout.fileno(): (stdout.readline, logger.info),
                  p.stderr.fileno(): (p.stderr.readline, logger.error)}
    while True:
        events = poll.poll(40) # poll with a small timeout to avoid both
                               # blocking forever and a busy loop
        if not events and p.poll() is not None:
            # no IO events and the subprocess exited
            cleanup()
            break
        for fd, event in events:
            if event & select.POLLIN: # there is something to read
                read, write = read_write[fd]
                line = read()
                if line:
                    write(line.rstrip())
            elif event & select.POLLHUP: # free resources if stderr hung up
                cleanup()
            else: # something unexpected happened
                assert 0
sys.exit(p.wait()) # return child's exit code

它假设在交互模式下stderr总是未缓冲的/行缓冲的,stdout总是行缓冲的。只读取整行。如果输出中有未终止的行,程序可能会阻塞。

我建议您编写自己的处理程序,类似于(未经过测试,我希望您能抓住这个想法):

class my_buffer(object):
    def __init__(self, fileobject, prefix):
        self._fileobject = fileobject
        self.prefix = prefix
    def write(self, text):
        return self._fileobject.write('%s %s' % (self.prefix, text))
    # delegate other methods to fileobject if necessary
log_file = open('log.log', 'w')
my_out = my_buffer(log_file, 'OK:')
my_err = my_buffer(log_file, '!!!ERROR:')
p = subprocess.Popen(command, stdout=my_out, stderr=my_err, shell=True)

改进T.Rojan的代码,使其在stderr或stdout接收超过一行的内容时能够工作。

# Use subprocess.Popen to run the code in the temporary file and capture stdout and stderr
process = subprocess.Popen([sys.executable, temp_file.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Capture the output while the process is running by polling the stdout and stderr pipes and reading from them
poll = select.poll()
poll.register(process.stdout,select.POLLIN | select.POLLHUP)
poll.register(process.stderr,select.POLLIN | select.POLLHUP)
pollc = 2
events = poll.poll()
while pollc > 0 and len(events) > 0:
  for event in events:
    (rfd, event) = event
    if event & select.POLLIN:
      if rfd == process.stdout.fileno():
        while True:
          line = process.stdout.readline()
          if len(line) == 0:
            break
          # We don't want to print the newline character at the end of the line so we slice it off
          logger.info(line[:-1].decode('utf-8'))
      if rfd == process.stderr.fileno():
        while True:
          line = process.stderr.readline()
          if len(line) == 0:
            break
          logger.error(line[:-1].decode('utf-8'))
    if event & select.POLLHUP:
      poll.unregister(rfd)
      pollc = pollc - 1
  if pollc > 0:
    events = poll.poll()
process.wait()

然而,我做了这些类,这些类在我看来要好得多,但超出了这个问题的范围。您可能想要编辑DEBUG:

code_executor.py

import logging, os, select, subprocess, sys, tempfile, pty
from colorama import Fore
from definitions import DEBUG
from typing import Dict, Optional, Any, List, Tuple
import TimeoutHandler
import FirstInFirstOutIO
class CodeExecutor:
  # If live_output is True, the output of the code will be printed to stdout as it is generated.
  # If live_output is True or False you will still always have the full output string retuned in the Tuple along with the success boolean
  # max_output_size is the maximum size of the output string. Helpful to prevent excessive memory usage, and to prevent the output from being too large to send to OpenAI
  # timeout_seconds is the maximum number of seconds the code is allowed to run before it is terminated. TODO support Windows by using threading instead of signal.alarm
  def execute_code(self, code: str, live_output: bool= True, max_output_size: int = 1000, timeout_seconds: int = 10) -> Tuple[bool, str]:
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    # Setup the handler with a FirstInFirstOutIO object
    log_capture_string = FirstInFirstOutIO(max_output_size)
    handler = logging.StreamHandler(log_capture_string)
    logger.addHandler(handler)
    success = True
    # Create a temporary file to store the provided code
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.py') as temp_file:
      temp_file.write(code)
      temp_file.flush()
    try:
      with TimeoutHandler(timeout_seconds):
        master, slave = pty.openpty()
        # Use subprocess.Popen to run the code in the temporary file and capture stdout and stderr
        process = subprocess.Popen([sys.executable, temp_file.name], stdout=slave, stderr=slave, universal_newlines=True)
        os.close(slave)
        timeout = 0.1  # A small timeout value for os.read
        while True:
          rlist, _, _ = select.select([master], [], [], timeout)
          if rlist:
            data = os.read(master, 1024).decode('utf-8')
            if not data:
              break
            for line in data.splitlines():
              if live_output:
                print(line)
              logger.info(line)
          if not process.poll() is None:
            break
    except TimeoutError:
      process.kill()
      # Handle timeout errors by appending a timeout error message to the logger and setting success to false
      message=f"Provided code took too long to finish execution. TimeoutError: Timeout after {timeout_seconds} seconds."
      logger.error(message)
      if live_output:
        print(message)
      success = False
    except subprocess.CalledProcessError as e:
      # Handle errors in the subprocess by appending the error message to the logger and setting success to false
      message=f"Error executing code: {str(e)}"
      logger.error(message)
      if live_output:
        print(message)
      success = False
    finally:
      # Remove the temporary file after execution
      os.remove(temp_file.name)
      output_string = log_capture_string.getvalue()
      log_capture_string.close()
      logger.removeHandler(handler) # Just being explicit here
      if DEBUG:
        print(f"{Fore.YELLOW} Would you like to see the output of the code? (y/n){Fore.RESET}")
        if input().lower() == 'y':
          print(output_string)
      return success, output_string

first_in_first_out_io.py

import io, collections
class FirstInFirstOutIO(io.TextIOBase):
    def __init__(self, size, *args):
        self.maxsize = size
        io.TextIOBase.__init__(self, *args)
        self.deque = collections.deque()
    def getvalue(self):
        return ''.join(self.deque)
    def write(self, x):
        self.deque.append(x)
        self.shrink()
    def shrink(self):
        if self.maxsize is None:
            return
        size = sum(len(x) for x in self.deque)
        while size > self.maxsize:
            x = self.deque.popleft()
            size -= len(x)

timeout_handler.py

import signal
import sys
# This is a context manager that will raise a TimeoutError if the code inside 
# the context manager takes longer than the given number of seconds
class TimeoutHandler:
  def __init__(self, seconds: int):
    self.seconds = seconds
  def __enter__(self):
    if sys.platform == "win32":
      # Windows does not support SIGALRM, so skip the timeout handling
      return self
    signal.signal(signal.SIGALRM, self.handle_timeout)
    signal.alarm(self.seconds)
    return self
  def __exit__(self, exc_type, exc_value, traceback):
    if sys.platform != "win32":
      signal.alarm(0)
  def handle_timeout(self, signum, frame):
    raise TimeoutError(f"Timeout after {self.seconds} seconds.")

可以在命令执行后将stdout/err写入文件。在下面的例子中,我使用pickle,所以我确信我将能够在没有任何特殊解析的情况下读取以区分stdout/err,并且在某些时候我可以删除exitcode和命令本身。

import subprocess
import cPickle
command = 'ls -altrh'
outfile = 'log.errout'
pipe = subprocess.Popen(command, stdout = subprocess.PIPE,
                        stderr = subprocess.PIPE, shell = True)
stdout, stderr = pipe.communicate()
f = open(outfile, 'w')
cPickle.dump({'out': stdout, 'err': stderr},f)
f.close()

最新更新