子进程的异步双向IO重定向



我正试图找出一种通用的方法来实现子进程的异步双向IO重定向。基本上,我想生成一个交互式子进程,等待输入,任何输出都应该被读回。我尝试通过生成一个新的python进程来试验python.subprocess。试图实现的一个基本的简单化示例如下

process = subprocess.Popen(['/usr/bin/python'],shell=False,stdin=subprocess.PIPE, stdout=subprocess.PIPE)
while True:
    output = process.stdout.readline()
    print output
    input = sys.stdin.readline()
    process.stdin.write(input)

并且执行上面的代码片段只是挂起而没有任何输出。我试着用/usr/bash/usr/bin/irb运行,但结果都一样。我的猜测是,缓冲IO与IO重定向根本无法很好地结合。

所以我的问题是,在不刷新缓冲区或退出子流程的情况下读取子流程的输出是否可行?

下面的帖子提到IPC套接字,但为此我必须更改子进程,这可能不可行。还有其他方法可以实现吗?

注意***我的最终目标是创建一个可以与远程web客户端交互的服务器REPL进程。虽然给出的例子是Python,但我的最终目标是用通用包装器包装所有可用的REPL。


在回答中的一些建议的帮助下,我想出了以下

#!/usr/bin/python
import subprocess, os, select
proc = subprocess.Popen(['/usr/bin/python'],shell=False,stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE)
for i in xrange(0,5):
    inputready, outputready, exceptready = select.select([proc.stdout, proc.stderr],[proc.stdout, proc.stderr],[proc.stdout, proc.stderr],0)
    if not inputready: print "No Data",
    print inputready, outputready, exceptready
    for s in inputready: print s.fileno(),s.readline()
proc.terminate()
print "After Terminating"
for i in xrange(0,5):
    inputready, outputready, exceptready = select.select([proc.stdout, proc.stderr],[proc.stdout, proc.stderr],[proc.stdout, proc.stderr],0)
    if not inputready: print "No Data",
    print inputready, outputready, exceptready
    for s in inputready: print s.fileno(),s.readline() 

现在,虽然程序没有死锁,但不幸的是没有输出。运行上面的代码我得到

No Data [] [] []
No Data [] [] []
No Data [] [] []
No Data [] [] []
No Data [] [] []
After Terminating
No Data [] [] []
No Data [] [] []
No Data [] [] []
No Data [] [] []
No Data [] [] []

仅供参考,运行python作为

/usr/bin/python 2>&1|tee test.out

似乎工作得很好。

我还想出了一个"C"代码。但结果并没有什么不同。

int kbhit() {
    struct timeval tv;
    fd_set fds;
    tv.tv_sec = tv.tv_usec = 0;
    FD_ZERO(&fds);
    FD_SET(STDIN_FILENO, &fds);
    select(STDIN_FILENO+1, &fds, NULL, NULL, &tv);
    return FD_ISSET(STDIN_FILENO, &fds);
}
void receive(char *str) {
    char ch;
    fprintf(stderr,"IN1n");
    if(!kbhit()) return;
    fprintf(stderr,"IN2n");
    fprintf(stderr,"%dn",kbhit());
    for(;kbhit() && (ch=fgetc(stdin))!=EOF;) {
        fprintf(stderr,"%c,%d",ch,kbhit());
    }
    fprintf(stderr,"Donen");
}
int main(){
    pid_t pid;
    int rv, pipeP2C[2],pipeC2P[2];  
    pipe(pipeP2C);
    pipe(pipeC2P);
    pid=fork();
    if(pid){
        dup2(pipeP2C[1],1); /* Replace stdout with out side of the pipe */
        close(pipeP2C[0]);  /* Close unused side of pipe (in side) */
        dup2(pipeC2P[0],0); /* Replace stdin with in side of the pipe */
        close(pipeC2P[1]);  /* Close unused side of pipe (out side) */
        setvbuf(stdout,(char*)NULL,_IONBF,0);   /* Set non-buffered output on stdout */
        sleep(2);
        receive("quit()n");
        wait(&rv);              /* Wait for child process to end */
        fprintf(stderr,"Child exited with a %d valuen",rv);
    }
    else{
        dup2(pipeP2C[0],0); /* Replace stdin with the in side of the pipe */
        close(pipeP2C[1]);  /* Close unused side of pipe (out side) */
        dup2(pipeC2P[1],1); /* Replace stdout with the out side of the pipe */
        close(pipeC2P[0]);  /* Close unused side of pipe (out side) */
        setvbuf(stdout,(char*)NULL,_IONBF,0);   /* Set non-buffered output on stdout */
        close(2), dup2(1,2); /*Redirect stderr to stdout */
        if(execl("/usr/bin/python","/usr/bin/python",NULL) == -1){
            fprintf(stderr,"execl Error!");
            exit(1);
        }
    }
    return 0;
}

