我有以下设置:
use core::task::Poll;
use tokio::io::ReadBuf;
use core::task::Context;
use core::pin::Pin;
use std::error::Error;
use tokio::io::AsyncRead;
struct Dummy;
impl AsyncRead for Dummy {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<tokio::io::Result<()>> {
Poll::Pending
}
}
fn request_peers() -> impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>> {
futures::stream::iter((0..10).map(move |i| {
futures::future::ok(Dummy{})
}))
}
async fn connect (
peers: impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>>
) -> impl futures::stream::Stream<Item = impl tokio::io::AsyncRead> {
todo!()
}
#[tokio::main]
async fn main() {
let peers = request_peers();
let connected_peers = connect(peers).await;
}
操场上联系
我想通过等待未来连接所有peers
,忽略未连接的对等体。理想情况下,我希望将对等体保持在future::stream::Stream
中。我想下面的代码可能有用:
use core::task::Poll;
use tokio::io::ReadBuf;
use core::task::Context;
use core::pin::Pin;
use std::error::Error;
use tokio::io::AsyncRead;
struct Dummy;
impl AsyncRead for Dummy {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<tokio::io::Result<()>> {
Poll::Pending
}
}
fn request_peers() -> impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>> {
futures::stream::iter((0..10).map(move |i| {
println!("instantiated");
futures::future::ok(Dummy{})
}))
}
use futures::{StreamExt};
fn connect (
peers: impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>>
) -> impl futures::stream::Stream<Item = impl tokio::io::AsyncRead> {
peers.filter_map(|peer_fut| async move {
if let Ok(peer) = peer_fut.await {
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
println!("connected");
Some(peer)
} else {
None
}
})
}
#[tokio::main]
async fn main() {
let peers = request_peers();
let connected_peers = connect(peers);
connected_peers.for_each_concurrent(None, |peer| async {
println!("processed")
}).await;
}
操场上联系
但是对等体不是并发连接的,所以这将需要10秒来完成-而不是~1秒。
我注意到,如果我返回Vec
而不是future::stream::Stream
,它将使用以下代码片段并发地连接对等体:
use futures::{StreamExt};
async fn connect (
peers: impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>>
) -> Vec<impl tokio::io::AsyncRead> {
let mut peers = peers.map(|peer_fut| async move {
if let Ok(peer) = peer_fut.await {
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
println!("connected");
Some(peer)
} else {
None
}
})
.buffer_unordered(50)
.collect::<Vec<_>>().await;
peers.into_iter().flatten().collect()
}
#[tokio::main]
async fn main() {
let peers = request_peers();
let connected_peers = connect(peers).await;
futures::stream::iter(connected_peers).for_each_concurrent(None, |peer| async {
println!("processed")
}).await;
}
操场上联系
有没有办法做到这一点,而不转换为Vec
,而不是保持futures::stream::Stream
?
这听起来像是FuturesUnordered的一个很好的用例
你创建了许多future(即通过在Vec上运行map和collect),然后将它们转换成一个迭代器,该迭代器从哪个future先完成的异步生成结果。如果任何期货返回错误结果,可以跳过或适当处理。