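I'm trying to "connect" to multiple peers and "process" them concurrently. For a single process (called a "task") this works fine. With multiple tasks, however, I want to short-circuit a task once one peer is done with it. In the following setup I have 3 tasks, and once one peer completes the second task I want that task aborted for all other peers. This should include any future peers that arrive through create_new_conn_fut.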
use futures::stream::StreamExt;
use rand::Rng;

pub async fn process(peer: &str, duration: core::time::Duration, task_id: &str) {
    // simulate processing by sleeping
    tokio::time::sleep(duration).await;
    println!("task #{} done for {}", task_id, peer);
}

#[tokio::main]
async fn main() {
    // peers known up front, followed by peers that arrive later over the channel
    let peers = vec!["peer A", "peer B", "peer C"];
    let peers = futures::stream::iter(peers);
    let (tx, rx) = tokio::sync::mpsc::channel(100);
    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
    let rx = peers.chain(rx);
    // a limit of 0 means no limit on the number of peers handled at once
    let handle_conn_fut = rx.for_each_concurrent(0, |peer| async move {
        let mut rng = rand::thread_rng();
        println!("connecting to {}", peer);
        process(peer, core::time::Duration::from_secs(1), "1").await;
        process(peer, core::time::Duration::from_secs(rng.gen_range(5..15)), "2").await;
        process(peer, core::time::Duration::from_secs(1), "3").await;
    });
    let create_new_conn_fut = async move {
        for peer in ["peer D", "peer E"] {
            tx.send(peer).await.unwrap();
        }
    };
    // drives both futures concurrently until both complete
    futures::future::join(handle_conn_fut, create_new_conn_fut).await;
}
Output:
connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer C
task #3 done for peer C
task #2 done for peer D
task #2 done for peer A
task #2 done for peer B
task #3 done for peer D
task #3 done for peer A
task #3 done for peer B
task #2 done for peer E
task #3 done for peer E
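What I would like instead is that once one peer completes task #2, all other peers abandon it, and all future peers are redirected to run only task #1 and task #3.
To illustrate, this is what I have now: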
      A                 B        ....        E
      ↓                 ↓                    ↓
async task #1     async task #1        async task #1
      ↓                 ↓                    ↓
async task #2     async task #2        async task #2
      ↓                 ↓                    ↓
async task #3     async task #3        async task #3
      ↓                 ↓                    ↓
     done              done                 done
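As soon as any of the peers (A-E) completes async task #2, I want to short-circuit that task for everyone else. So if, for example, B completes it first, it would look like this: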
      A                 B        ....        E        ....        F (future peer)
      ↓                 ↓                    ↓                    ↓
async task #1     async task #1        async task #1        async task #1
      ↓                 ↓                    ↓                    ↓
      ↓           async task #2              ↓                    ↓
      ↓                 ↓                    ↓                    ↓
async task #3     async task #3        async task #3        async task #3
      ↓                 ↓                    ↓                    ↓
     done              done                 done                 done
So the output I want is:
connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer C <- will abort all other task #2
task #3 done for peer A
task #3 done for peer B
task #3 done for peer C
task #3 done for peer D
task #3 done for peer E
I have looked into futures::future::AbortHandle, but I think it only works for a single future, since futures::stream::AbortRegistration does not implement Clone?
How can I implement something like this?
One idea is to use tokio::select! with branches that check whether the task has completed or has been cancelled:
use futures::future;
use futures::stream::StreamExt;
use rand::Rng;

pub async fn process(peer: &str, duration: core::time::Duration, task_id: &str) {
    // simulate processing by sleeping
    tokio::time::sleep(duration).await;
    println!("task #{} done for {}", task_id, peer);
}

#[tokio::main]
async fn main() {
    let peers = vec!["peer A", "peer B", "peer C"];
    let peers = futures::stream::iter(peers);
    let (tx, rx) = tokio::sync::mpsc::channel(100);
    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
    let rx = peers.chain(rx);
    // wakes the peers that are currently running task #2
    let notify = std::sync::Arc::new(tokio::sync::Notify::new());
    // remembers that task #2 is done, for peers that reach it later
    let done_with_task_2 = std::rc::Rc::new(std::cell::RefCell::new(false));
    let handle_conn_fut = rx.for_each_concurrent(0, |peer| {
        let notify = notify.clone();
        let done_with_task_2 = done_with_task_2.clone();
        async move {
            let mut rng = rand::thread_rng();
            println!("connecting to {}", peer);
            process(peer, core::time::Duration::from_secs(1), "1").await;
            // task #2
            tokio::select! {
                // task #2 was already completed by another peer: skip it right away
                _ = async {
                    if *done_with_task_2.borrow() {
                        future::ready(()).await
                    } else {
                        future::pending().await
                    }
                } => {}
                // this peer finished task #2 first: cancel it for everyone else
                _ = process(peer, core::time::Duration::from_secs(rng.gen_range(5..10)), "2") => {
                    notify.notify_waiters();
                    done_with_task_2.replace(true);
                }
                // another peer finished task #2 while this one was still running it
                _ = notify.notified() => {}
            }
            process(peer, core::time::Duration::from_secs(1), "3").await;
            process(peer, core::time::Duration::from_secs(20), "4").await;
        }
    });
    let create_new_conn_fut = async move {
        for peer in ["peer D", "peer E"] {
            tx.send(peer).await.unwrap();
        }
        // a new peer after 15 seconds
        tokio::time::sleep(core::time::Duration::from_secs(15)).await;
        tx.send("peer F").await.unwrap();
    };
    // drives both futures concurrently until both complete
    futures::future::join(handle_conn_fut, create_new_conn_fut).await;
}
Output:
connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer B
task #3 done for peer B
task #3 done for peer E
task #3 done for peer D
task #3 done for peer C
task #3 done for peer A
connecting to peer F
task #1 done for peer F
task #3 done for peer F
task #4 done for peer B
task #4 done for peer E
task #4 done for peer D
task #4 done for peer C
task #4 done for peer A
task #4 done for peer F
However, the code gets somewhat verbose. Is there a better way to write this? Perhaps with futures::future::Either, or with shared.