子进程命令的实时输出



我使用python脚本作为流体力学代码的驱动程序。当需要运行模拟时,我使用subprocess.Popen来运行代码,将stdoutstderr的输出收集到subprocess.PIPE中——然后我可以打印(并保存到日志文件中)输出信息,并检查任何错误。问题是,我不知道代码进展如何。如果我直接从命令行运行它,它会输出当前的迭代次数、时间、下一个时间步长等信息。

是否有一种方法既存储输出(用于日志记录和错误检查),又产生实时流输出?

我的代码的相关部分:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
print "RUN failednn%snn" % (errors)
success = False
if( errors ): log_file.write("nn%snn" % errors)

最初我是通过管道将run_command通过tee,以便副本直接进入日志文件,并且流仍然直接输出到终端-但是这样我就不能存储任何错误(据我所知)。


目前为止我的临时解决方案:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
log_file.flush()

然后,在另一个终端运行tail -f log.txt(s.tlog_file = 'log.txt')。

TLDR for Python 3:

import subprocess
import sys
with open("test.log", "wb") as f:
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
for c in iter(lambda: process.stdout.read(1), b""):
sys.stdout.buffer.write(c)
f.buffer.write(c)

您有两种方法可以做到这一点,要么通过从readreadline函数创建一个迭代器,然后执行:

import subprocess
import sys
# replace "w" with "wb" for Python 3
with open("test.log", "w") as f:
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
# replace "" with b'' for Python 3
for c in iter(lambda: process.stdout.read(1), ""):
sys.stdout.write(c)
f.write(c)

import subprocess
import sys
# replace "w" with "wb" for Python 3
with open("test.log", "w") as f:
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
# replace "" with b"" for Python 3
for line in iter(process.stdout.readline, ""):
sys.stdout.write(line)
f.write(line)

也可以创建一个reader和一个writer文件。将writer传递给Popen,并从reader中读取

import io
import time
import subprocess
import sys
filename = "test.log"
with io.open(filename, "wb") as writer, io.open(filename, "rb", 1) as reader:
process = subprocess.Popen(command, stdout=writer)
while process.poll() is None:
sys.stdout.write(reader.read())
time.sleep(0.5)
# Read the remaining
sys.stdout.write(reader.read())

这样,您将在test.log和标准输出中写入数据。

文件方法的唯一优点是您的代码不会阻塞。因此,您可以在此期间做任何您想做的事情,并以非阻塞的方式随时从reader读取。当您使用PIPE时,readreadline函数将阻塞,直到分别向管道中写入一个字符或向管道中写入一行。

