为了简化我的问题,这里有一个python脚本:
from subprocess import Popen, PIPE
proc = Popen(['./mr-task.sh'], shell=True, stdout=PIPE, stderr=PIPE)
while True:
out = proc.stdout.readline()
print(out)
这是mr-task.sh
,它启动了一个mapreduce作业:
hadoop jar xxx.jar some-conf-we-don't-need-to-care
当我运行./mr-task
时,我可以看到屏幕上打印的日志,如下所示:
14/12/25 14:56:44 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/12/25 14:56:44 INFO snappy.LoadSnappy: Snappy native library loaded
14/12/25 14:57:01 INFO mapred.JobClient: Running job: job_201411181108_16380
14/12/25 14:57:02 INFO mapred.JobClient: map 0% reduce 0%
14/12/25 14:57:28 INFO mapred.JobClient: map 100% reduce 0%
但是我无法让这些输出运行 python 脚本。我尝试删除shell=True
或获取标准,但仍然一无所获。
有谁知道为什么会这样?
您可以将 stderr 重定向到 stdout:
from subprocess import Popen, PIPE, STDOUT
proc = Popen(['./mr-task.sh'], stdout=PIPE, stderr=STDOUT, bufsize=1)
for line in iter(proc.stdout.readline, b''):
print line,
proc.stdout.close()
proc.wait()
请参阅 Python:从 subprocess.communication() 读取流输入。
在我的真实程序中,我将 stderr 重定向到 stdout 并从 stdout 读取,所以不需要 bufsize,是吗?
将 stderr 重定向到 stdout 和 bufsize
是不相关的。更改bufsize
可能会影响时间性能(默认的bufsize=0,即在Python 2上未缓冲)。未缓冲的 I/O 可能会慢 10..100 倍。像往常一样,如果时间性能很重要,您应该测量时间性能。
在子进程终止后调用 Popen.wait/communication 只是为了清除僵尸进程,这两种方法在这种情况下没有区别,对吗?
不同之处在于,proc.communicate()
在收获子进程之前关闭管道。它释放文件描述符(有限资源)以供程序中的其他文件使用。
关于缓冲区,如果输出填充缓冲区 maxsize,子进程会挂起吗?这是否意味着如果我使用默认的 bufsize=0 设置,我需要尽快从 stdout 读取,以便子进程不会阻塞?
不。它是一个不同的缓冲区。 bufsize
控制调用方法时填充/清空.readline()
父级内部的缓冲区。无论bufsize
是什么,都不会有僵局。
无论子代码可能产生多少输出,代码(如上所述)都不会死锁。
@falsetru答案中的代码可能会死锁,因为它创建了两个管道(stdout=PIPE, stderr=PIPE
),但它只从一个管道读取(proc.stderr
)。
子进程和父进程之间有几个缓冲区,例如,C stdio 的 stdout 缓冲区(子进程内的 libc 缓冲区,无法从父进程访问),子进程的 stdout OS 管道缓冲区(在内核内部,父进程可以从这里读取数据)。这些缓冲区是固定的,如果将更多数据放入其中,它们将不会增长。如果 stdio 的缓冲区溢出(例如,在printf()
调用期间),则数据将向下推送到子级的 stdout OS 管道缓冲区中。如果没有人从管道读取,则此操作系统管道缓冲区将填满,子块(例如,在系统调用write()
)尝试刷新数据。
具体来说,我假设了C stdio的基于程序和POSIXy OS。
发生死锁是因为父级尝试从空的 stderr 管道中读取,因为子级正忙于尝试刷新其 stdout。因此,这两个进程都挂起了。
一种可能的 reaosn 是输出被打印到标准错误而不是标准输出。
尝试将stdout
替换为stderr
:
from subprocess import Popen, PIPE
proc = Popen(['./mr-task.sh'], stdout=PIPE, stderr=PIPE)
while True:
out = proc.stderr.readline() # <----
if not out:
break
print(out)