我正在尝试构建一个执行异步http请求并返回其Response+主体的程序。
这就是返回响应的函数的样子:
let responses = stream::iter(urls)
.map(|line| {
let client = &client;
async move {
client.get(&line).send().await.map(|resp| {
(line, resp)
})}
})
.buffer_unordered(concurrency_amount);
但是,在返回resp
之后,我不能使用resp.text()
,因为resp.text()
的类型是Future<Output=Result<String>>
。
如何使函数也返回元组中的resp.text()
?
假设您使用的是reqwest
,应该可以使用Response::chunk()
收集响应体的字节,text()
消耗self
,但chunk()
只接受可变引用。
以下内容收集响应主体,并以有损的方式将其解码为字符串。
use futures_util::StreamExt;
#[tokio::main]
async fn main() {
let cli = reqwest::Client::new();
let urls = vec![
"https://stackoverflow.com".to_string(),
"https://google.com".into(),
"https://tokio.rs".into(),
];
let responses = futures_util::stream::iter(urls.into_iter())
.then(|url| { // note that I replaced `map` with `then` here.
let cli = cli.clone();
async move {
let mut resp = cli.get(url.clone()).send().await.unwrap();
let mut body = Vec::new();
while let Some(chunk) = resp.chunk().await.unwrap() {
body.extend_from_slice(&*chunk);
}
(url, resp, String::from_utf8_lossy(&body).to_string())
}
})
.collect::<Vec<_>>()
.await;
for (url, response, text) in responses {
println!("url: {} status: {} text: {}", url, response.status(), text);
}
}
正如内联注释所指出的:我将map()
调用更改为then()
,因此流产生元组,而不是以元组作为输出的期货。
这行得通吗?
let responses = stream::iter(urls)
.map(|line| {
let client = &client;
(async move || {
let resp = client.get(&line).send().await?;
let text = resp.text().await?;
(line, resp, text)
})()
})
.buffer_unordered(concurrency_amount);