我正在尝试构建一个对象,该对象可以管理来自websocket的提要,但可以在多个提要之间切换。
有一个Feed
特征:
trait Feed {
async fn start(&mut self);
async fn stop(&mut self);
}
实现Feed
的结构有三种:A
、B
和C
。
当调用start
时,它会启动一个无限循环,侦听来自websocket的消息,并在消息传入时处理每个消息
我想实现一个FeedManager
,它维护一个活动提要,但可以接收切换使用的提要源的命令。
enum FeedCommand {
Start(String),
Stop,
}
struct FeedManager {
active_feed_handle: tokio::task::JoinHandle,
controller: mpsc::Receiver<FeedCommand>,
}
impl FeedManager {
async fn start(&self) {
while let Some(command) = self.controller.recv().await {
match command {
FeedCommand::Start(feed_type) => {
// somehow tell the active feed to stop (need channel probably) or kill the task?
if feed_type == "A" {
// replace active feed task with a new tokio task for consuming feed A
} else if feed_type == "B" {
// replace active feed task with a new tokio task for consuming feed B
} else {
// replace active feed task with a new tokio task for consuming feed C
}
}
}
}
}
}
我很难理解如何正确管理Tokio的所有任务。FeedManager
的核心循环是永远侦听传入的新命令,但它需要能够在不阻塞的情况下生成另一个长期任务(这样它就可以侦听命令(。
我的第一次尝试是:
if feed_type == "A" {
self.active_feed_handle = tokio::spawn(async {
A::new().start().await;
});
self.active_feed_handle.await
}
- 句柄上的
.await
会导致核心循环不再接受命令,对吧 - 我可以省略最后一个
.await
,让任务仍然运行吗 - 我需要以某种方式清理当前活动的任务吗
您可以通过生成一个任务来生成一个长时间运行的Tokio任务,而不会阻塞父任务——这是任务存在的主要原因。如果你不.await
任务,那么你就不会等待任务:
use std::time::Duration;
use tokio::{task, time}; // 1.3.0
#[tokio::main]
async fn main() {
task::spawn(async {
time::sleep(Duration::from_secs(100)).await;
eprintln!(
"You'll likely never see this printed
out because the parent task has exited
and so has the entire program"
);
});
}
另请参阅:
- 异步任务中止后会发生什么
实现这一点的一种方法是使用Tokio的join!()
宏,该宏接受多个期货并等待所有期货。您可以创建多个期货,并将它们join!()
放在一起,共同等待它们。