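I want to direct both the stdout and stderr of a Python script's subprocess to the same file. What I don't know is how to tell the lines from the two sources apart. (For example, prefix lines that come from stderr with an exclamation mark.)
In my particular case there is no need to monitor the subprocess in real time; the Python script that launches it can wait until it finishes.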
tsk = subprocess.Popen(args,stdout=subprocess.PIPE,stderr=subprocess.STDOUT)
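subprocess.STDOUT is a special flag that tells subprocess to route all standard error output to standard output, merging the two streams.
By the way, select has no poll() on Windows. Also, subprocess only uses the file handle number; it does not call the write method of your output file object.
To capture the output, do something like this: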
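logfile = open(logfilename, 'wb')  # binary mode: the pipe yields bytes on Python 3
while tsk.poll() is None:
    line = tsk.stdout.readline()
    logfile.write(line)
logfile.write(tsk.stdout.read())  # drain whatever was still buffered when the child exited
logfile.close()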
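To make the effect of stderr=subprocess.STDOUT concrete, here is a minimal sketch. The child code and the helper name run_merged are made up for illustration. It shows that both streams arrive on one pipe, which also means the parent can no longer tell which stream a line came from.

```python
import subprocess
import sys

# Hypothetical child: writes one line to stdout and one to stderr.
child_code = (
    "import sys\n"
    "sys.stdout.write('to stdout\\n'); sys.stdout.flush()\n"
    "sys.stderr.write('to stderr\\n'); sys.stderr.flush()\n"
)

def run_merged(code):
    """Run `code` in a child interpreter with stderr folded into stdout."""
    proc = subprocess.Popen(
        [sys.executable, "-c", code],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,   # send stderr to the same pipe as stdout
        universal_newlines=True,    # decode bytes to str
    )
    merged, _ = proc.communicate()  # second item is None: no separate stderr pipe
    return merged.splitlines()

lines = run_merged(child_code)
print(lines)
```

Since the origin of each line is lost here, this approach suits plain capturing but not the prefixing the question asks for.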
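I recently found myself having to solve this problem, and it took a while to arrive at something I believe works correctly in most cases, so here it is! (It also has the nice side effect of processing the output through a Python logger, which I've noticed is another common question on Stack Overflow.)
Here is the code: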
import sys
import logging
import subprocess
from threading import Thread

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.addLevelName(logging.INFO+2, 'STDERR')
logging.addLevelName(logging.INFO+1, 'STDOUT')
logger = logging.getLogger('root')

# universal_newlines=True makes the pipes yield str instead of bytes
pobj = subprocess.Popen([sys.executable, '-c', 'print(42);bargle'],
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        universal_newlines=True)

def logstream(stream, loggercb):
    # forward each line from the stream to the logger until end of file
    while True:
        out = stream.readline()
        if out:
            loggercb(out.rstrip())
        else:
            break

stdout_thread = Thread(target=logstream,
                       args=(pobj.stdout, lambda s: logger.log(logging.INFO+1, s)))
stderr_thread = Thread(target=logstream,
                       args=(pobj.stderr, lambda s: logger.log(logging.INFO+2, s)))
stdout_thread.start()
stderr_thread.start()

# wait for both readers to finish instead of spinning in a busy loop
stdout_thread.join()
stderr_thread.join()
The output looks something like this:
STDOUT:root:42
STDERR:root:Traceback (most recent call last):
STDERR:root: File "<string>", line 1, in <module>
STDERR:root:NameError: name 'bargle' is not defined
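You can replace the subprocess call with whatever you want; I just chose to run python with a command I knew would print to both stdout and stderr. The key point is to read stdout and stderr each in its own thread. Otherwise you may block reading one while data is waiting on the other.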
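The same two-thread idea can be pointed at the original question. The sketch below writes both streams into a single file and prefixes stderr lines with "!". The names pump and run_tagged are hypothetical, and the lock is an assumption about how to keep each tagged line intact in the shared file.

```python
import subprocess
import sys
import tempfile
import threading

def pump(stream, prefix, sink, lock):
    """Copy lines from `stream` to `sink`, tagging each with `prefix`."""
    for line in iter(stream.readline, ""):
        with lock:  # one writer at a time, so tagged lines never mix
            sink.write(prefix + line)
    stream.close()

def run_tagged(cmd, sink):
    """Run `cmd`; write stdout lines as-is and stderr lines prefixed with '!'."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)
    lock = threading.Lock()
    threads = [
        threading.Thread(target=pump, args=(proc.stdout, "", sink, lock)),
        threading.Thread(target=pump, args=(proc.stderr, "!", sink, lock)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return proc.wait()

# Hypothetical child that writes one line to each stream.
child = [sys.executable, "-c",
         "import sys; print('fine'); sys.stderr.write('oops\\n')"]
with tempfile.TemporaryFile(mode="w+") as log:
    exit_code = run_tagged(child, log)
    log.seek(0)
    tagged = log.read().splitlines()
```

The relative order of stdout and stderr lines in the file depends on thread scheduling and on the child's buffering, so only the tagging is reliable, not the interleaving.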
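If you want the output interleaved in roughly the same order you would see when running the process interactively, you need to do what the shell does: poll stdout/stderr and write the data in the order in which the polls report it.
Here is some code that does something along the lines of what you want; in this case, it sends stdout/stderr to a logger's info/error streams.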
tsk = subprocess.Popen(args,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

poll = select.poll()
poll.register(tsk.stdout,select.POLLIN | select.POLLHUP)
poll.register(tsk.stderr,select.POLLIN | select.POLLHUP)
pollc = 2

events = poll.poll()
while pollc > 0 and len(events) > 0:
    for event in events:
        (rfd,event) = event
        if event & select.POLLIN:
            if rfd == tsk.stdout.fileno():
                line = tsk.stdout.readline()
                if len(line) > 0:
                    logger.info(line[:-1])
            if rfd == tsk.stderr.fileno():
                line = tsk.stderr.readline()
                if len(line) > 0:
                    logger.error(line[:-1])
        if event & select.POLLHUP:
            poll.unregister(rfd)
            pollc = pollc - 1
    if pollc > 0: events = poll.poll()
tsk.wait()
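At the moment, none of the other answers handle buffering on the child process's side when the child is not a Python script that accepts the -u flag. See "Q: Why not just use a pipe (popen())?" in the pexpect documentation.
To emulate the -u flag for some C stdio-based (FILE*) programs, you can try stdbuf.
If you ignore this, your output won't be properly interleaved and may look like: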
stderr
stderr
...large block of stdout including parts that are printed before stderr...
You can try it with the following client program; notice the difference with and without the -u flag (['stdbuf', '-o', 'L', 'child_program'] also fixes the output):
#!/usr/bin/env python
from __future__ import print_function
import random
import sys
import time
from datetime import datetime

def tprint(msg, file=sys.stdout):
    time.sleep(.1*random.random())
    print("%s %s" % (datetime.utcnow().strftime('%S.%f'), msg), file=file)

tprint("stdout1 before stderr")
tprint("stdout2 before stderr")
for x in range(5):
    tprint('stderr%d' % x, file=sys.stderr)
tprint("stdout3 after stderr")
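One way to apply the -u / stdbuf advice from the parent side is to adjust the command before launching it. The helper below is a hypothetical sketch, not part of any library: it adds -u when the command starts with the current Python interpreter and wraps anything else in stdbuf when that tool is found on PATH. It assumes the child uses C stdio and does not set its own buffering, which is the case stdbuf is meant for.

```python
import shutil
import sys

def unbuffered_command(cmd):
    """Return a variant of `cmd` (a list) that asks the child for prompt output.

    - The current Python interpreter gets the -u flag.
    - Other programs are wrapped in `stdbuf -o L` when stdbuf is on PATH.
    - Otherwise the command is returned unchanged.
    """
    cmd = list(cmd)
    if cmd and cmd[0] == sys.executable:
        return [cmd[0], "-u"] + cmd[1:]
    if shutil.which("stdbuf"):
        return ["stdbuf", "-o", "L"] + cmd
    return cmd

# Example: a hypothetical script name, only used to show the rewritten command.
print(unbuffered_command([sys.executable, "child_program.py"]))
```

The returned list can be passed to subprocess.Popen in place of the original command.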
On Linux, you can use pty to get the same behavior as when the subprocess runs interactively. For example, here is a modified version of @T.Rojan's answer:
import logging, os, select, subprocess, sys, pty

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

master_fd, slave_fd = pty.openpty()
p = subprocess.Popen(args,stdout=slave_fd, stderr=subprocess.PIPE, close_fds=True)
with os.fdopen(master_fd) as stdout:
    poll = select.poll()
    poll.register(stdout, select.POLLIN)
    poll.register(p.stderr,select.POLLIN | select.POLLHUP)

    def cleanup(_done=[]):
        if _done: return
        _done.append(1)
        poll.unregister(p.stderr)
        p.stderr.close()
        poll.unregister(stdout)
        assert p.poll() is not None

    read_write = {stdout.fileno(): (stdout.readline, logger.info),
                  p.stderr.fileno(): (p.stderr.readline, logger.error)}
    while True:
        events = poll.poll(40) # poll with a small timeout to avoid both
                               # blocking forever and a busy loop
        if not events and p.poll() is not None:
            # no IO events and the subprocess exited
            cleanup()
            break

        for fd, event in events:
            if event & select.POLLIN: # there is something to read
                read, write = read_write[fd]
                line = read()
                if line:
                    write(line.rstrip())
            elif event & select.POLLHUP: # free resources if stderr hung up
                cleanup()
            else: # something unexpected happened
                assert 0
sys.exit(p.wait()) # return child's exit code
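It assumes that stderr is always unbuffered/line-buffered and that stdout is always line-buffered in interactive mode. Only complete lines are read, so the program may block if the output contains an unterminated line.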
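If blocking on an unterminated line is a concern, one possible variation is to read raw chunks with os.read instead of readline, since os.read returns whatever bytes are available. The sketch below uses plain pipes and the standard selectors module rather than a pty, so child-side buffering still applies; the function name read_chunks and the "out"/"err" tags are made up for illustration. Like the poll-based code above, it is POSIX-only.

```python
import os
import selectors
import subprocess
import sys

def read_chunks(cmd):
    """Run `cmd` and return a list of (tag, bytes) chunks in arrival order."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    sel = selectors.DefaultSelector()
    sel.register(proc.stdout, selectors.EVENT_READ, "out")
    sel.register(proc.stderr, selectors.EVENT_READ, "err")
    chunks = []
    while sel.get_map():  # loop until both pipes have been unregistered
        for key, _ in sel.select():
            data = os.read(key.fd, 4096)  # returns what is available, no line needed
            if data:
                chunks.append((key.data, data))
            else:  # an empty read means end of file on this pipe
                sel.unregister(key.fileobj)
                key.fileobj.close()
    sel.close()
    proc.wait()
    return chunks

# Hypothetical child that writes text without a trailing newline.
child = [sys.executable, "-c",
         "import sys; sys.stdout.write('partial'); sys.stderr.write('err')"]
chunks = read_chunks(child)
```

The caller is then responsible for splitting chunks into lines if line-oriented logging is wanted.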
I suggest you write your own handler, something like this (untested, but I hope you get the idea):
class my_buffer(object):
    def __init__(self, fileobject, prefix):
        self._fileobject = fileobject
        self.prefix = prefix
    def write(self, text):
        return self._fileobject.write('%s %s' % (self.prefix, text))
    # delegate other methods to fileobject if necessary

log_file = open('log.log', 'w')
my_out = my_buffer(log_file, 'OK:')
my_err = my_buffer(log_file, '!!!ERROR:')
p = subprocess.Popen(command, stdout=my_out, stderr=my_err, shell=True)
An improvement on T.Rojan's code so that it works when stderr or stdout receives more than one line at a time.
# Use subprocess.Popen to run the code in the temporary file and capture stdout and stderr
process = subprocess.Popen([sys.executable, temp_file.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# Capture the output while the process is running by polling the stdout and stderr pipes and reading from them
poll = select.poll()
poll.register(process.stdout,select.POLLIN | select.POLLHUP)
poll.register(process.stderr,select.POLLIN | select.POLLHUP)
pollc = 2

events = poll.poll()
while pollc > 0 and len(events) > 0:
    for event in events:
        (rfd, event) = event
        if event & select.POLLIN:
            if rfd == process.stdout.fileno():
                while True:
                    line = process.stdout.readline()
                    if len(line) == 0:
                        break
                    # We don't want to print the newline character at the end of the line so we slice it off
                    logger.info(line[:-1].decode('utf-8'))
            if rfd == process.stderr.fileno():
                while True:
                    line = process.stderr.readline()
                    if len(line) == 0:
                        break
                    logger.error(line[:-1].decode('utf-8'))
        if event & select.POLLHUP:
            poll.unregister(rfd)
            pollc = pollc - 1
    if pollc > 0:
        events = poll.poll()
process.wait()
That said, I wrote these classes, which I think are much better, though they go beyond the scope of this question. You may want to edit DEBUG:
code_executor.py
import logging, os, select, subprocess, sys, tempfile, pty
from colorama import Fore
from definitions import DEBUG
from typing import Dict, Optional, Any, List, Tuple
# import the classes from the two modules listed further down
from timeout_handler import TimeoutHandler
from first_in_first_out_io import FirstInFirstOutIO

class CodeExecutor:
    # If live_output is True, the output of the code will be printed to stdout as it is generated.
    # If live_output is True or False you will still always have the full output string returned in the Tuple along with the success boolean
    # max_output_size is the maximum size of the output string. Helpful to prevent excessive memory usage, and to prevent the output from being too large to send to OpenAI
    # timeout_seconds is the maximum number of seconds the code is allowed to run before it is terminated. TODO support Windows by using threading instead of signal.alarm
    def execute_code(self, code: str, live_output: bool= True, max_output_size: int = 1000, timeout_seconds: int = 10) -> Tuple[bool, str]:
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        # Setup the handler with a FirstInFirstOutIO object
        log_capture_string = FirstInFirstOutIO(max_output_size)
        handler = logging.StreamHandler(log_capture_string)
        logger.addHandler(handler)
        success = True
        # Create a temporary file to store the provided code
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.py') as temp_file:
            temp_file.write(code)
            temp_file.flush()
        try:
            with TimeoutHandler(timeout_seconds):
                master, slave = pty.openpty()
                # Use subprocess.Popen to run the code in the temporary file and capture stdout and stderr
                process = subprocess.Popen([sys.executable, temp_file.name], stdout=slave, stderr=slave, universal_newlines=True)
                os.close(slave)
                timeout = 0.1  # A small timeout value for select
                while True:
                    rlist, _, _ = select.select([master], [], [], timeout)
                    if rlist:
                        data = os.read(master, 1024).decode('utf-8')
                        if not data:
                            break
                        for line in data.splitlines():
                            if live_output:
                                print(line)
                            logger.info(line)
                    if process.poll() is not None:
                        break
        except TimeoutError:
            process.kill()
            # Handle timeout errors by appending a timeout error message to the logger and setting success to false
            message=f"Provided code took too long to finish execution. TimeoutError: Timeout after {timeout_seconds} seconds."
            logger.error(message)
            if live_output:
                print(message)
            success = False
        except subprocess.CalledProcessError as e:
            # Handle errors in the subprocess by appending the error message to the logger and setting success to false
            message=f"Error executing code: {str(e)}"
            logger.error(message)
            if live_output:
                print(message)
            success = False
        finally:
            # Remove the temporary file after execution
            os.remove(temp_file.name)
        output_string = log_capture_string.getvalue()
        log_capture_string.close()
        logger.removeHandler(handler)  # Just being explicit here
        if DEBUG:
            print(f"{Fore.YELLOW} Would you like to see the output of the code? (y/n){Fore.RESET}")
            if input().lower() == 'y':
                print(output_string)
        return success, output_string
first_in_first_out_io.py
import io, collections

class FirstInFirstOutIO(io.TextIOBase):
    def __init__(self, size, *args):
        self.maxsize = size
        io.TextIOBase.__init__(self, *args)
        self.deque = collections.deque()
    def getvalue(self):
        return ''.join(self.deque)
    def write(self, x):
        self.deque.append(x)
        self.shrink()
    def shrink(self):
        if self.maxsize is None:
            return
        size = sum(len(x) for x in self.deque)
        while size > self.maxsize:
            x = self.deque.popleft()
            size -= len(x)
timeout_handler.py
import signal
import sys

# This is a context manager that will raise a TimeoutError if the code inside
# the context manager takes longer than the given number of seconds
class TimeoutHandler:
    def __init__(self, seconds: int):
        self.seconds = seconds
    def __enter__(self):
        if sys.platform == "win32":
            # Windows does not support SIGALRM, so skip the timeout handling
            return self
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        if sys.platform != "win32":
            signal.alarm(0)
    def handle_timeout(self, signum, frame):
        raise TimeoutError(f"Timeout after {self.seconds} seconds.")
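Because signal.alarm is unavailable on Windows, a portable alternative for the timeout part alone is the timeout argument of Popen.communicate (Python 3.3+), which raises subprocess.TimeoutExpired. The sketch below is not a drop-in replacement for the classes above: it merges stderr into stdout and collects the output only at the end, and the function name run_with_timeout is hypothetical.

```python
import subprocess
import sys

def run_with_timeout(cmd, timeout_seconds):
    """Return (success, output); kill the child if it exceeds the timeout."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                            universal_newlines=True)
    try:
        output, _ = proc.communicate(timeout=timeout_seconds)
        return proc.returncode == 0, output
    except subprocess.TimeoutExpired:
        proc.kill()
        output, _ = proc.communicate()  # collect what was written before the kill
        return False, output + "Timeout after %s seconds." % timeout_seconds

# A child that finishes quickly, and one that sleeps past a 1-second limit.
quick = run_with_timeout([sys.executable, "-c", "print('hi')"], 30)
slow = run_with_timeout([sys.executable, "-c", "import time; time.sleep(30)"], 1)
```

Killing the child and then calling communicate() again follows the pattern shown in the subprocess documentation for handling TimeoutExpired.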
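You can write stdout/stderr to a file after the command has finished. In the example below I use pickle, so I can be sure I will be able to read the data back and tell stdout from stderr without any special parsing, and at some point I could add the exit code and the command itself.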
import subprocess
import pickle  # this module is called cPickle on Python 2

command = 'ls -altrh'
outfile = 'log.errout'
pipe = subprocess.Popen(command, stdout = subprocess.PIPE,
                        stderr = subprocess.PIPE, shell = True)
stdout, stderr = pipe.communicate()

# pickle data is binary, so the file must be opened in 'wb' mode
with open(outfile, 'wb') as f:
    pickle.dump({'out': stdout, 'err': stderr}, f)
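Reading the pickle back is then enough to produce the prefixed view the question asks for. The following is a sketch under a few assumptions: the helper names save_streams and load_tagged are made up, the output is assumed to be decodable as UTF-8, and a temporary file stands in for log.errout.

```python
import os
import pickle
import subprocess
import sys
import tempfile

def save_streams(cmd, path):
    """Run `cmd` and pickle its stdout and stderr (as bytes) to `path`."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()
    with open(path, "wb") as f:
        pickle.dump({"out": out, "err": err}, f)

def load_tagged(path):
    """Return stdout lines unchanged followed by stderr lines prefixed with '!'."""
    with open(path, "rb") as f:
        record = pickle.load(f)
    lines = record["out"].decode("utf-8").splitlines()
    lines += ["!" + line for line in record["err"].decode("utf-8").splitlines()]
    return lines

# Hypothetical child that writes one line to each stream.
child = [sys.executable, "-c",
         "import sys; print('fine'); sys.stderr.write('oops\\n')"]
fd, path = tempfile.mkstemp(suffix=".errout")
os.close(fd)
try:
    save_streams(child, path)
    tagged = load_tagged(path)
finally:
    os.remove(path)
```

This keeps the two streams cleanly separated but does not preserve how their lines were interleaved in time.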