我需要在下载过程中解压和解压缩大的.tar.gz文件(例如~5Gb(,而不在磁盘上保存存档文件。我使用reqwest板条箱下载文件,使用flate2板条箱解压,使用tar板条箱开箱。我尝试使用tar.gz格式。但也有zip和tar.bz2格式可供选择。(哪一个更容易使用?(我似乎设法实现了这一点,但出乎意料的是,开箱时出现了一个错误:
thread 'main' panicked at 'Cannot unpack archive: Custom { kind: UnexpectedEof, error: TarError { desc: "failed to unpack `/home/ruut/Projects/GreatWar/launcher/gamedata/gamedata-master/.vscode/settings.json`", io: Custom { kind: UnexpectedEof, error: TarError { desc: "failed to unpack `gamedata-master/.vscode/settings.json` into `/home/ruut/Projects/GreatWar/launcher/gamedata/gamedata-master/.vscode/settings.json`", io: Kind(UnexpectedEof) } } } }', /home/ruut/Projects/GreatWar/launcher/src/gitlab.rs:87:38
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
我的代码:
let full_url = format!("{}/{}/{}", HOST, repo_info.url, repo_info.download_url);
let mut response;
match self.client.get(&full_url).send().await {
Ok(res) => response = res,
Err(error) => {
return Err(Error::new(ErrorKind::InvalidData, error));
}
};
if response.status() == reqwest::StatusCode::OK {
let mut stream = response.bytes_stream();
while let Some(item) = stream.next().await {
let chunk = item
.or(Err(format!("Error while downloading file")))
.unwrap();
let b: &[u8] = &chunk.to_vec();
let gz = GzDecoder::new(b);
let mut archive = Archive::new(gz);
archive.unpack("./gamedata").expect("Cannot unpack archive");
}
}
第一次获取区块后出现archive.unpack
抛出错误
我做错了什么?
kmdreko的注释解释了代码失败的原因-.next()
只返回第一个块,并且必须将所有块馈送到gzip读取器。另一个答案显示了如何使用阻塞reqwest
API来实现它。
如果您想继续使用非阻塞的API,那么您可以在一个单独的线程中启动解码器,并通过通道为其提供数据。例如,您可以使用同时支持同步和异步接口的水槽通道。您还需要将通道转换为Read
,正如GzDecoder
所期望的那样。例如(编译,但未测试(:
use std::io::{self, Read};
use flate2::read::GzDecoder;
use futures_lite::StreamExt;
use tar::Archive;
async fn download() -> io::Result<()> {
let client = reqwest::Client::new();
let full_url = "...";
let response;
match client.get(full_url).send().await {
Ok(res) => response = res,
Err(error) => {
return Err(io::Error::new(io::ErrorKind::InvalidData, error));
}
};
let (tx, rx) = flume::bounded(0);
let decoder_thread = std::thread::spawn(move || {
let input = ChannelRead::new(rx);
let gz = GzDecoder::new(input);
let mut archive = Archive::new(gz);
archive.unpack("./gamedata").unwrap();
});
if response.status() == reqwest::StatusCode::OK {
let mut stream = response.bytes_stream();
while let Some(item) = stream.next().await {
let chunk = item
.or(Err(format!("Error while downloading file")))
.unwrap();
tx.send_async(chunk.to_vec()).await.unwrap();
}
drop(tx); // close the channel to signal EOF
}
tokio::task::spawn_blocking(|| decoder_thread.join())
.await
.unwrap()
.unwrap();
Ok(())
}
// Wrap a channel into something that impls `io::Read`
struct ChannelRead {
rx: flume::Receiver<Vec<u8>>,
current: io::Cursor<Vec<u8>>,
}
impl ChannelRead {
fn new(rx: flume::Receiver<Vec<u8>>) -> ChannelRead {
ChannelRead {
rx,
current: io::Cursor::new(vec![]),
}
}
}
impl Read for ChannelRead {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.current.position() == self.current.get_ref().len() as u64 {
// We've exhausted the previous chunk, get a new one.
if let Ok(vec) = self.rx.recv() {
self.current = io::Cursor::new(vec);
}
// If recv() "fails", it means the sender closed its part of
// the channel, which means EOF. Propagate EOF by allowing
// a read from the exhausted cursor.
}
self.current.read(buf)
}
}