Python 子过程:给出 stdin,读取 stdout,然后给出更多的 stdin



我正在使用一个名为Chimera的科学软件。对于这个问题下游的一些代码,它要求我使用 Python 2.7。

我想调用一个进程,给该进程一些输入,读取其输出,基于该进程为其提供更多输入,等等。

我使用Popen打开进程,process.stdin.write传递标准输入,但随后我在进程仍在运行时尝试获取输出时陷入困境。process.communicate()停止这个过程,process.stdout.readline()似乎让我陷入无限循环。


下面是我想做的一个简化示例:

假设我有一个名为exampleInput.sh的 bash 脚本。

#!/bin/bash
# exampleInput.sh
# Read a number from the input
read -p 'Enter a number: ' num
# Multiply the number by 5
ans1=$( expr $num * 5 )
# Give the user the multiplied number
echo $ans1
# Ask the user whether they want to keep going
read -p 'Based on the previous output, would you like to continue? ' doContinue
if [ $doContinue == "yes" ]
then
echo "Okay, moving on..."
# [...] more code here [...]
else
exit 0
fi

通过命令行与之交互,我将运行脚本,键入"5",然后,如果它返回"25",我将键入"yes",如果没有,我将键入"no"。

我想运行一个 python 脚本,在其中传递exampleInput.sh"5",如果它给我"25",那么我传递"是">

到目前为止,这是我所能得到的尽可能接近的:

#!/home/user/miniconda3/bin/python2
# talk_with_example_input.py
import subprocess
process = subprocess.Popen(["./exampleInput.sh"], 
stdin = subprocess.PIPE,
stdout = subprocess.PIPE)
process.stdin.write("5")
answer = process.communicate()[0]
if answer == "25":
process.stdin.write("yes")
## I'd like to print the STDOUT here, but the process is already terminated

但这当然失败了,因为在"process.communication()"之后,我的进程不再运行了。


(以防万一/仅供参考):实际问题

嵌合体通常是一个基于GUI的应用程序,用于检查蛋白质结构。如果您运行chimera --nogui,它将打开一个提示并接受输入。

在运行下一个命令之前,我经常需要知道嵌合体输出的内容。例如,我经常尝试生成蛋白质表面,如果Chimera不能生成表面,它就不会破裂 - 它只是通过STDOUT说出来。因此,在我的python脚本中,当我循环分析许多蛋白质时,我需要检查STDOUT以了解是否继续分析该蛋白质。

在其他用例中,我将通过Chimera运行大量命令来首先清理蛋白质,然后我需要运行许多单独的命令来获取不同的数据片段,并使用该数据决定是否运行其他命令。我可以获取数据,关闭子进程,然后运行另一个进程,但这需要每次重新运行所有这些清理命令。

无论如何,这些是我希望能够将STDIN推送到子流程,读取STDOUT并且仍然能够推送更多STDIN的一些真实原因。

谢谢你的时间!

您不需要在示例中使用process.communicate

使用process.stdin.writeprocess.stdout.read轻松读写。还要确保发送换行符,否则read不会返回。当您从 stdin(标准)读取时,您还必须处理来自echo的换行符。

注意process.stdout.read将阻止直到EOF

# talk_with_example_input.py
import subprocess
process = subprocess.Popen(["./exampleInput.sh"], 
stdin = subprocess.PIPE,
stdout = subprocess.PIPE)
process.stdin.write("5n")
stdout = process.stdout.readline()
print(stdout)
if stdout == "25n":
process.stdin.write("yesn")
print(process.stdout.readline())
$ python2 test.py
25
Okay, moving on...


更新

以这种方式与程序通信时,您必须特别注意应用程序实际编写的内容。最好是在十六进制编辑器中分析输出:

$ chimera --nogui 2>&1 | hexdump -C

请注意,readline[1]只读取下一个换行符 (n)。在您的情况下,您必须至少调用readline四次才能获得第一个输出块。

如果只想读取所有内容,直到子进程停止打印,则必须逐字节读取并实现超时。可悲的是,readreadline都没有提供这样的超时机制。这可能是因为底层read系统调用[2](Linux)也没有提供。

在 Linux 上,我们可以使用轮询/选择编写单线程read_with_timeout()。有关示例,请参阅[3]。

from select import epoll, EPOLLIN
def read_with_timeout(fd, timeout__s):
"""Reads from fd until there is no new data for at least timeout__s seconds.
This only works on linux > 2.5.44.
"""
buf = []
e = epoll()
e.register(fd, EPOLLIN)
while True:
ret = e.poll(timeout__s)
if not ret or ret[0][1] is not EPOLLIN:
break
buf.append(
fd.read(1)
)
return ''.join(buf)

如果您需要一种可靠的方法来读取Windows和Linux下的非阻塞,则此答案可能会有所帮助。


[1]来自 Python 2 文档:

读取行(限制=-1)

从流中读取并返回一行。如果指定了限制,则最多读取限制字节。

对于二进制文件,行终止符始终为 b'';对于文本文件,open() 的换行符参数可用于选择识别的行终止符。

[2]man 2 read

#include <unistd.h>
ssize_t read(int fd, void *buf, size_t count);

[3]示例

$ tree
.
├── prog.py
└── prog.sh

prog.sh

#!/usr/bin/env bash
for i in $(seq 3); do
echo "${RANDOM}"
sleep 1
done
sleep 3
echo "${RANDOM}"

prog.py

# talk_with_example_input.py
import subprocess
from select import epoll, EPOLLIN
def read_with_timeout(fd, timeout__s):
"""Reads from f until there is no new data for at least timeout__s seconds.
This only works on linux > 2.5.44.
"""
buf = []
e = epoll()
e.register(fd, EPOLLIN)
while True:
ret = e.poll(timeout__s)
if not ret or ret[0][1] is not EPOLLIN:
break
buf.append(
fd.read(1)
)
return ''.join(buf)
process = subprocess.Popen(
["./prog.sh"],
stdin = subprocess.PIPE,
stdout = subprocess.PIPE
)
print(read_with_timeout(process.stdout, 1.5))
print('-----')
print(read_with_timeout(process.stdout, 3))
$ python2 prog.py 
6194
14508
11293
-----
10506

最新更新