执行摘要(或"tl;版本):当最多有一个subprocess.PIPE时很容易,否则很难。

也许是时候解释一下subprocess.Popen是如何工作的了。

(注意:这是针对Python 2的。X,虽然3。X是相似的;我对Windows的版本也不太清楚。我更了解POSIX的东西。)

Popen函数需要同时处理0到3个I/O流。如往常一样,分别表示为stdinstdoutstderr

您可以提供:

  • None,表示您不想重定向流。它将像往常一样继承这些。请注意,至少在POSIX系统上,这并不意味着它将使用Python的sys.stdout,而只是Python的实际的标准输出;
  • int值。这是一个"生"字。文件描述符(至少在POSIX中)。(旁注:PIPESTDOUT实际上是ints内部,但"不可能";-1和-2)
  • 一个流——实际上,任何具有fileno方法的对象。Popen将使用stream.fileno()找到该流的描述符,然后继续使用int值。
  • subprocess.PIPE,表示Python应该创建一个管道。
  • subprocess.STDOUT(仅适用于stderr):告诉Python使用与stdout相同的描述符。这只有在为stdout提供(非None)值时才有意义,即使这样,只有在设置stdout=subprocess.PIPE时才需要。(否则,您可以提供与stdout相同的参数,例如Popen(..., stdout=stream, stderr=stream))

最简单的情况(没有管道)

如果您不重定向任何内容(将所有三个重定向值保留为默认的None值或提供显式的None),则Pipe非常容易。它只需要剥离子进程并让它运行。或者,如果您重定向到非PIPE-int或流的fileno()-这仍然很容易,因为操作系统会完成所有的工作。Python只需要剥离子进程,将其stdin、stdout和/或stderr连接到所提供的文件描述符。

仍然简单的情况:一个管道

如果你只重定向一个流,Pipe仍然很容易。让我们一次选择一个流并观看。

假设您想提供一些stdin,但让stdoutstderr不重定向,或者转到文件描述符。作为父进程,Python程序只需要使用write()将数据发送到管道中。你可以自己做,例如:

proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
proc.stdin.write('here, have some datan') # etc

或者您可以将stdin数据传递给proc.communicate(),然后stdin.write执行上面所示的操作。没有输出返回,所以communicate()只有一个真正的工作:它也为您关闭管道。(如果您不调用proc.communicate(),则必须调用proc.stdin.close()来关闭管道,以便子进程知道没有更多的数据通过)

假设您想捕获stdout,但保留stdinstderr单独。同样,这很简单:只需调用proc.stdout.read()(或等效函数),直到没有更多的输出。由于proc.stdout()是一个普通的Python I/O流,您可以在其上使用所有普通结构,例如:

for line in proc.stdout:

或者,同样,您可以使用proc.communicate(),它只是为您做read()

如果您只想捕获stderr,它的工作原理与stdout相同。

在事情变得困难之前还有一个技巧。假设您想要捕获stdout,并捕获stderr,但在与stdout相同的管道上:

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

在本例中,subprocess"cheats"!好吧,它必须这样做,所以它并不是真正的作弊:它启动子进程时,它的标准输出和标准错误都指向(单个)管道描述符,该描述符将反馈给其父进程(Python)。在父端,同样只有一个用于读取输出的管道描述符。所有的"标准"输出显示在proc.stdout中,如果调用proc.communicate(),则stderr结果(元组中的第二个值)将是None,而不是字符串。

硬例:两根及以上管道

当你想使用至少两个管道时,问题就会出现。事实上,subprocess代码本身就有这个位:

def communicate(self, input=None):
...
# Optimization: If we are only using one pipe, or no pipe at
# all, using select() or threads is unnecessary.
if [self.stdin, self.stdout, self.stderr].count(None) >= 2:

但是,唉,这里我们已经创建了至少两个,也许是三个不同的管道,因此count(None)返回1或0。我们必须用艰难的方法来做事情。

在Windows上,它使用threading.Thread来累积self.stdoutself.stderr的结果,并让父线程传递self.stdin的输入数据(然后关闭管道)。

在POSIX上,如果可用,则使用poll,否则使用select,来累积输出并传递stdin输入。所有这些都在(单)父进程/线程中运行。

这里需要线程或poll/select来避免死锁。例如,假设我们将所有三个流重定向到三个独立的管道。进一步假设在写进程挂起之前,在等待读进程"清理"之前,可以将多少数据塞到管道中有一个小限制;管子的另一端。为了便于说明,我们将这个小限制设置为单个字节。(这实际上是事情的工作原理,除了限制远远大于一个字节。)

如果父进程(Python)试图写入几个字节——比如,'gon'proc.stdin,第一个字节进入,然后第二个字节导致Python进程挂起,等待子进程读取第一个字节,清空管道。

同时,假设子进程决定打印一个友好的"Hello!别慌!"的问候。H进入其标准输出管道,但e使其挂起,等待其父进程读取该H,清空标准输出管道。

现在我们被卡住了:Python进程处于休眠状态,等待说完&;go&;,子进程也处于休眠状态,等待说完&;Hello!别慌!"。

subprocess.Popen代码通过线程或选择/轮询避免了这个问题。当字节可以通过管道时,它们就会通过。当它们不能休眠时,只有一个线程(而不是整个进程)必须休眠——或者,在select/poll的情况下,Python进程同时等待"can write";或者"数据可用",只有当有空间时才写入进程的标准输入,只有当数据准备好时才读取其标准输出和/或标准错误。proc.communicate()代码(实际上是处理复杂情况的_communicate代码)在发送了所有标准输入数据(如果有的话)并且积累了所有标准输出和/或标准错误数据后返回。

如果你想在两个不同的管道上同时读取stdoutstderr(不管stdin重定向),你也需要避免死锁。这里的死锁场景是不同的—当您从stdout提取数据时,子进程向stderr写入一些很长的内容,或者反之亦然—但是死锁仍然存在。

演示

我承诺会演示,在未重定向的情况下,Pythonsubprocess会写入底层标准输出,而不是sys.stdout。下面是一些代码:

from cStringIO import StringIO
import os
import subprocess
import sys
def show1():
print 'start show1'
save = sys.stdout
sys.stdout = StringIO()
print 'sys.stdout being buffered'
proc = subprocess.Popen(['echo', 'hello'])
proc.wait()
in_stdout = sys.stdout.getvalue()
sys.stdout = save
print 'in buffer:', in_stdout
def show2():
print 'start show2'
save = sys.stdout
sys.stdout = open(os.devnull, 'w')
print 'after redirect sys.stdout'
proc = subprocess.Popen(['echo', 'hello'])
proc.wait()
sys.stdout = save
show1()
show2()

运行时:

$ python out.py
start show1
hello
in buffer: sys.stdout being buffered
start show2
hello

注意,如果添加stdout=sys.stdout,第一个例程将失败,因为StringIO对象没有fileno。如果您添加stdout=sys.stdout,则第二个将忽略hello,因为sys.stdout已重定向到os.devnull

(如果重定向Python的file-descriptor-1,子进程将遵循该重定向。open(os.devnull, 'w')调用产生一个fileno()大于2的流。)

我们也可以使用默认的文件迭代器来读取stdout,而不是在readline()中使用iter结构。

import subprocess
import sys
process = subprocess.Popen(
your_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
for line in process.stdout:
sys.stdout.write(line)

除了以上这些答案,还有一种简单的方法:

process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
while process.stdout.readable():
line = process.stdout.readline()
if not line:
break
print(line.strip())

循环遍历可读流,只要它是可读的,如果得到一个空结果,停止。

这里的关键是,只要有输出,readline()返回一行(n在末尾),如果它真的在末尾,则为空。

希望这能帮助到一些人。

如果您能够使用第三方库,您可能可以使用类似sarge的东西(披露:我是它的维护者)。这个库允许对子进程的输出流进行非阻塞访问——它是在subprocess模块上分层的。

如果您所需要的只是输出将在控制台上可见,那么对我来说最简单的解决方案是将以下参数传递给Popen

with Popen(cmd, stdout=sys.stdout, stderr=sys.stderr) as proc:

,它将使用你的python脚本和文件句柄

解决方案1:实时同时记录stdoutstderr

一个简单的解决方案,同时记录标准输出和标准错误,在实时逐行记录到日志文件中。

import subprocess as sp
from concurrent.futures import ThreadPoolExecutor

def log_popen_pipe(p, stdfile):
with open("mylog.txt", "w") as f:
while p.poll() is None:
f.write(stdfile.readline())
f.flush()
# Write the rest from the buffer
f.write(stdfile.read())

with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:
with ThreadPoolExecutor(2) as pool:
r1 = pool.submit(log_popen_pipe, p, p.stdout)
r2 = pool.submit(log_popen_pipe, p, p.stderr)
r1.result()
r2.result()

解决方案2:read_popen_pipes()函数允许您在两个管道(stdout/stderr)上同时迭代,实时

import subprocess as sp
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor

def enqueue_output(file, queue):
for line in iter(file.readline, ''):
queue.put(line)
file.close()

def read_popen_pipes(p):
with ThreadPoolExecutor(2) as pool:
q_stdout, q_stderr = Queue(), Queue()
pool.submit(enqueue_output, p.stdout, q_stdout)
pool.submit(enqueue_output, p.stderr, q_stderr)
while True:
if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
break
out_line = err_line = ''
try:
out_line = q_stdout.get_nowait()
err_line = q_stderr.get_nowait()
except Empty:
pass
yield (out_line, err_line)
# The function in use:
with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:
for out_line, err_line in read_popen_pipes(p):
print(out_line, end='')
print(err_line, end='')
p.poll()

与之前的答案类似,但以下解决方案适用于我在windows上使用Python3提供实时打印和登录的通用方法(来源)

def print_and_log(command, logFile):
with open(logFile, 'wb') as f:
command = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
while True:
output = command.stdout.readline()
if not output and command.poll() is not None:
f.close()
break
if output:
f.write(output)
print(str(output.strip(), 'utf-8'), flush=True)
return command.poll()

一个好的但"重量级"的解决方案是使用Twisted -见底部。

如果你愿意只使用stdout,那么这些行应该可以工作:

import subprocess
import sys
popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE)
while not popenobj.poll():
stdoutdata = popenobj.stdout.readline()
if stdoutdata:
sys.stdout.write(stdoutdata)
else:
break
print "Return code", popenobj.returncode

(如果你使用read(),它会尝试读取整个"文件"这是没有用的,我们在这里真正可以使用的是读取管道中现在所有的数据)

也可以尝试使用线程来解决这个问题,例如:

import subprocess
import sys
import threading
popenobj = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True)
def stdoutprocess(o):
while True:
stdoutdata = o.stdout.readline()
if stdoutdata:
sys.stdout.write(stdoutdata)
else:
break
t = threading.Thread(target=stdoutprocess, args=(popenobj,))
t.start()
popenobj.wait()
t.join()
print "Return code", popenobj.returncode

