使用子进程启动 hadoop 作业,但无法从标准输出获取日志



为了简化我的问题,这里有一个python脚本:

from subprocess import Popen, PIPE
proc = Popen(['./mr-task.sh'], shell=True, stdout=PIPE, stderr=PIPE)
while True:
    out = proc.stdout.readline()
    print(out)

这是mr-task.sh,它启动了一个mapreduce作业:

hadoop jar xxx.jar some-conf-we-don't-need-to-care

当我运行./mr-task时,我可以看到屏幕上打印的日志,如下所示:

14/12/25 14:56:44 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/12/25 14:56:44 INFO snappy.LoadSnappy: Snappy native library loaded
14/12/25 14:57:01 INFO mapred.JobClient: Running job: job_201411181108_16380
14/12/25 14:57:02 INFO mapred.JobClient:  map 0% reduce 0%
14/12/25 14:57:28 INFO mapred.JobClient:  map 100% reduce 0%

但是我无法让这些输出运行 python 脚本。我尝试删除shell=True或获取标准,但仍然一无所获。

有谁知道为什么会这样?

您可以将 stderr 重定向到 stdout:

from subprocess import Popen, PIPE, STDOUT
proc = Popen(['./mr-task.sh'], stdout=PIPE, stderr=STDOUT, bufsize=1)
for line in iter(proc.stdout.readline, b''):
    print line,
proc.stdout.close()
proc.wait()

请参阅 Python:从 subprocess.communication() 读取流输入。


在我的真实程序中,我将 stderr 重定向到 stdout 并从 stdout 读取,所以不需要 bufsize,是吗?

将 stderr 重定向到 stdout 和 bufsize 是不相关的。更改bufsize可能会影响时间性能(默认的bufsize=0,即在Python 2上未缓冲)。未缓冲的 I/O 可能会慢 10..100 倍。像往常一样,如果时间性能很重要,您应该测量时间性能。

在子进程终止后调用 Popen.wait/communication 只是为了清除僵尸进程,这两种方法在这种情况下没有区别,对吗?

不同之处在于,proc.communicate()在收获子进程之前关闭管道。它释放文件描述符(有限资源)以供程序中的其他文件使用。

关于缓冲区,

如果输出填充缓冲区 maxsize,子进程会挂起吗?这是否意味着如果我使用默认的 bufsize=0 设置,我需要尽快从 stdout 读取,以便子进程不会阻塞?

不。它是一个不同的缓冲区。 bufsize 控制调用方法时填充/清空.readline()父级内部的缓冲区。无论bufsize是什么,都不会有僵局。

无论子代码可能产生多少输出,代码(如上所述)都不会死锁。

@falsetru答案中的代码可能会死锁,因为它创建了两个管道(stdout=PIPE, stderr=PIPE),但它只从一个管道读取(proc.stderr)。

子进程和父进程之间有几个缓冲区,例如,C stdio 的 stdout 缓冲区(子进程内的 libc 缓冲区,无法从父进程访问),子进程的 stdout OS 管道缓冲区(在内核内部,父进程可以从这里读取数据)。这些缓冲区是固定的,如果将更多数据放入其中,它们将不会增长。如果 stdio 的缓冲区溢出(例如,在printf()调用期间),则数据将向下推送到子级的 stdout OS 管道缓冲区中。如果没有人从管道读取,则此操作系统管道缓冲区将填满,子块(例如,在系统调用write())尝试刷新数据。

具体来说,我假设了C stdio的基于程序和POSIXy OS。

发生死锁是因为父级尝试从空的 stderr 管道中读取,因为子级正忙于尝试刷新其 stdout。因此,这两个进程都挂起了。

一种可能的 reaosn 是输出被打印到标准错误而不是标准输出。

尝试将stdout替换为stderr

from subprocess import Popen, PIPE
proc = Popen(['./mr-task.sh'], stdout=PIPE, stderr=PIPE)
while True:
    out = proc.stderr.readline()  # <----
    if not out:
        break
    print(out)

相关内容

  • 没有找到相关文章

最新更新