我想要实现的目标
- 我想在Python的外部程序上逐行流式传输类似生成器的对象中的元素
- 崩溃了,我想要像
Generator -> Popen(...) -> Generator
这样的东西,而不需要在内存中保存太多数据
这里有一个简单的例子,展示了我想要实现的目标:
from io import StringIO
from subprocess import Popen, PIPE
import time
proc_input = StringIO("aanbbnccndd")
proc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
for line in proc_input:
proc.stdin.write(line.encode())
yield proc.stdout.readline()
time.sleep(1)
问题:proc.stdout.readline()
只是阻塞,没有显示任何内容。
我已经学到了:
- 如果输入来自类似文件的对象(即实现了
fileno()
的对象(,我可以直接将其传递给stdin,避免写入PIPE。但要做到这一点,我需要首先将生成器流式传输到一个文件中,我喜欢避免这样做,因为这似乎是一个不必要的迂回。例如,以下工作
import tempfile
from subprocess import Popen, PIPE
tp = tempfile.TemporaryFile()
tp.write("aanbbnccndd".encode())
tp.seek(0)
proc = Popen(["cat"], stdin=tp, stdout=PIPE)
for line in proc.stdout:
print(line)
- 如果我坚持写PIPE对象,我可以通过关闭输入流然后从输出流中读取来解决问题。但在此期间,我不知道这些数据在哪里。因为我的生成器产生GB的数据,我不想遇到内存错误
proc_input = StringIO("aanbbnccndd")
proc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
for line in proc_input:
proc.stdin.write(line.encode())
proc.stdin.close()
for line in proc.stdout:
print(line)
我也尝试过:
- 我反复讨论了buffersize参数
Popen(..., bufsize=)
,但它似乎没有任何效果 - 我尝试将输入数据写入
io.BufferedWriter
,希望Popen能够将其作为stdin的输入进行消化。同样没有成功
其他信息:我使用的是Linux。
备注
有人建议将输入生成器分解为块。这可以通过实现
def PopenStreaming(process, popen_kwargs, nlines, input):
while input:
proc = Popen(process, stdin=PIPE, stdout=PIPE, **popen_kwargs)
for n, row in enumerate(input):
proc.stdin.write(row)
if n == nlines:
proc.stdin.close()
break
for row in proc.stdout:
yield row
我不确定是否总是可以做你想做的事情https://docs.python.org/3/library/subprocess.html比如
警告:请使用
communicate()
而不是.stdin.write
、.stdout.read
或.stderr.read
,以避免由于任何其他操作系统管道缓冲区填满并阻塞子进程而导致死锁。
所以应该使用communicate
,但这意味着等待进程终止:
Popen.communicate(input=None, timeout=None)
与进程交互:将数据发送到stdin。从stdout和stderr读取数据,直到到达文件末尾。等待进程终止。
这意味着您只能使用communicate
一次,这不是您想要的。
然而,我认为使用行缓冲文本模式应该是安全的,以避免死锁:
from subprocess import Popen, PIPE
kwargs = {
"stdin": PIPE,
"stdout": PIPE,
"universal_newlines": True, # text mode
"bufsize": 1, # line buffered
}
with Popen(["cat"], **kwargs) as process:
for data in ["An", "Bn", "Cn"]:
process.stdin.write(data)
print("data sent:", data)
output = process.stdout.readline()
print("output received:", output)
如果这在你的情况下不适用,也许你可以把你的电话分成多个较小的电话?使用check_output
及其input
关键字参数也可以简化代码:
from subprocess import check_output
output = check_output(["cat"], input=b"somethingn")
print(output)