现在我们可以通过两个线程来添加stderr。

注意子进程文档不鼓励直接使用这些文件,并建议使用communicate()(主要关注死锁,我认为这不是上面的问题),解决方案有点不可靠,所以看起来真的像子进程模块不太适合作业。(参见:http://www.python.org/dev/peps/pep-3145/),我们需要看看别的东西。

一个更复杂的解决方案是使用Twisted,如下所示:https://twistedmatrix.com/documents/11.1.0/core/howto/process.html

使用Twisted的方法是使用reactor.spawnprocess()创建进程,并提供一个ProcessProtocol,然后异步处理输出。Twisted示例Python代码在这里:https://twistedmatrix.com/documents/11.1.0/core/howto/listings/process/process.py

为什么不直接设置stdoutsys.stdout?如果您也需要输出到日志,那么您可以简单地覆盖f的write方法。

import sys
import subprocess
class SuperFile(open.__class__):
def write(self, data):
sys.stdout.write(data)
super(SuperFile, self).write(data)
f = SuperFile("log.txt","w+")       
process = subprocess.Popen(command, stdout=f, stderr=f)

基于上述所有内容,我建议稍微修改一下版本(python3):

  • while循环调用readline(建议的iter解决方案似乎对我来说永远阻塞- Python 3, Windows 7)
  • 结构化,因此在轮询返回后不需要重复处理读取数据-None
  • stderr管道到标准输出,所以两个输出输出都被读取
  • 增加了获取cmd退出值的代码。代码:

import subprocess
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, universal_newlines=True)
while True:
rd = proc.stdout.readline()
print(rd, end='')  # and whatever you want to do...
if not rd:  # EOF
returncode = proc.poll()
if returncode is not None:
break
time.sleep(0.1)  # cmd closed stdout, but not exited yet
# You may want to check on ReturnCode here

