Make a Rust function pause and return values multiple times without losing its stack



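I'm working on a project with very long computation times, which I will have hundreds of nodes running. As part of the implementation, I have a status handler object/struct that talks to the API to get the information it needs, such as parameters, and then calls the main, computationally intensive function.

To keep an eye on the intensive function, I would like it to return its percentage of completion to the status handler, so the handler can update the API, and then let the intensive function continue computing without losing any of its stack (such as variables and file handles).

I've looked into async functions, but they seem to return only once.

Thanks in advance!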

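What you want are generators, which are currently unstable. You can try them out on nightly, or you can build something similar by hand and drive it yourself (although the syntax is not as nice as with generators). For example: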

enum Status<T, K> {
    Updated(T),
    Finished(K),
}

struct State { /* ... */ }

impl State {
    pub fn new() -> Self {
        Self { /* ... */ }
    }

    pub fn call(self) -> Status<Self, ()> {
        // Do some update on the state and return
        // Status::Updated(self). When you are finished,
        // return Status::Finished(()). Note that this
        // method consumes self. You could make it take
        // &mut self, but then you would have to worry
        // about how to prevent it from being called after
        // it has finished. If you want to return some intermediate
        // value in each step, you can make Status::Updated
        // contain a (state, value) pair instead.

        todo!()
    }
}

fn foo() {
    let mut state = State::new();

    let result = loop {
        match state.call() {
            Status::Updated(s) => state = s,
            Status::Finished(result) => break result,
        }
    };
}
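To make the skeleton above concrete, here is a minimal sketch on stable Rust. The `SumState` type, its summing workload, and the `run` driver are hypothetical stand-ins for the real computation. Everything the function needs between steps lives in the struct, so handing it back and forth between calls loses nothing. Unlike the skeleton, `Status::Updated` here carries a `(state, percent)` pair, as the comment in `call` suggests.

```rust
// Same shape as the `Status` enum above: an intermediate update or the final result.
enum Status<T, K> {
    Updated(T),
    Finished(K),
}

/// Hypothetical computation: sums the integers in `0..end` in chunks of `chunk`,
/// so it can report progress between chunks. Its "stack" (loop position and
/// running total) is stored in the struct fields.
struct SumState {
    next: u64,
    end: u64,
    chunk: u64,
    acc: u64,
}

impl SumState {
    fn new(end: u64, chunk: u64) -> Self {
        assert!(chunk > 0, "chunk must be non-zero, or the loop never advances");
        Self { next: 0, end, chunk, acc: 0 }
    }

    /// Runs one chunk. Returns the updated state plus the percentage done,
    /// or the final sum once every number has been processed.
    fn call(mut self) -> Status<(Self, f32), u64> {
        if self.next >= self.end {
            return Status::Finished(self.acc);
        }
        let stop = (self.next + self.chunk).min(self.end);
        for i in self.next..stop {
            self.acc += i;
        }
        self.next = stop;
        let percent = self.next as f32 / self.end as f32 * 100.0;
        Status::Updated((self, percent))
    }
}

/// Drives the state machine, handing each progress value to `report`
/// (this is where the status handler would update the API).
fn run(end: u64, chunk: u64, mut report: impl FnMut(f32)) -> u64 {
    let mut state = SumState::new(end, chunk);
    loop {
        match state.call() {
            Status::Updated((s, percent)) => {
                report(percent);
                state = s;
            }
            Status::Finished(sum) => break sum,
        }
    }
}

fn main() {
    let sum = run(1_000, 250, |p| println!("{p} % done"));
    println!("sum = {sum}");
}
```

Because `call` takes `self` by value, a finished state can no longer be called: the compiler enforces the "don't call it after it's done" rule mentioned in the skeleton's comment.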

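Async code can in fact be paused and resumed, but it is designed for IO-bound programs that spend most of their time waiting on external IO. It is not a good fit for compute-heavy tasks.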

I can think of two ways to solve this:

  • Threads & channels
  • Callbacks

Solution 1: Threads & channels

use std::{sync::mpsc, thread, time::Duration};

struct StatusUpdate {
    task_id: i32,
    percent: f32,
}

impl StatusUpdate {
    pub fn new(task_id: i32, percent: f32) -> Self {
        Self { task_id, percent }
    }
}

fn expensive_computation(id: i32, status_update: mpsc::Sender<StatusUpdate>) {
    status_update.send(StatusUpdate::new(id, 0.0)).unwrap();
    thread::sleep(Duration::from_millis(1000));
    status_update.send(StatusUpdate::new(id, 33.3)).unwrap();
    thread::sleep(Duration::from_millis(1000));
    status_update.send(StatusUpdate::new(id, 66.6)).unwrap();
    thread::sleep(Duration::from_millis(1000));
    status_update.send(StatusUpdate::new(id, 100.0)).unwrap();
}

fn main() {
    let (status_sender_1, status_receiver) = mpsc::channel();
    let status_sender_2 = status_sender_1.clone();
    thread::spawn(move || expensive_computation(1, status_sender_1));
    thread::spawn(move || expensive_computation(2, status_sender_2));
    // The loop ends once both threads have finished and dropped their senders.
    for status_update in status_receiver {
        println!(
            "Task {} is done {} %",
            status_update.task_id, status_update.percent
        );
    }
}

Output (the interleaving of the two tasks can differ from run to run):

Task 1 is done 0 %
Task 2 is done 0 %
Task 1 is done 33.3 %
Task 2 is done 33.3 %
Task 1 is done 66.6 %
Task 2 is done 66.6 %
Task 2 is done 100 %
Task 1 is done 100 %
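The question also needs the final result back, not just progress. One way to get it, sketched here under the assumption that the result is a plain value (a hypothetical `u64` sum from the hypothetical `expensive_sum`), is to return it from the thread's closure and collect it with `JoinHandle::join` once the progress channel closes.

```rust
use std::{sync::mpsc, thread};

/// Hypothetical long-running job: sums `0..n`, sending the percentage done on
/// `progress` every `step` items, and returns the final sum.
fn expensive_sum(n: u64, step: u64, progress: mpsc::Sender<f32>) -> u64 {
    assert!(step > 0, "step must be non-zero");
    let mut acc = 0;
    for i in 0..n {
        acc += i;
        if (i + 1) % step == 0 {
            // Ignore send errors: if the receiver is gone, keep computing anyway.
            let _ = progress.send((i + 1) as f32 / n as f32 * 100.0);
        }
    }
    acc
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // The worker's locals (`acc`, `i`) stay on its own stack the whole time.
    let handle = thread::spawn(move || expensive_sum(1_000, 250, tx));
    // The loop ends once the worker returns and its `Sender` is dropped.
    for percent in rx {
        println!("{percent} % done");
    }
    // `join` hands back the closure's return value (or an error if it panicked).
    let sum = handle.join().expect("worker panicked");
    println!("sum = {sum}");
}
```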

Solution 2: Callbacks

use std::{thread, time::Duration};

struct StatusUpdate {
    task_id: i32,
    percent: f32,
}

impl StatusUpdate {
    pub fn new(task_id: i32, percent: f32) -> Self {
        Self { task_id, percent }
    }
}

fn expensive_computation<F: FnMut(StatusUpdate)>(id: i32, mut update_status: F) {
    update_status(StatusUpdate::new(id, 0.0));
    thread::sleep(Duration::from_millis(1000));
    update_status(StatusUpdate::new(id, 33.3));
    thread::sleep(Duration::from_millis(1000));
    update_status(StatusUpdate::new(id, 66.6));
    thread::sleep(Duration::from_millis(1000));
    update_status(StatusUpdate::new(id, 100.0));
}

fn main() {
    expensive_computation(1, |status_update| {
        println!(
            "Task {} is done {} %",
            status_update.task_id, status_update.percent
        );
    });
}

Output:

Task 1 is done 0 %
Task 1 is done 33.3 %
Task 1 is done 66.6 %
Task 1 is done 100 %
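Because the callback is an `FnMut`, it can mutably borrow a status-handler struct like the one described in the question. In this sketch `StatusHandler` and its `report` method are hypothetical, and the loop body is placeholder work; a real handler would call the API instead of recording updates in a `Vec`.

```rust
/// Same shape as the `StatusUpdate` struct above.
struct StatusUpdate {
    task_id: i32,
    percent: f32,
}

/// Hypothetical stand-in for the question's status handler. A real one would
/// push each update to the API; this one only records what it receives.
#[derive(Default)]
struct StatusHandler {
    reported: Vec<(i32, f32)>,
}

impl StatusHandler {
    fn report(&mut self, update: StatusUpdate) {
        self.reported.push((update.task_id, update.percent));
    }
}

/// The computation only sees "something it can call with updates";
/// it does not know about the handler type.
fn expensive_computation<F: FnMut(StatusUpdate)>(id: i32, mut update_status: F) -> u64 {
    let mut acc = 0;
    for chunk in 0..4u64 {
        acc += chunk * 10; // placeholder for a chunk of real work
        update_status(StatusUpdate { task_id: id, percent: (chunk + 1) as f32 * 25.0 });
    }
    acc
}

fn main() {
    let mut handler = StatusHandler::default();
    // The closure borrows `handler` mutably only for the duration of the call.
    let result = expensive_computation(1, |u| handler.report(u));
    println!("result = {result}, updates = {:?}", handler.reported);
}
```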

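Note that with the channel solution it is much easier to run several computations on different threads at once. With callbacks, communicating between threads is difficult, if not impossible.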
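A sketch of scaling Solution 1 to any number of tasks, assuming each worker gets its own clone of the `Sender`. `run_workers` is a hypothetical helper and the worker body is placeholder work. The easy-to-miss step is dropping the original `tx`: the receiver's iterator only ends once every sender is gone.

```rust
use std::{sync::mpsc, thread};

/// Spawns one worker per task id, each with its own clone of the sender,
/// and returns every (task_id, percent) update in the order it was received.
fn run_workers(task_ids: &[i32]) -> Vec<(i32, f32)> {
    let (tx, rx) = mpsc::channel();
    for &id in task_ids {
        let tx = tx.clone();
        thread::spawn(move || {
            for step in 1..=4 {
                // Placeholder for a chunk of real work.
                tx.send((id, step as f32 * 25.0)).unwrap();
            }
        });
    }
    // Drop the original sender; otherwise `rx` would wait for it forever.
    drop(tx);
    rx.iter().collect()
}

fn main() {
    for (id, percent) in run_workers(&[1, 2, 3]) {
        println!("Task {id} is done {percent} %");
    }
}
```

Updates from different tasks may interleave in any order, but messages sent by a single thread arrive in the order they were sent.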

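Can I pause the expensive function while it is processing data, and then let it resume?

No, in general that is not something you can do with threads.

You can run them at a lower priority than the main thread, which means they are scheduled less aggressively and the main thread sees less latency. But in general the operating system is preemptive and switches back and forth between threads on its own, so you shouldn't need to worry about "pausing".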
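Since the OS interleaves the threads by itself, the status handler never has to pause the worker. A minimal sketch, assuming the handler wants to do periodic work of its own (for example a heartbeat to the API): it waits for progress with `Receiver::recv_timeout` and treats each timeout as a chance to do that work. `supervise` is a hypothetical name, and the idle counter stands in for the real periodic work.

```rust
use std::{
    sync::mpsc::{self, RecvTimeoutError},
    thread,
    time::Duration,
};

/// Waits for progress updates without ever pausing the worker. Between
/// messages the handler could do other work; here it only counts timeouts.
fn supervise(rx: mpsc::Receiver<f32>, tick: Duration) -> (Vec<f32>, u32) {
    let mut updates = Vec::new();
    let mut idle_ticks = 0;
    loop {
        match rx.recv_timeout(tick) {
            Ok(percent) => updates.push(percent),
            // No update within `tick`: the worker keeps running on its own.
            Err(RecvTimeoutError::Timeout) => idle_ticks += 1,
            // All senders dropped and the channel is empty: the worker is done.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    (updates, idle_ticks)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || {
        for step in 1..=4 {
            thread::sleep(Duration::from_millis(50)); // placeholder for real work
            tx.send(step as f32 * 25.0).unwrap();
        }
    });
    let (updates, idle) = supervise(rx, Duration::from_millis(10));
    worker.join().unwrap();
    println!("updates = {updates:?}, idle ticks = {idle}");
}
```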
