当同时处理future::stream::stream中的元素时,中止中间futures



我正在尝试"连接";到多个对等端;过程";它们同时发生。对于单个进程(名为"任务"(来说,这很好。但是,对于多个任务,我希望在对等方处理完该进程后使用该进程。在以下设置中,我有3个任务,其中我希望在一个对等方完成第二个任务后,为所有其他对等方中止该任务。这应该包括create_new_conn_fut未来的任何未来对等方。

use futures::stream::StreamExt;
use rand::Rng;
pub async fn process(peer: &str, duration: core::time::Duration, task_id: &str) {
// simulate processing by sleeping
tokio::time::sleep(duration).await;
println!("task #{} done for {}", task_id, peer);
}
#[tokio::main]
async fn main() {
let peers = vec!["peer A", "peer B", "peer C"];
let peers = futures::stream::iter(peers);
let (tx, rx) = tokio::sync::mpsc::channel(100);
let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
let rx = peers.chain(rx);

let handle_conn_fut = rx.for_each_concurrent(0,
|peer| async move {
let mut rng = rand::thread_rng();
println!("connecting to {}", peer);
process(peer, core::time::Duration::from_secs(1), "1").await;
process(peer, core::time::Duration::from_secs(rng.gen_range(5..15)), "2").await;
process(peer, core::time::Duration::from_secs(1), "3").await;
}
);

let create_new_conn_fut = async move {
for peer in ["peer D", "peer E"] {
tx.send(peer).await.unwrap();
}
};
// awaits all futures in parallell
futures::future::join(handle_conn_fut, create_new_conn_fut).await;
}

输出:

connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer C
task #3 done for peer C
task #2 done for peer D
task #2 done for peer A
task #2 done for peer B
task #3 done for peer D
task #3 done for peer A
task #3 done for peer B
task #2 done for peer E
task #3 done for peer E

我宁愿在对等方完成任务#2后,所有对等方都放弃它,并将所有未来的对等方重定向到只执行任务#1和任务#3。

为了说明这一点,我有以下

A                B           ....          E
↓                ↓                         ↓
async task #1    async task #1             async task #1
↓                ↓                         ↓
async task #2    async task #2             async task #2
↓                ↓                         ↓
async task #3    async task #3             async task #3
↓                ↓                         ↓
done             done                      done

一旦任何对等方(A-E(完成了async task #1,我想将其短路。因此,假设ex.B首先完成了它,它看起来会是这样的:

A                B           ....          E          ....           F (future peer)
↓                ↓                         ↓                         ↓
async task #1    async task #1             async task #1             async task #1 
↓                ↓                         ↓                         ↓
↓           async task #2                  ↓                         ↓
↓                ↓                         ↓                         ↓
async task #3    async task #3             async task #3             async task #3
↓                ↓                         ↓                         ↓
done             done                      done                      done

所以我想要的输出是:

connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer C    <- will abort all other task #2
task #3 done for peer A
task #3 done for peer B
task #3 done for peer C
task #3 done for peer D
task #3 done for peer E

我已经研究过futures::future::AbortHandle,但我认为这只是针对单个未来——既然futures::stream::AbortRegistration没有克隆特性?

如何实现这样的东西?

一个想法是使用带有分支的tokio::select来检查任务是否已完成或已取消:

use futures::stream::StreamExt;
use rand::Rng;
use futures::future;
pub async fn process(peer: &str, duration: core::time::Duration, task_id: &str) {
// simulate processing by sleeping
tokio::time::sleep(duration).await;
println!("task #{} done for {}", task_id, peer);
}
#[tokio::main]
async fn main() {
let peers = vec!["peer A", "peer B", "peer C"];
let peers = futures::stream::iter(peers);
let (tx, rx) = tokio::sync::mpsc::channel(100);
let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
let rx = peers.chain(rx);
let notify = std::sync::Arc::new(tokio::sync::Notify::new());
let done_with_task_2 = std::rc::Rc::new(std::cell::RefCell::new(false));

let handle_conn_fut = rx.for_each_concurrent(0, |peer| {
let notify = notify.clone();
let done_with_task_2 = done_with_task_2.clone();
async move {
let mut rng = rand::thread_rng();
println!("connecting to {}", peer);
process(peer, core::time::Duration::from_secs(1), "1").await;
// task #2
tokio::select! {
done = async { 
if *done_with_task_2.borrow() {
future::ready(()).await
} else {
future::pending().await
}
} => {}
process = process(peer, core::time::Duration::from_secs(rng.gen_range(5..10)), "2") => {
notify.notify_waiters();
done_with_task_2.replace(true);
}
cancel = notify.notified() => {}
}
process(peer, core::time::Duration::from_secs(1), "3").await;
process(peer, core::time::Duration::from_secs(20), "4").await;
}
}
);

let create_new_conn_fut = async move {
for peer in ["peer D", "peer E"] {
tx.send(peer).await.unwrap();
}
// a new peer after 15 seconds
tokio::time::sleep(core::time::Duration::from_secs(15)).await;
tx.send("peer F").await.unwrap()
};
// awaits all futures in parallell
futures::future::join(handle_conn_fut, create_new_conn_fut).await;
}

输出:

connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer B
task #3 done for peer B
task #3 done for peer E
task #3 done for peer D
task #3 done for peer C
task #3 done for peer A
connecting to peer F
task #1 done for peer F
task #3 done for peer F
task #4 done for peer B
task #4 done for peer E
task #4 done for peer D
task #4 done for peer C
task #4 done for peer A
task #4 done for peer F

但是,代码变得有些冗长。有更好的写法吗?

可能使用futures::future::Either或使用shared

最新更新