我想实现一个futures::Stream
,用于读取和解析子进程的标准输出。
我现在在做什么:
-
生成子进程并通过
std::process
方法获取其标准输出:let child = Command::new(...).stdout(Stdio.pipe()).spawn().expect(...)
-
将
AsyncRead
和BufRead
添加到标准输出:let stdout = BufReader::new(tokio_io::io::AllowStdIo::new( child.stdout.expect("Failed to open stdout"), ));
-
为 stdout 声明一个包装结构:
struct MyStream<Io: AsyncRead + BufRead> { io: Io, }
-
实施
Stream
:impl<Io: AsyncRead + BufRead> Stream for MyStream<Io> { type Item = Message; type Error = Error; fn poll(&mut self) -> Poll<Option<Message>, Error> { let mut line = String::new(); let n = try_nb!(self.io.read_line(&mut line)); if n == 0 { return Ok(None.into()); } //...read & parse further } }
问题在于,AllowStdIo
不会使ChildStdout
神奇地异步,并且self.io.read_line
调用仍然阻塞。
我想我需要传递不同的东西而不是Stdio::pipe()
让它异步,但是呢?还是有不同的解决方案?
这个问题不同于 在将来的 rs 中封装阻塞 I/O 的最佳方法是什么?因为我想为子进程的特定情况获取异步 I/O,而不是解决同步 I/O 的封装问题。
更新:我正在使用tokio = "0.1.3"
来利用其运行时功能,目前无法使用tokio-process
(https://github.com/alexcrichton/tokio-process/issues/27)
tokio-process
crate 为您提供了一个CommandExt
特征,允许您异步生成命令。
生成的Child
具有用于ChildStdout
的吸气剂,该吸气剂可实现Read
并且是非阻塞的。
像您在示例中所做的那样将tokio_process::ChildStdout
包装到AllowStdIo
中应该可以使其正常工作!
2023 更新
tokio-process::CommandExt
已被弃用,取而代之的是tokio::process::Command
您可以以相对相似的方式使用。
这是我使用tokio的版本::p rocess
let mut child = match Command::new(&args.run[0])
.args(parameters)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()
{
Ok(c) => c,
Err(e) => panic!("Unable to start process `{}`. {}", args.run[0], e),
};
let stdout = child.stdout.take().expect("child did not have a handle to stdout");
let stderr = child.stderr.take().expect("child did not have a handle to stderr");
let mut stdout_reader = BufReader::new(stdout).lines();
let mut stderr_reader = BufReader::new(stderr).lines();
loop {
tokio::select! {
result = stdout_reader.next_line() => {
match result {
Ok(Some(line)) => println!("Stdout: {}", line),
Err(_) => break,
_ => (),
}
}
result = stderr_reader.next_line() => {
match result {
Ok(Some(line)) => println!("Stderr: {}", line),
Err(_) => break,
_ => (),
}
}
result = child.wait() => {
match result {
Ok(exit_code) => println!("Child process exited with {}", exit_code),
_ => (),
}
break // child process exited
}
};
}