在您发布的Python代码中,您没有使用正确的流:

inputready, outputready, exceptready = select.select(
    [proc.stdout, proc.stderr], # read list
    [proc.stdout, proc.stderr], # write list
    [proc.stdout, proc.stderr], # error list.
    0)                          # time out.

我还没有尝试修复它,但我敢打赌,对同一组流的阅读和写作是不正确的。


您的样本中有多处出错。首先,您作为子进程启动的python可执行文件不会产生任何输出。第二个是存在竞争条件,因为在子进程产生输出之前,您可以连续调用select() 5次,在这种情况下,您将在读取任何内容之前终止进程。

我修复了上面提到的三个问题(写列表,启动一个产生输出和竞赛条件的进程)。试试这个样品,看看它是否适合你:

#!/usr/bin/python
import subprocess, os, select, time
path = "/usr/bin/python"
proc = subprocess.Popen([path, "foo.py"], shell=False,
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
for i in xrange(0,5):
    time.sleep(1)
    inputready, outputready, exceptready = select.select(
        [proc.stdout, proc.stderr], [proc.stdin,],
        [proc.stdout, proc.stderr, proc.stdin], 0)
    if not inputready:
        print "No Data",
    print inputready, outputready, exceptready
    for s in inputready:
        print s.fileno(),s.readline()
proc.terminate()
print "After Terminating"
for i in xrange(0,5):
    inputready, outputready, exceptready = select.select(
        [proc.stdout, proc.stderr], [proc.stdin,],
        [proc.stdout, proc.stderr, proc.stdin], 0)
    if not inputready:
        print "No Data",
    print inputready, outputready, exceptready
    for s in inputready:
        print s.fileno(),s.readline()

我使用的foo.py文件包含以下内容:

#!/usr/bin/python
print "Hello, world!"

以下版本(大部分删除了冗余输出,使结果更易于阅读):

#!/usr/bin/python
import subprocess, os, select, time
path = "/usr/bin/python"
proc = subprocess.Popen([path, "foo.py"], shell=False,
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
for i in xrange(0,5):
    time.sleep(1)
    inputready, outputready, exceptready = select.select(
        [proc.stdout, proc.stderr], [proc.stdin,],
        [proc.stdout, proc.stderr, proc.stdin], 0)
    for s in inputready:
        line = s.readline()
        if line:
            print s.fileno(), line
proc.terminate()
print "After Terminating"
for i in xrange(0,5):
    time.sleep(1)
    inputready, outputready, exceptready = select.select(
        [proc.stdout, proc.stderr], [proc.stdin,],
        [proc.stdout, proc.stderr, proc.stdin], 0)
    for s in inputready:
        line = s.readline()
        if line:
            print s.fileno(), line

给出以下输出:

5你好,世界!

终止后

请注意,由于某些原因,在select.select()中使用timeout参数并不能在我的系统上产生预期的结果,因此我改用time.sleep()


仅供参考,运行python作为

/usr/bin/python 2>&1|tee test.out

似乎工作得很好。

您无法获得这种效果,因为此示例仍然为python解释器提供了一个控制tty。如果没有控制tty,python解释器就不会打印python版本,也不会显示>>>提示。

下面是一个很好的例子。您可以将/dev/null替换为包含要发送到解释器的命令的文件。

/usr/bin/python </dev/null 2>&1|tee test.out

如果您重定向除控制tty(键盘)之外的任何作为进程的标准输入,您将不会从python解释器获得任何输出。这就是为什么您的代码似乎不起作用。

有不同的方法。例如,您可以:

  • 使用SysV消息队列,并在队列上轮询消息到达的超时时间
  • 为孩子创建一个pipe(),为父亲创建一个pipe()-都使用O_NONBLOCK标志,然后在要到达的数据的文件描述符上选择()(如果没有数据到达,甚至可以处理超时)
  • 使用socket()AF_UNIX或AF_INET,将其设置为非阻塞,然后为要到达的数据选择()或epoll()
  • mmap()MAP_SHARED内存段,并在数据到达时向其他进程发出信号,请注意带有锁定机制的共享段

我用C写了一个带有双管的样本:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <sys/select.h>
#include <fcntl.h>
#include <signal.h>
#define BUFLEN (6*1024)
#define EXECFILE "/usr/bin/python"
char *itoa(int n, char *s, int b) {
        static char digits[] = "0123456789abcdefghijklmnopqrstuvwxyz";
        int i=0, sign;
        if ((sign = n) < 0)
                n = -n;
        do {
                s[i++] = digits[n % b];
        } while ((n /= b) > 0);
        if (sign < 0)
                s[i++] = '-';
        s[i] = '';
        return s;
}
/*
int set_nonblock(int sockfd) { // set socket to non blocking
        int arg,i;
        if ((arg=fcntl(sockfd, F_GETFL, NULL)) < 0) {
                printf("error getting socket flag for fd %i: fcntl(..., F_GETFL): %in", sockfd, errno);
                return -1;
        }
        // set O_NONBLOCK flag
        arg |= O_NONBLOCK;
        if ((i=fcntl(sockfd, F_SETFL, arg)) < 0) {
                printf("error setting socket flag for fd %i: fcntl(..., F_SETFL): %in", sockfd, errno);
                return -1;
        }
        return i;
}
int set_block(int sockfd) { // set socket to blocking
        int arg,i;
        if ((arg=fcntl(sockfd, F_GETFL, NULL)) < 0) {
                printf("error getting socket flag for fd %i: fcntl(..., F_GETFL): %in", sockfd, errno);
                return -1;
        }
        // clean O_NONBLOCK flag
        arg &= (~O_NONBLOCK);
        if ((i=fcntl(sockfd, F_SETFL, arg)) < 0) {
                printf("error setting socket flag for fd %i: fcntl(..., F_SETFL): %in", sockfd, errno);
                return -1;
        }
        return i;
}
*/
int main() {
        FILE *input;
        char slice[BUFLEN];
        int status = 0;
        pid_t pid;
        int err;
        int newfd;
        // if you want you can pass arguments to the program to execute
        // char *const arguments[] = {EXECFILE, "-v", NULL};
        char *const arguments[] = {EXECFILE,  NULL};
        int father2child_pipefd[2];
        int child2father_pipefd[2];
        char *read_data = NULL;
        FILE *retclam;
        fd_set myset;
        int x=1;
        signal(SIGPIPE, SIG_IGN);
        newfd = dup(0);
        input = fdopen(newfd, "r");
        pipe(father2child_pipefd); // Father speaking to child
        pipe(child2father_pipefd); // Child speaking to father
        pid = fork();
        if (pid > 0) { // Father
                close(father2child_pipefd[0]);
                close(child2father_pipefd[1]);
                // Write to the pipe reading from stdin
                retclam = fdopen(child2father_pipefd[0], "r");

                // set the two fd non blocking
                //set_nonblock(0);
                //set_nonblock(child2father_pipefd[0]);
                //set_nonblock(fileno(retclam));
                while(x==1) {
                        // clear the file descriptor set
                        FD_ZERO(&myset);
                        // add the stdin to the set
                        FD_SET(fileno(input), &myset);
                        // add the child pipe to the set
                        FD_SET(fileno(retclam), &myset);
                        // here we wait for data to arrive from stdin or from the child pipe. The last argument is a timeout, if you like
                        err = select(fileno(retclam)+1, &myset, NULL, NULL, NULL);
                        switch(err) {
                        case -1:
                                // Problem with select(). The errno variable knows why
                                //exit(1);
                                x=0;
                                break;
                        case 0:
                                // timeout on select(). Data did not arrived in time, only valid if the last attribute of select() was specified
                                break;
                        default:
                                // data is ready to be read
                                bzero(slice, BUFLEN);
                                if (FD_ISSET(fileno(retclam), &myset)) { // data ready on the child
                                        //set_block(fileno(retclam));
                                        read_data = fgets(slice, BUFLEN, retclam); // read a line from the child (max BUFLEN bytes)
                                        //set_nonblock(fileno(retclam));
                                        if (read_data == NULL) {
                                                //exit(0);
                                                x=0;
                                                break;
                                        }
                                        // write data back to stdout
                                        write (1, slice, strlen(slice));
                                        if(feof(retclam)) {
                                                //exit(0);
                                                x=0;
                                                break;
                                        }
                                        break;
                                }
                                bzero(slice, BUFLEN);
                                if (FD_ISSET(fileno(input), &myset)) { // data ready on stdin
                                        //printf("fathern");
                                        //set_block(fileno(input));
                                        read_data = fgets(slice, BUFLEN, input); // read a line from stdin (max BUFLEN bytes)
                                        //set_nonblock(fileno(input));
                                        if (read_data == NULL) {
                                                //exit (0);
                                                close(father2child_pipefd[1]);
                                                waitpid(pid, &status, 0);
                                                //fclose(input);
                                                break;
                                        }
                                        // write data to the child
                                        write (father2child_pipefd[1], slice, strlen(slice));
                                        /*
                                        if(feof(input)) {
                                                exit(0);
                                        }*/
                                        break;
                                }
                        }
                }
                close(father2child_pipefd[1]);
                fclose(input);
                fsync(1);
                waitpid(pid, &status, 0);
                // child process terminated
                fclose (retclam);
                // Parse output data from child
                // write (1, "you can append somethind else on stdout if you like");
                if (WEXITSTATUS(status) == 0) {
                        exit (0); // child process exited successfully
                }
        }
        if (pid == 0) { // Child
                close (0); // stdin is not needed
                close (1); // stdout is not needed
                // Close the write side of this pipe
                close(father2child_pipefd[1]);
                // Close the read side of this pipe
                close(child2father_pipefd[0]);
                // Let's read on stdin, but this stdin is associated to the read pipe
                dup2(father2child_pipefd[0], 0);
                // Let's speak on stdout, but this stdout is associated to the write pipe
                dup2(child2father_pipefd[1], 1);
                // if you like you can put something back to the father before execve
                //write (child2father_pipefd[1], "something", 9);
                //fsync(child2father_pipefd[1]);
                err = execve(EXECFILE, arguments, NULL);
                // we'll never be here again after execve succeeded!! So we get here only if the execve() failed
                //fprintf(stderr, "Problem executing file %s: %i: %sn", EXECFILE, err, strerror(errno));
                exit (1);
        }
        if (pid < 0) { // Error
                exit (1);
        }
        fclose(input);
        return 0;
}

我在bash中使用双向io,如下所示:

mkfifo hotleg
mkfifo coldleg
program <coldleg |tee hotleg &
while read LINE; do
 case $LINE in
  *)call_a_function $LINE;;
 esac
done <hotleg |tee coldleg &

(注意,您可以只使用">"而不是tee,但您可能希望首先看到输出)

您认为缓冲I/O是罪魁祸首的猜测很可能是正确的。按照您编写循环的方式,读取将被阻塞,直到它填满所需的缓冲区,并且在它返回之前,您将无法处理任何输入。这很容易导致死锁。

Popen.communiate通过让一个线程处理每个管道,并确保它有所有要写入stdin的数据,这样在文件对象等待缓冲区填充或刷新/关闭文件对象时,实际写入就不会延迟。我认为,如果需要的话,可以让一个涉及线程的解决方案发挥作用,但这并不是真正的异步解决方案,也可能不是最简单的解决方案。

您可以通过不使用Popen提供的文件对象来访问管道,而是使用fileno()方法获取它们的fd来绕过python的缓冲。然后,您可以将fd与os.read、os.write和select.select一起使用。os.read和os.write函数将不进行缓冲,但它们将阻塞,直到至少可以读取/写入一个字节。在调用它们之前,您需要确保管道是可读写的。最简单的方法是使用select.select()等待所有要读/写的管道,并在select()返回时对每个准备好的管道进行一次读或写调用。如果进行搜索,您应该能够找到选择循环的示例(它们可能使用套接字而不是管道,但原理是一样的)。(此外,在不首先检查是否不会阻塞的情况下,千万不要进行读写操作,否则可能会导致子进程死锁。即使还没有编写出所需的所有内容,也必须准备好读取数据。)

如果您需要控制Python解释器会话,使用可能会更好

  • 将Python嵌入到程序中(如果它在Python中,则为普通evals),或者
  • 像PyScripter一样使用rpyc这样的RPC设施

顺便说一句,在后一种情况下,服务器可以在任何地方运行,PyScripter已经有了一个工作的服务器模块(客户端模块在Pascal中,需要翻译)。

最新更新