如何异步读取子进程输出



我想实现一个futures::Stream,用于读取和解析子进程的标准输出。

我现在在做什么:

  • 生成子进程并通过std::process方法获取其标准输出:let child = Command::new(...).stdout(Stdio.pipe()).spawn().expect(...)

  • AsyncReadBufRead添加到标准输出:

    let stdout = BufReader::new(tokio_io::io::AllowStdIo::new(
    child.stdout.expect("Failed to open stdout"),
    ));
    
  • 为 stdout 声明一个包装结构:

    struct MyStream<Io: AsyncRead + BufRead> {
    io: Io,
    }
    
  • 实施Stream

    impl<Io: AsyncRead + BufRead> Stream for MyStream<Io> {
    type Item = Message;
    type Error = Error;
    fn poll(&mut self) -> Poll<Option<Message>, Error> {
    let mut line = String::new();
    let n = try_nb!(self.io.read_line(&mut line));
    if n == 0 {
    return Ok(None.into());
    }
    //...read & parse further
    }
    }
    

问题在于,AllowStdIo不会使ChildStdout神奇地异步,并且self.io.read_line调用仍然阻塞。

我想我需要传递不同的东西而不是Stdio::pipe()让它异步,但是呢?还是有不同的解决方案?

这个问题不同于 在将来的 rs 中封装阻塞 I/O 的最佳方法是什么?因为我想为子进程的特定情况获取异步 I/O,而不是解决同步 I/O 的封装问题。

更新:我正在使用tokio = "0.1.3"来利用其运行时功能,目前无法使用tokio-process(https://github.com/alexcrichton/tokio-process/issues/27)

tokio-processcrate 为您提供了一个CommandExt特征,允许您异步生成命令。

生成的Child具有用于ChildStdout的吸气剂,该吸气剂可实现Read并且是非阻塞的。

像您在示例中所做的那样将tokio_process::ChildStdout包装到AllowStdIo中应该可以使其正常工作!

2023 更新

tokio-process::CommandExt已被弃用,取而代之的是tokio::process::Command您可以以相对相似的方式使用。

这是我使用tokio的版本::p rocess

let mut child = match Command::new(&args.run[0])
.args(parameters)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn() 
{
Ok(c) => c,
Err(e) => panic!("Unable to start process `{}`. {}", args.run[0], e),
};
let stdout = child.stdout.take().expect("child did not have a handle to stdout");
let stderr = child.stderr.take().expect("child did not have a handle to stderr");
let mut stdout_reader = BufReader::new(stdout).lines();
let mut stderr_reader = BufReader::new(stderr).lines();
loop {
tokio::select! {
result = stdout_reader.next_line() => {
match result {
Ok(Some(line)) => println!("Stdout: {}", line),
Err(_) => break,
_ => (),
}
}
result = stderr_reader.next_line() => {
match result {
Ok(Some(line)) => println!("Stderr: {}", line),
Err(_) => break,
_ => (),
}
}
result = child.wait() => {
match result {
Ok(exit_code) => println!("Child process exited with {}", exit_code),
_ => (),
}
break // child process exited
}
};
}

最新更新