Async:如何在带有select(no_std环境)的循环中保持使用相同的future


我有两个异步函数:get_message和get_event。我想在消息到达或事件发生时执行一个操作,并在无限循环中永远执行。

简化的设置如下所示:

use futures::{future::select, future::Either, pin_mut};
impl MsgReceiver {
async fn get_message(&mut self) -> Message { /* ... */ }
}
impl EventListener {
async fn get_event(&mut self) -> Event { /* ... */ }
}
async fn eternal_task(receiver: MsgReceiver, listener: EventListener) -> ! {
let get_msg_fut = receiver.get_message();
pin_mut!(get_msg_fut);
loop {
let get_event_fut = listener.get_event();
pin_mut!(get_event_fut);
match select(get_event_fut, get_msg_fut).await {
Either::Left((ev, r_get_msg_fut)) => {
/* react to the event */
// r_get_msg_fut is not done, how to reuse it in the next iteration?
}
Either::Right((msg, r_get_event_fut)) => {
/* react to the message */
// it's fine to drop get_event_fut here
// the following line causes a double-mut-borrow error on receiver,
// despite receiver isn't borrowed anymore (the old future is completed and dropped)
let new_future = receiver.get_message();
}
};
}
}

我有三个主要问题:

  1. 当一个事件首先出现时,如何告诉rust我想在下一次循环迭代中重用不完整的get_message future
  2. 当信息首先出现时,如何在没有借用错误的情况下构建新的未来
  3. 当(2)被求解时,如何将新的未来放入相同的固定内存位置,并在下一次循环迭代中使用它

我认为这很难做到,即使使用可能需要的unsafe也很难做到。保持和重用相同的变量并不太难,事实上,第二个变量是最难的(至少在当前的借用检查器中是这样)。

我找到了一个解决方案,通过使用异步流机箱提供中介来完全绕过这个问题:

async fn eternal_task(mut receiver: MsgReceiver, mut listener: EventListener) -> ! {
let combined = futures::stream::select(
stream! { loop { yield Either::Left(receiver.get_message().await); } },
stream! { loop { yield Either::Right(listener.get_event().await); } },
);
pin_mut!(combined);
while let Some(msg_or_evt) = combined.next().await {
match msg_or_evt {
Either::Left(msg) => {
// do something with msg
}
Either::Right(evt) => {
// do something with evt
}
};
}
unreachable!()
}

它使用stream!宏生成一个连续调用.get_message().get_event()并生成值的类型。然后使用futures::stream::selectEither来组合它们。然后这只是一个循环结果的问题。它在#![no_std]中工作。

我成功地使用了它,但无法摆脱Box::pin

use futures::{future::select, future::Either, pin_mut};
use std::sync::Mutex;
#[derive(Debug)]
struct MsgReceiver;
#[derive(Debug)]
struct EventListener;
#[derive(Debug)]
struct Message;
#[derive(Debug)]
struct Event;
impl MsgReceiver {
async fn get_message(&mut self) -> Message {
Message
}
}
impl EventListener {
async fn get_event(&mut self) -> Event { 
Event
}
}
async fn eternal_task(receiver: MsgReceiver, mut listener: EventListener) -> ! {
let receiver = Mutex::new(receiver);
let mut f = None;
loop {
let get_msg_fut = match f.take() {
None => {
let mut l = receiver.lock();
Box::pin(async move {
l.get_message().await
})
}
Some(f) => f,
};
let get_event_fut = listener.get_event();
pin_mut!(get_event_fut);
match select(get_event_fut, get_msg_fut).await {
Either::Left((ev, r_get_msg_fut)) => {
/* react to the event */
// store the future for next iteration
f = Some(r_get_msg_fut);
}
Either::Right((msg, r_get_event_fut)) => {
/* react to the message */
}
};
}
}
#[tokio::main]
async fn main() {
eternal_task(MsgReceiver, EventListener).await;
}

Tokio有关于如何在select的循环中重用相同未来的文档:https://tokio.rs/tokio/tutorial/select#resuming-异步操作

TL;DR:

async fn action() {
// Some asynchronous logic
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);    

let operation = action();
tokio::pin!(operation);

loop {
tokio::select! {
_ = &mut operation => break,
Some(v) = rx.recv() => {
if v % 2 == 0 {
break;
}
}
}
}
}

最新更新