将 'impl Stream<Item = Future<Output = IoResult<impl A>>>' 转换为 'impl Stream<I



我有以下设置:

use core::task::Poll;
use tokio::io::ReadBuf;
use core::task::Context;
use core::pin::Pin;
use std::error::Error;
use tokio::io::AsyncRead;
struct Dummy;
impl AsyncRead for Dummy {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<tokio::io::Result<()>> {
Poll::Pending
}
}
fn request_peers() -> impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>> {
futures::stream::iter((0..10).map(move |i| {
futures::future::ok(Dummy{})
}))
}
async fn connect (
peers: impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>>
) -> impl futures::stream::Stream<Item = impl tokio::io::AsyncRead> {
todo!()
}
#[tokio::main]
async fn main() {
let peers = request_peers();
let connected_peers = connect(peers).await;
}

操场上联系

我想通过等待未来连接所有peers,忽略未连接的对等体。理想情况下,我希望将对等体保持在future::stream::Stream中。我想下面的代码可能有用:

use core::task::Poll;
use tokio::io::ReadBuf;
use core::task::Context;
use core::pin::Pin;
use std::error::Error;
use tokio::io::AsyncRead;
struct Dummy;
impl AsyncRead for Dummy {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<tokio::io::Result<()>> {
Poll::Pending
}
}
fn request_peers() -> impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>> {
futures::stream::iter((0..10).map(move |i| {
println!("instantiated");
futures::future::ok(Dummy{})
}))
}
use futures::{StreamExt};
fn connect (
peers: impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>>
) -> impl futures::stream::Stream<Item = impl tokio::io::AsyncRead> {
peers.filter_map(|peer_fut| async move {
if let Ok(peer) = peer_fut.await {
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
println!("connected");
Some(peer)
} else {
None
}
})
}
#[tokio::main]
async fn main() {
let peers = request_peers();
let connected_peers = connect(peers);

connected_peers.for_each_concurrent(None, |peer| async {
println!("processed")
}).await;
}

操场上联系

但是对等体不是并发连接的,所以这将需要10秒来完成-而不是~1秒。

我注意到,如果我返回Vec而不是future::stream::Stream,它将使用以下代码片段并发地连接对等体:

use futures::{StreamExt};
async fn connect (
peers: impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>>
) -> Vec<impl tokio::io::AsyncRead> {
let mut peers = peers.map(|peer_fut| async move {
if let Ok(peer) = peer_fut.await {
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
println!("connected");
Some(peer)
} else {
None
}
})
.buffer_unordered(50)
.collect::<Vec<_>>().await;

peers.into_iter().flatten().collect()
}
#[tokio::main]
async fn main() {
let peers = request_peers();
let connected_peers = connect(peers).await;

futures::stream::iter(connected_peers).for_each_concurrent(None, |peer| async {
println!("processed")
}).await;
}

操场上联系

有没有办法做到这一点,而不转换为Vec,而不是保持futures::stream::Stream?

这听起来像是FuturesUnordered的一个很好的用例

你创建了许多future(即通过在Vec上运行map和collect),然后将它们转换成一个迭代器,该迭代器从哪个future先完成的异步生成结果。如果任何期货返回错误结果,可以跳过或适当处理。

相关内容

最新更新