我正试图找出一种通用的方法来实现子进程的异步双向IO重定向。基本上,我想生成一个交互式子进程,等待输入,任何输出都应该被读回。我尝试通过生成一个新的python进程来试验python.subprocess。试图实现的一个基本的简单化示例如下
process = subprocess.Popen(['/usr/bin/python'],shell=False,stdin=subprocess.PIPE, stdout=subprocess.PIPE)
while True:
output = process.stdout.readline()
print output
input = sys.stdin.readline()
process.stdin.write(input)
并且执行上面的代码片段只是挂起而没有任何输出。我试着用/usr/bash
和/usr/bin/irb
运行,但结果都一样。我的猜测是,缓冲IO与IO重定向根本无法很好地结合。
所以我的问题是,在不刷新缓冲区或退出子流程的情况下读取子流程的输出是否可行?
下面的帖子提到IPC套接字,但为此我必须更改子进程,这可能不可行。还有其他方法可以实现吗?
注意***我的最终目标是创建一个可以与远程web客户端交互的服务器REPL进程。虽然给出的例子是Python,但我的最终目标是用通用包装器包装所有可用的REPL。
在回答中的一些建议的帮助下,我想出了以下
#!/usr/bin/python
import subprocess, os, select
proc = subprocess.Popen(['/usr/bin/python'],shell=False,stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE)
for i in xrange(0,5):
inputready, outputready, exceptready = select.select([proc.stdout, proc.stderr],[proc.stdout, proc.stderr],[proc.stdout, proc.stderr],0)
if not inputready: print "No Data",
print inputready, outputready, exceptready
for s in inputready: print s.fileno(),s.readline()
proc.terminate()
print "After Terminating"
for i in xrange(0,5):
inputready, outputready, exceptready = select.select([proc.stdout, proc.stderr],[proc.stdout, proc.stderr],[proc.stdout, proc.stderr],0)
if not inputready: print "No Data",
print inputready, outputready, exceptready
for s in inputready: print s.fileno(),s.readline()
现在,虽然程序没有死锁,但不幸的是没有输出。运行上面的代码我得到
No Data [] [] []
No Data [] [] []
No Data [] [] []
No Data [] [] []
No Data [] [] []
After Terminating
No Data [] [] []
No Data [] [] []
No Data [] [] []
No Data [] [] []
No Data [] [] []
仅供参考,运行python作为
/usr/bin/python 2>&1|tee test.out
似乎工作得很好。
我还想出了一个"C"代码。但结果并没有什么不同。
int kbhit() {
struct timeval tv;
fd_set fds;
tv.tv_sec = tv.tv_usec = 0;
FD_ZERO(&fds);
FD_SET(STDIN_FILENO, &fds);
select(STDIN_FILENO+1, &fds, NULL, NULL, &tv);
return FD_ISSET(STDIN_FILENO, &fds);
}
void receive(char *str) {
char ch;
fprintf(stderr,"IN1n");
if(!kbhit()) return;
fprintf(stderr,"IN2n");
fprintf(stderr,"%dn",kbhit());
for(;kbhit() && (ch=fgetc(stdin))!=EOF;) {
fprintf(stderr,"%c,%d",ch,kbhit());
}
fprintf(stderr,"Donen");
}
int main(){
pid_t pid;
int rv, pipeP2C[2],pipeC2P[2];
pipe(pipeP2C);
pipe(pipeC2P);
pid=fork();
if(pid){
dup2(pipeP2C[1],1); /* Replace stdout with out side of the pipe */
close(pipeP2C[0]); /* Close unused side of pipe (in side) */
dup2(pipeC2P[0],0); /* Replace stdin with in side of the pipe */
close(pipeC2P[1]); /* Close unused side of pipe (out side) */
setvbuf(stdout,(char*)NULL,_IONBF,0); /* Set non-buffered output on stdout */
sleep(2);
receive("quit()n");
wait(&rv); /* Wait for child process to end */
fprintf(stderr,"Child exited with a %d valuen",rv);
}
else{
dup2(pipeP2C[0],0); /* Replace stdin with the in side of the pipe */
close(pipeP2C[1]); /* Close unused side of pipe (out side) */
dup2(pipeC2P[1],1); /* Replace stdout with the out side of the pipe */
close(pipeC2P[0]); /* Close unused side of pipe (out side) */
setvbuf(stdout,(char*)NULL,_IONBF,0); /* Set non-buffered output on stdout */
close(2), dup2(1,2); /*Redirect stderr to stdout */
if(execl("/usr/bin/python","/usr/bin/python",NULL) == -1){
fprintf(stderr,"execl Error!");
exit(1);
}
}
return 0;
}
在您发布的Python代码中,您没有使用正确的流:
inputready, outputready, exceptready = select.select(
[proc.stdout, proc.stderr], # read list
[proc.stdout, proc.stderr], # write list
[proc.stdout, proc.stderr], # error list.
0) # time out.
我还没有尝试修复它,但我敢打赌,对同一组流的阅读和写作是不正确的。
您的样本中有多处出错。首先,您作为子进程启动的python可执行文件不会产生任何输出。第二个是存在竞争条件,因为在子进程产生输出之前,您可以连续调用select()
5次,在这种情况下,您将在读取任何内容之前终止进程。
我修复了上面提到的三个问题(写列表,启动一个产生输出和竞赛条件的进程)。试试这个样品,看看它是否适合你:
#!/usr/bin/python
import subprocess, os, select, time
path = "/usr/bin/python"
proc = subprocess.Popen([path, "foo.py"], shell=False,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
for i in xrange(0,5):
time.sleep(1)
inputready, outputready, exceptready = select.select(
[proc.stdout, proc.stderr], [proc.stdin,],
[proc.stdout, proc.stderr, proc.stdin], 0)
if not inputready:
print "No Data",
print inputready, outputready, exceptready
for s in inputready:
print s.fileno(),s.readline()
proc.terminate()
print "After Terminating"
for i in xrange(0,5):
inputready, outputready, exceptready = select.select(
[proc.stdout, proc.stderr], [proc.stdin,],
[proc.stdout, proc.stderr, proc.stdin], 0)
if not inputready:
print "No Data",
print inputready, outputready, exceptready
for s in inputready:
print s.fileno(),s.readline()
我使用的foo.py
文件包含以下内容:
#!/usr/bin/python
print "Hello, world!"
以下版本(大部分删除了冗余输出,使结果更易于阅读):
#!/usr/bin/python
import subprocess, os, select, time
path = "/usr/bin/python"
proc = subprocess.Popen([path, "foo.py"], shell=False,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
for i in xrange(0,5):
time.sleep(1)
inputready, outputready, exceptready = select.select(
[proc.stdout, proc.stderr], [proc.stdin,],
[proc.stdout, proc.stderr, proc.stdin], 0)
for s in inputready:
line = s.readline()
if line:
print s.fileno(), line
proc.terminate()
print "After Terminating"
for i in xrange(0,5):
time.sleep(1)
inputready, outputready, exceptready = select.select(
[proc.stdout, proc.stderr], [proc.stdin,],
[proc.stdout, proc.stderr, proc.stdin], 0)
for s in inputready:
line = s.readline()
if line:
print s.fileno(), line
给出以下输出:
5你好,世界!
终止后
请注意,由于某些原因,在select.select()
中使用timeout
参数并不能在我的系统上产生预期的结果,因此我改用time.sleep()
。
仅供参考,运行python作为
/usr/bin/python 2>&1|tee test.out
似乎工作得很好。
您无法获得这种效果,因为此示例仍然为python解释器提供了一个控制tty。如果没有控制tty,python解释器就不会打印python版本,也不会显示>>>
提示。
下面是一个很好的例子。您可以将/dev/null
替换为包含要发送到解释器的命令的文件。
/usr/bin/python </dev/null 2>&1|tee test.out
如果您重定向除控制tty(键盘)之外的任何作为进程的标准输入,您将不会从python解释器获得任何输出。这就是为什么您的代码似乎不起作用。
有不同的方法。例如,您可以:
- 使用SysV消息队列,并在队列上轮询消息到达的超时时间
- 为孩子创建一个pipe(),为父亲创建一个pipe()-都使用O_NONBLOCK标志,然后在要到达的数据的文件描述符上选择()(如果没有数据到达,甚至可以处理超时)
- 使用socket()AF_UNIX或AF_INET,将其设置为非阻塞,然后为要到达的数据选择()或epoll()
- mmap()MAP_SHARED内存段,并在数据到达时向其他进程发出信号,请注意带有锁定机制的共享段
我用C写了一个带有双管的样本:
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <sys/select.h>
#include <fcntl.h>
#include <signal.h>
#define BUFLEN (6*1024)
#define EXECFILE "/usr/bin/python"
char *itoa(int n, char *s, int b) {
static char digits[] = "0123456789abcdefghijklmnopqrstuvwxyz";
int i=0, sign;
if ((sign = n) < 0)
n = -n;
do {
s[i++] = digits[n % b];
} while ((n /= b) > 0);
if (sign < 0)
s[i++] = '-';
s[i] = ' ';
return s;
}
/*
int set_nonblock(int sockfd) { // set socket to non blocking
int arg,i;
if ((arg=fcntl(sockfd, F_GETFL, NULL)) < 0) {
printf("error getting socket flag for fd %i: fcntl(..., F_GETFL): %in", sockfd, errno);
return -1;
}
// set O_NONBLOCK flag
arg |= O_NONBLOCK;
if ((i=fcntl(sockfd, F_SETFL, arg)) < 0) {
printf("error setting socket flag for fd %i: fcntl(..., F_SETFL): %in", sockfd, errno);
return -1;
}
return i;
}
int set_block(int sockfd) { // set socket to blocking
int arg,i;
if ((arg=fcntl(sockfd, F_GETFL, NULL)) < 0) {
printf("error getting socket flag for fd %i: fcntl(..., F_GETFL): %in", sockfd, errno);
return -1;
}
// clean O_NONBLOCK flag
arg &= (~O_NONBLOCK);
if ((i=fcntl(sockfd, F_SETFL, arg)) < 0) {
printf("error setting socket flag for fd %i: fcntl(..., F_SETFL): %in", sockfd, errno);
return -1;
}
return i;
}
*/
int main() {
FILE *input;
char slice[BUFLEN];
int status = 0;
pid_t pid;
int err;
int newfd;
// if you want you can pass arguments to the program to execute
// char *const arguments[] = {EXECFILE, "-v", NULL};
char *const arguments[] = {EXECFILE, NULL};
int father2child_pipefd[2];
int child2father_pipefd[2];
char *read_data = NULL;
FILE *retclam;
fd_set myset;
int x=1;
signal(SIGPIPE, SIG_IGN);
newfd = dup(0);
input = fdopen(newfd, "r");
pipe(father2child_pipefd); // Father speaking to child
pipe(child2father_pipefd); // Child speaking to father
pid = fork();
if (pid > 0) { // Father
close(father2child_pipefd[0]);
close(child2father_pipefd[1]);
// Write to the pipe reading from stdin
retclam = fdopen(child2father_pipefd[0], "r");
// set the two fd non blocking
//set_nonblock(0);
//set_nonblock(child2father_pipefd[0]);
//set_nonblock(fileno(retclam));
while(x==1) {
// clear the file descriptor set
FD_ZERO(&myset);
// add the stdin to the set
FD_SET(fileno(input), &myset);
// add the child pipe to the set
FD_SET(fileno(retclam), &myset);
// here we wait for data to arrive from stdin or from the child pipe. The last argument is a timeout, if you like
err = select(fileno(retclam)+1, &myset, NULL, NULL, NULL);
switch(err) {
case -1:
// Problem with select(). The errno variable knows why
//exit(1);
x=0;
break;
case 0:
// timeout on select(). Data did not arrived in time, only valid if the last attribute of select() was specified
break;
default:
// data is ready to be read
bzero(slice, BUFLEN);
if (FD_ISSET(fileno(retclam), &myset)) { // data ready on the child
//set_block(fileno(retclam));
read_data = fgets(slice, BUFLEN, retclam); // read a line from the child (max BUFLEN bytes)
//set_nonblock(fileno(retclam));
if (read_data == NULL) {
//exit(0);
x=0;
break;
}
// write data back to stdout
write (1, slice, strlen(slice));
if(feof(retclam)) {
//exit(0);
x=0;
break;
}
break;
}
bzero(slice, BUFLEN);
if (FD_ISSET(fileno(input), &myset)) { // data ready on stdin
//printf("fathern");
//set_block(fileno(input));
read_data = fgets(slice, BUFLEN, input); // read a line from stdin (max BUFLEN bytes)
//set_nonblock(fileno(input));
if (read_data == NULL) {
//exit (0);
close(father2child_pipefd[1]);
waitpid(pid, &status, 0);
//fclose(input);
break;
}
// write data to the child
write (father2child_pipefd[1], slice, strlen(slice));
/*
if(feof(input)) {
exit(0);
}*/
break;
}
}
}
close(father2child_pipefd[1]);
fclose(input);
fsync(1);
waitpid(pid, &status, 0);
// child process terminated
fclose (retclam);
// Parse output data from child
// write (1, "you can append somethind else on stdout if you like");
if (WEXITSTATUS(status) == 0) {
exit (0); // child process exited successfully
}
}
if (pid == 0) { // Child
close (0); // stdin is not needed
close (1); // stdout is not needed
// Close the write side of this pipe
close(father2child_pipefd[1]);
// Close the read side of this pipe
close(child2father_pipefd[0]);
// Let's read on stdin, but this stdin is associated to the read pipe
dup2(father2child_pipefd[0], 0);
// Let's speak on stdout, but this stdout is associated to the write pipe
dup2(child2father_pipefd[1], 1);
// if you like you can put something back to the father before execve
//write (child2father_pipefd[1], "something", 9);
//fsync(child2father_pipefd[1]);
err = execve(EXECFILE, arguments, NULL);
// we'll never be here again after execve succeeded!! So we get here only if the execve() failed
//fprintf(stderr, "Problem executing file %s: %i: %sn", EXECFILE, err, strerror(errno));
exit (1);
}
if (pid < 0) { // Error
exit (1);
}
fclose(input);
return 0;
}
我在bash中使用双向io,如下所示:
mkfifo hotleg
mkfifo coldleg
program <coldleg |tee hotleg &
while read LINE; do
case $LINE in
*)call_a_function $LINE;;
esac
done <hotleg |tee coldleg &
(注意,您可以只使用">"而不是tee,但您可能希望首先看到输出)
您认为缓冲I/O是罪魁祸首的猜测很可能是正确的。按照您编写循环的方式,读取将被阻塞,直到它填满所需的缓冲区,并且在它返回之前,您将无法处理任何输入。这很容易导致死锁。
Popen.communiate通过让一个线程处理每个管道,并确保它有所有要写入stdin的数据,这样在文件对象等待缓冲区填充或刷新/关闭文件对象时,实际写入就不会延迟。我认为,如果需要的话,可以让一个涉及线程的解决方案发挥作用,但这并不是真正的异步解决方案,也可能不是最简单的解决方案。
您可以通过不使用Popen提供的文件对象来访问管道,而是使用fileno()方法获取它们的fd来绕过python的缓冲。然后,您可以将fd与os.read、os.write和select.select一起使用。os.read和os.write函数将不进行缓冲,但它们将阻塞,直到至少可以读取/写入一个字节。在调用它们之前,您需要确保管道是可读写的。最简单的方法是使用select.select()等待所有要读/写的管道,并在select()返回时对每个准备好的管道进行一次读或写调用。如果进行搜索,您应该能够找到选择循环的示例(它们可能使用套接字而不是管道,但原理是一样的)。(此外,在不首先检查是否不会阻塞的情况下,千万不要进行读写操作,否则可能会导致子进程死锁。即使还没有编写出所需的所有内容,也必须准备好读取数据。)
如果您需要控制Python解释器会话,使用可能会更好
- 将Python嵌入到程序中(如果它在Python中,则为普通
eval
s),或者 - 像PyScripter一样使用rpyc这样的RPC设施
顺便说一句,在后一种情况下,服务器可以在任何地方运行,PyScripter已经有了一个工作的服务器模块(客户端模块在Pascal中,需要翻译)。