如何在启动异步操作和在循环中等待其结果之间有一个宽限期?



我有一个循环:

let grace = 2usize;
for i in 0..100 {
if i % 10 == 0 {
expensive_function()
} else {
cheap_function()
}
}

目标是当它命中expensive_function(),它异步运行并允许grace次数的进一步迭代,直到等待expensive_function()

如果expensive_function()在迭代 10 触发,则它可以运行迭代 11 和 12,然后需要等待迭代 10 上的expensive_function()运行完成才能继续。

我该怎么做?

就我而言,expensive_function()实际上是:

fn expensive_function(&b) -> Vec<_> {
return b.iter().map(|a| a.inner_expensive_function()).collect();
}

因此,我计划在此函数中使用多线程。

当您开始昂贵的计算时,将结果的未来以及等待结果的截止时间存储在变量中。在这里,我使用元组的Option

use std::{thread, time::Duration};
use tokio::task; // 0.2.21, features = ["full"]
#[tokio::main]
async fn main() {
let grace_period = 2usize;
let mut pending = None;
for i in 0..50 {
if i % 10 == 0 {
assert!(pending.is_none(), "Already had pending work");
let future = expensive_function(i);
let deadline = i + grace_period;
pending = Some((deadline, future));
} else {
cheap_function(i);
}
if let Some((deadline, future)) = pending.take() {
if i == deadline {
future.await.unwrap();
} else {
pending = Some((deadline, future));
}
}
}
}
fn expensive_function(n: usize) -> task::JoinHandle<()> {
task::spawn_blocking(move || {
println!("expensive_function {} start", n);
thread::sleep(Duration::from_millis(500));
println!("expensive_function {} done", n);
})
}
fn cheap_function(n: usize) {
println!("cheap_function {}", n);
thread::sleep(Duration::from_millis(1));
}

这将生成

cheap_function 1
expensive_function 0 start
cheap_function 2
expensive_function 0 done
cheap_function 3
cheap_function 4
cheap_function 5

由于您没有提供expensive_functioncheap_function的定义,我提供了适当的定义。

这里的一件棘手的事情是我需要在cheap_function中添加sleep调用。没有它,我的操作系统永远不会调度昂贵的线程,直到轮询它,有效地删除任何并行工作。在更大的程序中,操作系统可能会调度线程,仅仅是因为cheap_function将完成更多的工作。您也可以使用thread::yield_now来实现相同的效果。

另请参阅:

  • 如何在东京为CPU密集型工作创建专用线程池?
  • 如何在稳定的 Rust 中同步返回在异步 Future 中计算的值?
  • 在未来的 rs 中封装阻塞 I/O 的最佳方法是什么?

最新更新