我找到了一个解决一个非常复杂问题的简单方法。

  1. 标准输出和标准错误都需要流式传输。
  2. 它们都需要是非阻塞的:当没有输出时和当输出过多时。
  3. 不想使用线程或多进程,也不愿意使用pexpect。

这个解决方案使用了我在这里找到的要点

import subprocess as sbp
import fcntl
import os
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.readline()
except:
return ""
with sbp.Popen('find / -name fdsfjdlsjf',
shell=True,
universal_newlines=True,
encoding='utf-8',
bufsize=1,
stdout=sbp.PIPE,
stderr=sbp.PIPE) as p:
while True:
out = non_block_read(p.stdout)
err = non_block_read(p.stderr)
if out:
print(out, end='')
if err:
print('E: ' + err, end='')
if p.poll() is not None:
break

看起来行缓冲的输出将适合您,在这种情况下,下面的内容可能适合您。(警告:这是未经测试的。)这只会实时给出子进程的标准输出。如果您想实时地同时拥有标准错误和标准输出,您将不得不对select做一些更复杂的事情。

proc = subprocess.Popen(run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
while proc.poll() is None:
line = proc.stdout.readline()
print line
log_file.write(line + 'n')
# Might still be data on stdout at this point.  Grab any
# remainder.
for line in proc.stdout.read().split('n'):
print line
log_file.write(line + 'n')
# Do whatever you want with proc.stderr here...

我尝试的所有上述解决方案都未能分离stderr和stdout输出(多个管道),或者在操作系统管道缓冲区满时永远阻塞,当您运行的命令输出太快时发生(在python poll()子进程手册上有警告)。我发现唯一可靠的方法是通过select,但这是一个仅限posix的解决方案:

import subprocess
import sys
import os
import select
# returns command exit status, stdout text, stderr text
# rtoutput: show realtime output while running
def run_script(cmd,rtoutput=0):
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
poller = select.poll()
poller.register(p.stdout, select.POLLIN)
poller.register(p.stderr, select.POLLIN)
coutput=''
cerror=''
fdhup={}
fdhup[p.stdout.fileno()]=0
fdhup[p.stderr.fileno()]=0
while sum(fdhup.values()) < len(fdhup):
try:
r = poller.poll(1)
except select.error, err:
if err.args[0] != EINTR:
raise
r=[]
for fd, flags in r:
if flags & (select.POLLIN | select.POLLPRI):
c = os.read(fd, 1024)
if rtoutput:
sys.stdout.write(c)
sys.stdout.flush()
if fd == p.stderr.fileno():
cerror+=c
else:
coutput+=c
else:
fdhup[fd]=1
return p.poll(), coutput.strip(), cerror.strip()

所有的python解决方案都不适合我。事实证明,proc.stdout.read()或类似的可能永远阻塞。

因此,我这样使用tee:
subprocess.run('./my_long_running_binary 2>&1 | tee -a my_log_file.txt && exit ${PIPESTATUS}', shell=True, check=True, executable='/bin/bash')

如果您已经在使用shell=True,则此解决方案非常方便。

${PIPESTATUS}捕获整个命令链的成功状态(仅在Bash中可用)。如果我省略了&& exit ${PIPESTATUS},那么这将总是返回零,因为tee永远不会失败。

unbuffer可能需要立即将每行打印到终端,而不是等待太长时间直到"管道缓冲区"被填满。但是,unbuffer会接收assert的退出状态(SIG Abort)…

2>&1也记录sterror到文件。

我认为subprocess.communicate方法有点误导:它实际上填充了您在subprocess.Popen中指定的stdoutstderr

然而,从subprocess.PIPE读取你可以提供给subprocess.Popenstdoutstderr参数最终会填满OS管道缓冲区并死锁你的应用程序(特别是如果你有多个进程/线程必须使用subprocess)。

我建议的解决方案是提供stdoutstderr文件-并读取文件的内容,而不是从死锁PIPE读取。这些文件可以是tempfile.NamedTemporaryFile(),当它们被subprocess.communicate写入时,也可以被读取。

下面是一个示例用法:

try:
with ProcessRunner(
("python", "task.py"), env=os.environ.copy(), seconds_to_wait=0.01
) as process_runner:
for out in process_runner:
print(out)
except ProcessError as e:
print(e.error_message)
raise

这是可以使用的源代码并尽可能多地提供注释来解释它的作用:

如果你正在使用python 2,请确保首先从pypi安装最新版本的subprocess32包。

import os
import sys
import threading
import time
import tempfile
import logging
if os.name == 'posix' and sys.version_info[0] < 3:
# Support python 2
import subprocess32 as subprocess
else:
# Get latest and greatest from python 3
import subprocess
logger = logging.getLogger(__name__)

class ProcessError(Exception):
"""Base exception for errors related to running the process"""

class ProcessTimeout(ProcessError):
"""Error that will be raised when the process execution will exceed a timeout"""

class ProcessRunner(object):
def __init__(self, args, env=None, timeout=None, bufsize=-1, seconds_to_wait=0.25, **kwargs):
"""
Constructor facade to subprocess.Popen that receives parameters which are more specifically required for the
Process Runner. This is a class that should be used as a context manager - and that provides an iterator
for reading captured output from subprocess.communicate in near realtime.
Example usage:

try:
with ProcessRunner(('python', task_file_path), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
for out in process_runner:
print(out)
except ProcessError as e:
print(e.error_message)
raise
:param args: same as subprocess.Popen
:param env: same as subprocess.Popen
:param timeout: same as subprocess.communicate
:param bufsize: same as subprocess.Popen
:param seconds_to_wait: time to wait between each readline from the temporary file
:param kwargs: same as subprocess.Popen
"""
self._seconds_to_wait = seconds_to_wait
self._process_has_timed_out = False
self._timeout = timeout
self._process_done = False
self._std_file_handle = tempfile.NamedTemporaryFile()
self._process = subprocess.Popen(args, env=env, bufsize=bufsize,
stdout=self._std_file_handle, stderr=self._std_file_handle, **kwargs)
self._thread = threading.Thread(target=self._run_process)
self._thread.daemon = True
def __enter__(self):
self._thread.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._thread.join()
self._std_file_handle.close()
def __iter__(self):
# read all output from stdout file that subprocess.communicate fills
with open(self._std_file_handle.name, 'r') as stdout:
# while process is alive, keep reading data
while not self._process_done:
out = stdout.readline()
out_without_trailing_whitespaces = out.rstrip()
if out_without_trailing_whitespaces:
# yield stdout data without trailing n
yield out_without_trailing_whitespaces
else:
# if there is nothing to read, then please wait a tiny little bit
time.sleep(self._seconds_to_wait)
# this is a hack: terraform seems to write to buffer after process has finished
out = stdout.read()
if out:
yield out
if self._process_has_timed_out:
raise ProcessTimeout('Process has timed out')
if self._process.returncode != 0:
raise ProcessError('Process has failed')
def _run_process(self):
try:
# Start gathering information (stdout and stderr) from the opened process
self._process.communicate(timeout=self._timeout)
# Graceful termination of the opened process
self._process.terminate()
except subprocess.TimeoutExpired:
self._process_has_timed_out = True
# Force termination of the opened process
self._process.kill()
self._process_done = True
@property
def return_code(self):
return self._process.returncode

这是我在我的一个项目中使用的一个类。它将子流程的输出重定向到日志。起初,我试图简单地覆盖write-method,但这不起作用,因为子进程永远不会调用它(重定向发生在文件描述符级别)。所以我使用我自己的管道,类似于在子进程模块中完成的方式。这样做的优点是将所有日志/打印逻辑封装在适配器中,并且您可以简单地将日志记录器的实例传递给Popen:subprocess.Popen("/path/to/binary", stderr = LogAdapter("foo"))

class LogAdapter(threading.Thread):
def __init__(self, logname, level = logging.INFO):
super().__init__()
self.log = logging.getLogger(logname)
self.readpipe, self.writepipe = os.pipe()
logFunctions = {
logging.DEBUG: self.log.debug,
logging.INFO: self.log.info,
logging.WARN: self.log.warn,
logging.ERROR: self.log.warn,
}
try:
self.logFunction = logFunctions[level]
except KeyError:
self.logFunction = self.log.info
def fileno(self):
#when fileno is called this indicates the subprocess is about to fork => start thread
self.start()
return self.writepipe
def finished(self):
"""If the write-filedescriptor is not closed this thread will
prevent the whole program from exiting. You can use this method
to clean up after the subprocess has terminated."""
os.close(self.writepipe)
def run(self):
inputFile = os.fdopen(self.readpipe)
while True:
line = inputFile.readline()
if len(line) == 0:
#no new data was added
break
self.logFunction(line.strip())

如果您不需要日志记录,但只是想使用print(),您显然可以删除大部分代码并使类更短。您还可以通过__enter____exit__方法展开它,并在__exit__中调用finished,以便您可以轻松地将其用作上下文。

import os
def execute(cmd, callback):
for line in iter(os.popen(cmd).readline, ''): 
callback(line[:-1])
execute('ls -a', print)

有同样的问题,并制定了一个简单而干净的解决方案,使用process.sdtout.read1()完美地满足我在python3中的需求。

下面是使用ping命令(需要internet连接)的演示:

from subprocess import Popen, PIPE
cmd = "ping 8.8.8.8"
proc = Popen([cmd], shell=True, stdout=PIPE)
while True:
print(proc.stdout.read1())

当ping命令实时报告其数据时,每隔一秒左右就会在python控制台中打印一个新行。

在我看来"子流程命令的实时输出";意味着stdout和stderr都应该是活的。并且stdin也应该被传递给子进程。

下面的片段在stdout和stderr上产生实时输出,并在输出结果中作为字节捕获它们。

技巧在于正确使用select和poll。

在Python 3.9上运行良好。


if self.log == 1:
print(f"** cmnd= {fullCmndStr}")
self.outcome.stdcmnd = fullCmndStr
try:
process = subprocess.Popen(
fullCmndStr,
shell=True,
encoding='utf8',
executable="/bin/bash",
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except OSError:
self.outcome.error = OSError
else:
process.stdin.write(stdin)
process.stdin.close() # type: ignore
stdoutStrFile = io.StringIO("")
stderrStrFile = io.StringIO("")
pollStdout = select.poll()
pollStderr = select.poll()
pollStdout.register(process.stdout, select.POLLIN)
pollStderr.register(process.stderr, select.POLLIN)
stdoutEOF = False
stderrEOF = False
while True:
stdoutActivity = pollStdout.poll(0)
if stdoutActivity:
c= process.stdout.read(1)
if c:
stdoutStrFile.write(c)
if self.log == 1:
sys.stdout.write(c)
else:
stdoutEOF = True
stderrActivity = pollStderr.poll(0)
if stderrActivity:
c= process.stderr.read(1)
if c:
stderrStrFile.write(c)
if self.log == 1:
sys.stderr.write(c)
else:
stderrEOF = True
if stdoutEOF and stderrEOF:
break
if self.log == 1:
print(f"** cmnd={fullCmndStr}")
process.wait() # type: ignore
self.outcome.stdout = stdoutStrFile.getvalue()
self.outcome.stderr = stderrStrFile.getvalue()
self.outcome.error = process.returncode # type: ignore

我发现如何在Python中以流方式读取子进程的输出(同时在变量中捕获它)(用于多个输出流,即stdoutstderr)的唯一方法是通过传递子进程一个命名的临时文件来写入,然后在单独的读取句柄中打开相同的临时文件。

注意:这适用于Python 3

stdout_write = tempfile.NamedTemporaryFile()
stdout_read = io.open(stdout_write.name, "r")
stderr_write = tempfile.NamedTemporaryFile()
stderr_read = io.open(stderr_write.name, "r")
stdout_captured = ""
stderr_captured = ""
proc = subprocess.Popen(["command"], stdout=stdout_write, stderr=stderr_write)
while True:
proc_done: bool = cli_process.poll() is not None
while True:
content = stdout_read.read(1024)
sys.stdout.write(content)
stdout_captured += content
if len(content) < 1024:
break
while True:
content = stderr_read.read(1024)
sys.stderr.write(content)
stdout_captured += content
if len(content) < 1024:
break
if proc_done:
break
time.sleep(0.1)
stdout_write.close()
stdout_read.close()
stderr_write.close()
stderr_read.close()

但是,如果您不需要捕获输出,那么您可以简单地将sys.stdoutsys.stderr流从Python脚本传递到被调用的子进程,正如xaav在他的回答中建议的那样:

subprocess.Popen(["command"], stdout=sys.stdout, stderr=sys.stderr)

这是一个旧的帖子,但是在Python 3——在Python 3.11中测试——下面的代码对我来说工作,直播或"实时"流;使用subprocess模块输出:

import sys
from os import fdopen
from subprocess import Popen, PIPE, STDOUT

with Popen(command,
shell=True,
stdout=PIPE,
stderr=STDOUT) as sp:
with fdopen(sys.stdout.fileno(), 'wb', closefd=False) as stdout:
for line in sp.stdout:
stdout.write(line)
stdout.flush()

便利功能由于习惯用法,我通常创建一个方便的函数run来在终端中链接命令列表并实时流式输出。

请注意,我在这里使用&&作为分隔符,但您可以轻松地使用另一个分隔符,例如;,如果您不想在错误早期失败,甚至可以使用&

import sys
from os import fdopen
from subprocess import Popen, PIPE, STDOUT
def run(cmds, join='&&'):
with Popen(join.join(cmds),
shell=True,
stdout=PIPE,
stderr=STDOUT) as sp:
with fdopen(sys.stdout.fileno(), 'wb', closefd=False) as stdout:
for line in sp.stdout:
stdout.write(line)
stdout.flush()

用法如下:

commands = [
'echo hello',
'sleep 3',
'echo world',
'sleep 2',
'echo !',
]
run(commands)

处理命令的实时输出流可以通过在subprocess.Popen运行时迭代stdout来实现。

这个实现:

  • 使用with语句关闭标准文件描述符,等待
  • 将关键字参数传播到子进程构造函数
  • 默认为text=True自动解码字节串为字符串
  • 如果check=Truesubprocess.run失败,引发CalledProcessError
  • 在成功时返回CompletedProcess,就像subprocess.run一样
  • 使用两个线程并发地处理标准输出和标准错误(对于没有线程将标准输出重定向到标准错误的版本,请参阅我的简化答案)
import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen

def stream_command(
args,
*,
stdout_handler=logging.info,
stderr_handler=logging.error,
check=True,
text=True,
stdout=PIPE,
stderr=PIPE,
**kwargs,
):
"""Mimic subprocess.run, while processing the command output in real time."""
with Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process:
with ThreadPoolExecutor(2) as pool:  # two threads to handle the streams
exhaust = partial(pool.submit, partial(deque, maxlen=0))
exhaust(stdout_handler(line[:-1]) for line in process.stdout)
exhaust(stderr_handler(line[:-1]) for line in process.stderr)
retcode = process.poll()
if check and retcode:
raise CalledProcessError(retcode, process.args)
return CompletedProcess(process.args, retcode)

记录文件就像设置logging:

一样简单
logging.basicConfig(
level=logging.INFO,
filename="./capture.log",
filemode="w",
encoding="utf-8",
)
logging.info("test from python")
stream_command(["echo", "test from subprocess"])

与结果文件:

$ cat ./capture.log
INFO:root:test from python
INFO:root:test from subprocess

行为可以调整为偏好(print代替logging.info,或两者兼而有之,等等):

stream_command(["echo", "test"])
# INFO:root:test
stream_command("cat ./nonexist", shell=True, check=False)
# ERROR:root:cat: ./nonexist: No such file or directory
stream_command(["echo", "test"], stdout_handler=print)
# test
stdout_lines = []
def handler(line):
print(line)
logging.info(line)
stdout_lines.append(line)
stream_command(["echo", "test"], stdout_handler=handler)
# test
# INFO:root:test
print(stdout_lines)
# ['test']

最新更新