如何在不阻止父任务的情况下,在另一个任务中生成长时间运行的Tokio任务



我正在尝试构建一个对象,该对象可以管理来自websocket的提要,但可以在多个提要之间切换。

有一个Feed特征:

trait Feed {
async fn start(&mut self);
async fn stop(&mut self);
}

实现Feed的结构有三种:ABC

当调用start时,它会启动一个无限循环,侦听来自websocket的消息,并在消息传入时处理每个消息

我想实现一个FeedManager,它维护一个活动提要,但可以接收切换使用的提要源的命令。

enum FeedCommand {
Start(String),
Stop,
}
struct FeedManager {
active_feed_handle: tokio::task::JoinHandle,
controller: mpsc::Receiver<FeedCommand>,
}
impl FeedManager {
async fn start(&self) {
while let Some(command) = self.controller.recv().await {
match command {
FeedCommand::Start(feed_type) => {
// somehow tell the active feed to stop (need channel probably) or kill the task?
if feed_type == "A" {
// replace active feed task with a new tokio task for consuming feed A
} else if feed_type == "B" {
// replace active feed task with a new tokio task for consuming feed B
} else {
// replace active feed task with a new tokio task for consuming feed C
}
}
}
}
}
}

我很难理解如何正确管理Tokio的所有任务。FeedManager的核心循环是永远侦听传入的新命令,但它需要能够在不阻塞的情况下生成另一个长期任务(这样它就可以侦听命令(。

我的第一次尝试是:

if feed_type == "A" {
self.active_feed_handle = tokio::spawn(async {
A::new().start().await;
});
self.active_feed_handle.await
}
  • 句柄上的.await会导致核心循环不再接受命令,对吧
  • 我可以省略最后一个.await,让任务仍然运行吗
  • 我需要以某种方式清理当前活动的任务吗

您可以通过生成一个任务来生成一个长时间运行的Tokio任务,而不会阻塞父任务——这是任务存在的主要原因。如果你不.await任务,那么你就不会等待任务:

use std::time::Duration;
use tokio::{task, time}; // 1.3.0
#[tokio::main]
async fn main() {
task::spawn(async {
time::sleep(Duration::from_secs(100)).await;
eprintln!(
"You'll likely never see this printed 
out because the parent task has exited 
and so has the entire program"
);
});
}

另请参阅:

  • 异步任务中止后会发生什么

实现这一点的一种方法是使用Tokio的join!()宏,该宏接受多个期货并等待所有期货。您可以创建多个期货,并将它们join!()放在一起,共同等待它们。

最新更新