Python:从多个子进程异步打印标准输出



我正在测试一种在Python 2.7中从几个子进程中打印stdout的方法。我所设置的是一个主进程,它目前生成三个子进程并输出它们的输出。每个子进程都是一个for循环,它会随机休眠一段时间,当它醒来时,会说"Slept for X seconds"。

我看到的问题是打印出来似乎是同步的。假设子进程A休眠1秒,子进程B休眠3秒,子进程C休眠10秒。当主进程试图查看子进程C是否有东西时,主进程会停止整整10秒,即使其他两个进程可能已经休眠并打印了一些东西。这是为了模拟一个子进程在比其他两个子进程更长的时间内确实没有输出。

我需要一个能在Windows上工作的解决方案。

我的代码如下:

main_process.py

import sys
import subprocess
logfile = open('logfile.txt', 'w')
processes = [
            subprocess.Popen('python subproc_1.py', stdout=subprocess.PIPE, bufsize=1), 
            subprocess.Popen('python subproc_2.py', stdout=subprocess.PIPE, bufsize=1), 
            subprocess.Popen('python subproc_3.py', stdout=subprocess.PIPE, bufsize=1), 
        ]

while True:
    line = processes[0].stdout.readline() 
    if line != '':
        sys.stdout.write(line)
        logfile.write(line)
    line = processes[1].stdout.readline()
    if line != '':
        sys.stdout.write(line)
        logfile.write(line)
    line = processes[2].stdout.readline()
    if line != '':
        sys.stdout.write(line)
        logfile.write(line)
    #If everyone is dead, break
    if processes[0].poll() is not None and 
       processes[1].poll() is not None and 
       processes[2].poll() is not None:
        break
processes[0].wait()
processes[1].wait()
print 'Done'

subproc_1.py/subproc_2.py subproc_3.py

import time, sys, random
sleep_time = random.random() * 3
for x in range(0, 20):
    print "[PROC1] Slept for {0} seconds".format(sleep_time)
    sys.stdout.flush()
    time.sleep(sleep_time)
    sleep_time = random.random() * 3 #this is different for each subprocess.

更新:解决方案

根据下面的答案和这个问题,这应该是可行的。

import sys
import subprocess
from threading import Thread
try:
    from Queue import Queue, Empty
except ImportError:
    from queue import Queue, Empty # for Python 3.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()
if __name__ == '__main__':
    logfile = open('logfile.txt', 'w')
    processes = [
                subprocess.Popen('python subproc_1.py', stdout=subprocess.PIPE, bufsize=1), 
                subprocess.Popen('python subproc_2.py', stdout=subprocess.PIPE, bufsize=1), 
                subprocess.Popen('python subproc_3.py', stdout=subprocess.PIPE, bufsize=1), 
            ]
    q = Queue()
    threads = []
    for p in processes:
        threads.append(Thread(target=enqueue_output, args=(p.stdout, q)))
    for t in threads:
        t.daemon = True
        t.start()
    while True:
        try:
            line = q.get_nowait()
        except Empty:
            pass
        else:
            sys.stdout.write(line)
            logfile.write(line)
            logfile.flush()
        #break when all processes are done.
        if all(p.poll() is not None for p in processes):
            break
    print 'All processes done'

我不确定在while循环结束时是否需要任何清理代码。如果有人对此有意见,请补充。

和每个子程序脚本看起来类似于这个(我编辑为了做一个更好的例子):

import datetime, time, sys, random
for x in range(0, 20):
    sleep_time = random.random() * 3
    time.sleep(sleep_time)
    timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%H%M%S.%f')
    print "[{0}][PROC1] Slept for {1} seconds".format(timestamp, sleep_time)
    sys.stdout.flush()
print "[{0}][PROC1] Done".format(timestamp)
sys.stdout.flush()

您的问题来自readline()是一个阻塞函数;如果您在文件对象上调用它,并且没有一行等待读取,则该调用将不会返回,直到有一行输出。因此,您现在所拥有的将按照的顺序从子进程1、2和3 中重复读取,并在每次读取时暂停,直到输出就绪。

(编辑: OP澄清说他们是在Windows上,这使得下面的不适用。)

如果您想从准备好的输出流中读取数据,您需要使用select模块以非阻塞的方式检查流的状态,然后只尝试读取那些准备好的数据。select提供了多种方法来实现这一点,但为了示例起见,我们将使用select.select()。在启动子进程之后,您将得到如下内容:

streams = [p.stdout for p in processes]
def output(s):
    for f in [sys.stdout, logfile]:
        f.write(s)
        f.flush()
while True:
    rstreams, _, _ = select.select(streams, [], [])
    for stream in rstreams:
        line = stream.readline()
        output(line)
    if all(p.poll() is not None for p in processes):
        break
for stream in streams:
    output(stream.read())

当使用三个文件对象列表(或文件描述符)调用select()时,它所做的是返回其参数的三个子集,这些子集是准备好读取、准备好写入或有错误条件的流。因此,在循环的每次迭代中,我们检查哪些输出流准备好读取,并迭代这些输出流。然后我们重复。(注意,在这里对输出进行行缓冲是很重要的;上面的代码假设,如果一个流已经准备好可以读取,那么至少有一整行可以读取。如果你指定了不同的缓冲,上面可以阻塞。)

原始代码的另一个问题:当您在poll()报告所有子进程已经退出后退出循环时,您可能没有读取它们的所有输出。因此,您需要对流进行最后一次扫描,以读取任何剩余的输出。

注意:我给出的示例代码并没有尝试以准确的顺序捕获子流程的输出(这是不可能做到完美的,但可以比上面的代码更接近)。它还缺乏其他改进(例如,在主循环中,它将继续选择每个子进程的标准输出,即使在一些子进程已经终止之后,这是无害的,但效率低下)。这只是为了说明非阻塞IO的基本技术。

最新更新