为什么 tokio::spawn 在 crossbeam_channel::select 旁边调用时会有延迟?



我正在创建一个将生成其他任务的任务。其中一些需要一些时间,因此无法等待,但它们可以并行运行:

src/main.rs

use crossbeam::crossbeam_channel::{bounded, select};
#[tokio::main]
async fn main() {
let (s, r) = bounded::<usize>(1);
tokio::spawn(async move {
let mut counter = 0;
loop {
let loop_id = counter.clone();
tokio::spawn(async move { // why this one was not fired?
println!("inner task {}", loop_id);
}); // .await.unwrap(); - solves issue, but this is long task which cannot be awaited
println!("loop {}", loop_id);
select! {
recv(r) -> rr => {
// match rr {
//     Ok(ee) => {
//         println!("received from channel {}", loop_id);
//         tokio::spawn(async move {
//             println!("received from channel task {}", loop_id);
//         });
//     },
//     Err(e) => println!("{}", e),
// };
},
// more recv(some_channel) -> 
}
counter = counter + 1;
}
});
// let s_clone = s.clone();
// tokio::spawn(async move {
//     s_clone.send(2).unwrap();
// });
loop {
// rest of the program
}
}

我注意到奇怪的行为。这输出:

loop 0

我期待它也能输出inner task 0.

如果我向通道发送一个值,输出将是:

loop 0
inner task 0
loop 1

这是缺少inner task 1.

为什么inner task会以一个延迟循环生成?

我第一次注意到"从通道任务接收"的这种行为延迟了一个循环,但是当我减少代码以准备示例时,"内部任务"开始发生这种情况。值得一提的是,如果我写第二个tokio::spawn右,只有最后一个会有这个问题。打电话给tokio::spawnselect!时有什么我应该注意的吗?是什么导致了这一个延迟循环?

Cargo.toml 依赖项

[dependencies]
tokio = { version = "0.2", features = ["full"] }
crossbeam = "0.7"

锈 1.46, 视窗 10

select!正在阻塞,tokio::spawn的文档说:

生成的任务可以在当前线程上执行,也可以发送到其他线程执行。

在这种情况下,select!"future"实际上是一个阻塞函数,spawn不使用新线程(无论是在第一次调用中还是在循环内)。 因为你没有告诉tokio你要阻塞,tokio认为不需要另一个线程(从tokio的角度来看,你只有3个期货不应该阻塞,所以你为什么还需要另一个线程呢?

解决方案是将tokio::task::spawn_blocking用于select!-ing闭包(这将不再是未来,因此现在async move {}move || {})。 现在 tokio 将知道这个函数实际上是阻塞的,并将其移动到另一个线程(同时将所有实际的期货保留在其他执行线程中)。

use crossbeam::crossbeam_channel::{bounded, select};
#[tokio::main]
async fn main() {
let (s, r) = bounded::<usize>(1);
tokio::task::spawn_blocking(move || {
// ...
});
loop {
// rest of the program
}
}

链接到游乐场

另一种可能的解决方案是使用像tokio::sync::mpsc这样的非阻塞通道,您可以在其上使用await并获得预期的行为,例如这个带有直接recv().awaittokio::select!的游乐场示例,如下所示:

use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (mut s, mut r) = mpsc::channel::<usize>(1);
tokio::spawn(async move {
loop {
// ...
tokio::select! {
Some(i) = r.recv() => {
println!("got = {}", i);
}
}
}
});
loop {
// rest of the program
}
}

链接到游乐场

最新更新