如何从async函数返回Response.text()和Response



我正在尝试构建一个执行异步http请求并返回其Response+主体的程序。

这就是返回响应的函数的样子:

let responses = stream::iter(urls)
.map(|line| {
let client = &client;
async move {
client.get(&line).send().await.map(|resp| {   
(line, resp)
})}
})
.buffer_unordered(concurrency_amount);

但是,在返回resp之后,我不能使用resp.text(),因为resp.text()的类型是Future<Output=Result<String>>

如何使函数也返回元组中的resp.text()

假设您使用的是reqwest,应该可以使用Response::chunk()收集响应体的字节,text()消耗self,但chunk()只接受可变引用。

以下内容收集响应主体,并以有损的方式将其解码为字符串。

use futures_util::StreamExt;
#[tokio::main]
async fn main() {
let cli = reqwest::Client::new();
let urls = vec![
"https://stackoverflow.com".to_string(),
"https://google.com".into(),
"https://tokio.rs".into(),
];
let responses = futures_util::stream::iter(urls.into_iter())
.then(|url| { // note that I replaced `map` with `then` here.
let cli = cli.clone();
async move {
let mut resp = cli.get(url.clone()).send().await.unwrap();
let mut body = Vec::new();
while let Some(chunk) = resp.chunk().await.unwrap() {
body.extend_from_slice(&*chunk);
}
(url, resp, String::from_utf8_lossy(&body).to_string())
}
})
.collect::<Vec<_>>()
.await;
for (url, response, text) in responses {
println!("url: {} status: {} text: {}", url, response.status(), text);
}
}

正如内联注释所指出的:我将map()调用更改为then(),因此流产生元组,而不是以元组作为输出的期货。

这行得通吗?

let responses = stream::iter(urls)
.map(|line| {
let client = &client;
(async move || {
let resp = client.get(&line).send().await?;
let text = resp.text().await?;
(line, resp, text)
})()
})
.buffer_unordered(concurrency_amount);

最新更新