使用hyper/azure sdk返回流的生命周期问题



我一直在尝试返回一个流,就像我用tokio::fs::File所做的那样,但是我在BlobClient上得到一个终生错误。

error[E0597]: `blob` does not live long enough
--> srcmain.rs:20:27
|
20 |     let stream = Box::pin(blob.get().stream(128));
|                           ^^^^^^^^^^
|                           |
|                           borrowed value does not live long enough
|                           argument requires that `blob` is borrowed for `'static`
...
24 | }
| - `blob` dropped here while still borrowed

我已经尝试了一堆不同的方法来处理流,但我无法绕过这个生命周期错误。我相信可能是我一直忽视的简单问题。谢谢你的帮助。

这是我正在尝试做的一个repo:

use std::{convert::Infallible, net::SocketAddr};
use azure_core::new_http_client;
use azure_storage::{
blob::prelude::{AsBlobClient, AsContainerClient},
clients::{AsStorageClient, StorageAccountClient},
};
use futures::TryStreamExt;
use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
};
async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
let http_client = new_http_client();
let storage_account_client = StorageAccountClient::new_access_key(http_client.clone(), "account", "key");
let storage_client = storage_account_client.as_storage_client();
let blob = storage_client.as_container_client("container").as_blob_client("blob");
let stream = Box::pin(blob.get().stream(128));
let s = stream.and_then(|f| futures::future::ok(f.data));
Ok(Response::new(Body::wrap_stream(s)))
}
#[tokio::main]
async fn main() {
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let make_service = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });
let server = Server::bind(&addr).serve(make_service);
if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
}

问题是流借用了blob,但wrap_stream()函数只接受'static流。一种解决方法是在新任务中构造流,并通过通道发送回流项。下面的辅助函数可以帮助实现这种方法:

/// Creates a `'static` stream from a closure returning a (possibly) non-`'static` stream.
///
/// The stream items, closure, and closure argument are still restricted to being `'static`,
/// but the closure can return a non-`'static` stream that borrows from the closure
/// argument.
fn make_static_stream<T, F, U>(
make_stream: F,
mut make_stream_arg: U,
) -> impl Stream<Item = T>
where
T: Send + 'static,
F: FnOnce(&mut U) -> BoxStream<'_, T> + Send + 'static,
U: Send + 'static,
{
let (mut tx, rx) = futures::channel::mpsc::channel(0);
tokio::spawn(async move {
let stream = make_stream(&mut make_stream_arg);
pin_mut!(stream);
while let Some(item) = stream.next().await {
if tx.feed(item).await.is_err() {
// Receiver dropped
break;
}
}
tx.close().await.ok();
});
rx
}

在原始代码中是这样使用的:

// ...
let stream = make_static_stream(
|blob| blob.get().stream(128).map_ok(|x| x.data).boxed(),
blob,
);
Ok(Response::new(Body::wrap_stream(stream)))
}

相关内容

  • 没有找到相关文章

最新更新