通过外部命令通过 Python 子进程的 Popen 流式传输内存中的数据



我想要实现的目标

  • 我想在Python的外部程序上逐行流式传输类似生成器的对象中的元素
  • 崩溃了,我想要像Generator -> Popen(...) -> Generator这样的东西,而不需要在内存中保存太多数据

这里有一个简单的例子,展示了我想要实现的目标:


    from io import StringIO
    from subprocess import Popen, PIPE
    import time
    proc_input = StringIO("aanbbnccndd")
    proc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
    for line in  proc_input:
        proc.stdin.write(line.encode())
        yield proc.stdout.readline()
        time.sleep(1)

问题proc.stdout.readline()只是阻塞,没有显示任何内容。

我已经学到了

  • 如果输入来自类似文件的对象(即实现了fileno()的对象(,我可以直接将其传递给stdin,避免写入PIPE。但要做到这一点,我需要首先将生成器流式传输到一个文件中,我喜欢避免这样做,因为这似乎是一个不必要的迂回。例如,以下工作

    import tempfile
    from subprocess import Popen, PIPE
    tp = tempfile.TemporaryFile()
    tp.write("aanbbnccndd".encode())
    tp.seek(0)
    proc = Popen(["cat"], stdin=tp, stdout=PIPE)
    for line in proc.stdout:
        print(line)
  • 如果我坚持写PIPE对象,我可以通过关闭输入流然后从输出流中读取来解决问题。但在此期间,我不知道这些数据在哪里。因为我的生成器产生GB的数据,我不想遇到内存错误

    proc_input = StringIO("aanbbnccndd")
    proc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
    for line in  proc_input:
        proc.stdin.write(line.encode())
    proc.stdin.close()
    for line in proc.stdout:
            print(line)

我也尝试过

  • 我反复讨论了buffersize参数Popen(..., bufsize=),但它似乎没有任何效果
  • 我尝试将输入数据写入io.BufferedWriter,希望Popen能够将其作为stdin的输入进行消化。同样没有成功

其他信息:我使用的是Linux。

备注

有人建议将输入生成器分解为块。这可以通过实现

   def PopenStreaming(process, popen_kwargs, nlines, input):
        while input:
            proc = Popen(process, stdin=PIPE, stdout=PIPE, **popen_kwargs)
            for n, row in enumerate(input):
                proc.stdin.write(row)
                if n == nlines:
                    proc.stdin.close()
                    break
            for row in proc.stdout:
                yield row

我不确定是否总是可以做你想做的事情https://docs.python.org/3/library/subprocess.html比如

警告:请使用communicate()而不是.stdin.write.stdout.read.stderr.read,以避免由于任何其他操作系统管道缓冲区填满并阻塞子进程而导致死锁。

所以应该使用communicate,但这意味着等待进程终止:

Popen.communicate(input=None, timeout=None)与进程交互:将数据发送到stdin。从stdout和stderr读取数据,直到到达文件末尾。等待进程终止。

这意味着您只能使用communicate一次,这不是您想要的。

然而,我认为使用行缓冲文本模式应该是安全的,以避免死锁:

from subprocess import Popen, PIPE
kwargs = {
    "stdin": PIPE,
    "stdout": PIPE,
    "universal_newlines": True,  # text mode
    "bufsize": 1,  # line buffered
}
with Popen(["cat"], **kwargs) as process:
    for data in ["An", "Bn", "Cn"]:
        process.stdin.write(data)
        print("data sent:", data)
        output = process.stdout.readline()
        print("output received:", output)

如果这在你的情况下不适用,也许你可以把你的电话分成多个较小的电话?使用check_output及其input关键字参数也可以简化代码:

from subprocess import check_output
output = check_output(["cat"], input=b"somethingn")
print(output)

最新更新