如何在 WinAPI 中使用 zlib 解压缩串联(concatenated)的 .gz 文件



我正在从 AWS 下载 Common Crawl 文件。显然,它们是大型的串联(多成员).gz 文件,这是 gzip 标准所支持的。我使用 zlib 来解压缩,但只能得到第一个成员结束之前的解压内容。我试着添加了 inflateReset(),但随后出现了错误 -5,这表明缓冲区或文件有问题。我觉得我已经很接近答案了。

这是没有调用 inflateReset() 的代码。它在非串联文件上运行良好。

#include <vector>

#include "zlib.h"
#define CHUNK 16384   
...
file = L"CC-MAIN-20181209185547-20181209211547-00040.warc.wet.gz";
fileDecompress(&file);
// Decompresses a gzip file from C:\AI\corpora\ into "<name>.wet" in the same
// directory.
//
// lpParameter: pointer to a wstring holding the input file name (no path).
// Returns:     always 0; progress and failures are reported through
//              sendToReportWindow().
//
// Limitation: the loop stops at the first Z_STREAM_END, so only the first gzip
// member of a concatenated (multi-member) file is written. Decoding further
// members requires calling inflateReset() and continuing with the remaining
// input.
DWORD WINAPI fileDecompress(LPVOID lpParameter)
{
    const wstring dir = L"C:\\AI\\corpora\\";
    const wstring* lpFileName = static_cast<wstring*>(lpParameter);
    sendToReportWindow(L"File to decompress is \"%s\" in \"%s\"\n", lpFileName->c_str(), dir.c_str());
    const wstring sourcePath = dir + lpFileName->c_str();
    sendToReportWindow(L"input file with path:%s\n", sourcePath.c_str());
    const wstring destPath = dir + lpFileName->c_str() + L".wet";
    sendToReportWindow(L"output file with path:%s\n", destPath.c_str());

    HANDLE InputFile = INVALID_HANDLE_VALUE;
    HANDLE OutputFile = INVALID_HANDLE_VALUE;
    z_stream strm = {};                 // zalloc/zfree/opaque = Z_NULL, no input yet
    bool inflateReady = false;          // true once inflateInit2() has succeeded

    // Single exit path: releases whatever has been acquired so far, so no
    // early return can leak the zlib state or a file handle.
    auto finish = [&]() -> DWORD {
        if (inflateReady)
            (void)inflateEnd(&strm);
        if (OutputFile != INVALID_HANDLE_VALUE)
            CloseHandle(OutputFile);
        if (InputFile != INVALID_HANDLE_VALUE)
            CloseHandle(InputFile);
        return 0;
    };

    // Open the compressed input; it must already exist.
    InputFile = CreateFile(
        sourcePath.c_str(),       //  Input file name, compressed file
        GENERIC_READ,             //  Open for reading
        FILE_SHARE_READ,          //  Allow other readers
        NULL,                     //  Default security
        OPEN_EXISTING,            //  Existing file only
        FILE_ATTRIBUTE_NORMAL,    //  Normal file
        NULL);                    //  No template
    if (InputFile == INVALID_HANDLE_VALUE)
    {
        sendToReportWindow(L"Cannot open input \t%s\n", sourcePath.c_str());
        return finish();
    }

    // Create (or truncate) the decompressed output.
    OutputFile = CreateFile(
        destPath.c_str(),         //  Output file name, decompressed file
        GENERIC_WRITE,            //  Open for writing
        0,                        //  No sharing
        NULL,                     //  Default security
        CREATE_ALWAYS,            //  Create new or overwrite existing
        FILE_ATTRIBUTE_NORMAL,    //  Normal file
        NULL);                    //  No template
    if (OutputFile == INVALID_HANDLE_VALUE)
    {
        sendToReportWindow(L"Cannot open output \t%s\n", destPath.c_str());
        return finish();
    }

    // Get compressed file size; it is only reported, but must fit in a DWORD.
    LARGE_INTEGER FileSize;
    const BOOL Success = GetFileSizeEx(InputFile, &FileSize);
    if ((!Success) || (FileSize.QuadPart > 0xFFFFFFFF))
    {
        sendToReportWindow(L"Cannot get input file size or file is larger than 4GB.\n");
        return finish();
    }
    const DWORD InputFileSize = FileSize.LowPart;
    sendToReportWindow(L"input file size: %u bytes\n", InputFileSize);

    unsigned char in[CHUNK];
    unsigned char out[CHUNK];

    // 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
    int ret = inflateInit2(&strm, 16 + MAX_WBITS);
    if (ret != Z_OK)
    {
        return finish();
    }
    inflateReady = true;

    do {    /* decompress until the gzip member ends or end of file */
        DWORD read = 0;
        if (!ReadFile(InputFile, in, CHUNK, &read, NULL)) {
            sendToReportWindow(L"read error on input file\n");
            return finish();
        }
        if (read == 0)
        {
            break;      // end of input file
        }
        strm.avail_in = read;
        strm.next_in = in;

        /* run inflate() on this input until the output buffer is not full */
        do {
            strm.avail_out = CHUNK;
            strm.next_out = out;
            ret = inflate(&strm, Z_NO_FLUSH);
            assert(ret != Z_STREAM_ERROR);  /* state not clobbered */
            switch (ret) {
            case Z_NEED_DICT:                                           // 2
                sendToReportWindow(L"z_need_dict:%d\n", ret);
                return finish();
            case Z_DATA_ERROR:                                          // -3
                sendToReportWindow(L"z_data_error:%d\n", ret);
                return finish();
            case Z_MEM_ERROR:                                           // -4
                sendToReportWindow(L"z_mem_error:%d\n", ret);
                sendToReportWindow(L"ret:%d\n", ret);
                DisplayErrorBox((LPWSTR)L"inflate");
                return finish();
            case Z_BUF_ERROR:                                           // -5
                // Not fatal: inflate() could make no progress because the
                // previous call filled the output buffer exactly while also
                // consuming all input. Nothing was produced; the outer loop
                // supplies more input.
                break;
            }

            const DWORD have = CHUNK - strm.avail_out;
            DWORD written = 0;
            const BOOL res = WriteFile(OutputFile, out, have, &written, NULL);
            if (written != have || !res) {
                sendToReportWindow(L"file write error:%d\n", res);
                return finish();
            }
        } while (strm.avail_out == 0);  // a full output buffer means more may be pending
    } while (ret != Z_STREAM_END);      // done when inflate() reports the end of the member

    return finish();
}

这是添加了 inflateReset() 的版本。此版本导致 inflate() 返回错误 -5(缓冲区损坏或文件被截断)。

...
int ret;
z_stream strm{};
array<uint8_t, CHUNK> scratch = {}; //scratch buffer for decompressing the data.
strm.zalloc = Z_NULL;              // allocate inflate state
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
strm.avail_in = 0;
strm.next_in = Z_NULL;
ret = inflateInit2(&strm, 16 + MAX_WBITS);
if (ret != Z_OK)
{
return 0;
}
do {                                                                    /* decompress until deflate stream ends or end of file */ 
DWORD read;
BOOL res = ReadFile(InputFile, in, CHUNK, &read, NULL);
strm.avail_in = read;
if (!res) {
(void)inflateEnd(&strm);
sendToReportWindow(L"read error on input filen");
return 0;
}
if (strm.avail_in == 0)
{
sendToReportWindow(L"strm.avail_in:%dn", strm.avail_in);       // strm.avail_in = 0
break;
}
strm.next_in = in;
/* run inflate() on input until output buffer not full */
do {
strm.avail_out = scratch.size();
strm.next_out = scratch.data();
ret = inflate(&strm, Z_NO_FLUSH);
//if (ret != Z_OK) break;                                     // 0

switch (ret) {
case Z_NEED_DICT:                                           // 2
sendToReportWindow(L"z_need_dict:%dn", ret);
(void)inflateEnd(&strm);
return 0;
//ret = Z_DATA_ERROR;     /* and fall through */
case Z_STREAM_ERROR:                                        // -2
sendToReportWindow(L"Z_STREAM_ERROR:%dn", ret);
(void)inflateEnd(&strm);
return 0;
case Z_DATA_ERROR:                                          // -3
sendToReportWindow(L"z_data_error:%dn", ret);
(void)inflateEnd(&strm);
return 0;
case Z_MEM_ERROR:                                           // -4
(void)inflateEnd(&strm);
sendToReportWindow(L"z_mem_error:%dn", ret);
sendToReportWindow(L"ret:%dn", ret);
DisplayErrorBox((LPWSTR)L"inflate");
return 0;
case Z_BUF_ERROR:                                           // -5
sendToReportWindow(L"z_buf_error:%dn", ret);
(void)inflateEnd(&strm);
//return 0;
break;
}
auto bytes_decoded = scratch.size() - strm.avail_out;

DWORD written;
BOOL res = WriteFile(OutputFile, &scratch, bytes_decoded, &written, NULL);
if (ret == Z_STREAM_END) break;
} while (true);          //  avail_out == 0 means output buffer is full
ret == Z_STREAM_END;
auto reset_result = inflateReset(&strm);        // work with concatenation
sendToReportWindow(L"resetting inflate: %dn", reset_result);
assert(reset_result == Z_OK);      
} while (strm.avail_in > 0);
...

谢谢!

更新:我认为 ReadFile 应该每次读取 CHUNK 个字节,而不是 1 个字节。两个示例都已相应修改。现在得到的是错误 -3:"Z_DATA_ERROR"。我正在检查此更改之后是否真的会多次调用 ReadFile。

我想解压缩的典型文件:[https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2018-51/segments/1544376823009.19/wet/CC-MAIN-20181209185547-20181209211547-00041.warc.wet.gz]

更新2:谢谢你,Mark Adler!借助您提供的示例,我修复了代码中的逻辑。下面的版本适用于 WinAPI。我还添加了文件扩展名处理,把缓冲区移到了堆上,并添加了计时器。计时器显示,使用更大的缓冲区可以将解压缩时间减少约 30%。

// Decompresses a gzip (.gz) file from C:\AI\corpora\ into a file of the same
// name with its last extension removed. Single-member and multi-member
// (concatenated) gzip files are both handled: after each member ends, the
// inflate state is reset and decoding continues with the remaining input.
// zlib does not handle .zip archives.
//
// lpParameter: pointer to a wstring holding the input file name (no path).
//              The string is not modified; the extension is stripped on a copy.
// Returns:     always 0; progress and failures are reported through
//              sendToReportWindow().
DWORD WINAPI fileDecompress(LPVOID lpParameter)
{
    sendToReportWindow(L"inside fileDecompress()\n");
    const wstring dir = L"C:\\AI\\corpora\\";
    const wstring* lpFileName = static_cast<wstring*>(lpParameter);
    sendToReportWindow(L"File to decompress is \"%s\" in \"%s\"\n", lpFileName->c_str(), dir.c_str());
    const wstring sourcePath = dir + lpFileName->c_str();
    sendToReportWindow(L"input file with path:%s\n", sourcePath.c_str());

    // Output name = input name without its last extension (".gz").
    wstring outputName = *lpFileName;
    const wstring::size_type lastdot = outputName.find_last_of(L'.');
    if (lastdot == wstring::npos)
    {
        // Without an extension the output path would equal the input path.
        sendToReportWindow(L"Input file name has no extension to remove:%s\n", lpFileName->c_str());
        return 0;
    }
    outputName.resize(lastdot);
    const wstring destPath = dir + outputName;
    sendToReportWindow(L"output file with path:%s\n", destPath.c_str());

    HANDLE InputFile = INVALID_HANDLE_VALUE;
    HANDLE OutputFile = INVALID_HANDLE_VALUE;
    z_stream strm = {};                 // zalloc/zfree/opaque = Z_NULL, no input yet
    bool inflateReady = false;          // true once inflateInit2() has succeeded

    // Single exit path: releases whatever has been acquired so far, so no
    // early return can leak the zlib state or a file handle.
    auto finish = [&]() -> DWORD {
        if (inflateReady)
            (void)inflateEnd(&strm);
        if (OutputFile != INVALID_HANDLE_VALUE)
            CloseHandle(OutputFile);
        if (InputFile != INVALID_HANDLE_VALUE)
            CloseHandle(InputFile);
        return 0;
    };

    InputFile = CreateFile(
        sourcePath.c_str(),       //  Input file name, compressed file
        GENERIC_READ,             //  Open for reading
        FILE_SHARE_READ,          //  Allow other readers
        NULL,                     //  Default security
        OPEN_EXISTING,            //  Existing file only
        FILE_ATTRIBUTE_NORMAL,    //  Normal file
        NULL);                    //  No template
    if (InputFile == INVALID_HANDLE_VALUE)
    {
        sendToReportWindow(L"Cannot open input \t%s\n", sourcePath.c_str());
        return finish();
    }

    OutputFile = CreateFile(
        destPath.c_str(),         //  Output file name, decompressed file
        GENERIC_WRITE,            //  Open for writing
        0,                        //  No sharing
        NULL,                     //  Default security
        CREATE_ALWAYS,            //  Create new or overwrite existing
        FILE_ATTRIBUTE_NORMAL,    //  Normal file
        NULL);                    //  No template
    if (OutputFile == INVALID_HANDLE_VALUE)
    {
        sendToReportWindow(L"Cannot open output \t%s\n", destPath.c_str());
        return finish();
    }

    // Get compressed file size; it is only reported, but must fit in a DWORD.
    LARGE_INTEGER FileSize;
    const BOOL Success = GetFileSizeEx(InputFile, &FileSize);
    if ((!Success) || (FileSize.QuadPart > 0xFFFFFFFF))
    {
        sendToReportWindow(L"Cannot get input file size or file is larger than 4GB.\n");
        return finish();
    }
    const DWORD InputFileSize = FileSize.LowPart;
    sendToReportWindow(L"input file size: %u bytes\n", InputFileSize);

    const ULONGLONG StartTime = GetTickCount64();

    // I/O buffer size. Heap-allocated: two 512 KiB arrays would not fit
    // comfortably on the stack.
    const DWORD kBufferSize = 524288;
    std::vector<unsigned char> in(kBufferSize);
    std::vector<unsigned char> out(kBufferSize);

    // 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
    int ret = inflateInit2(&strm, 16 + MAX_WBITS);
    if (ret != Z_OK)
    {
        sendToReportWindow(L"inflateInit2 failed:%d\n", ret);
        return finish();
    }
    inflateReady = true;

    for (;;) {
        // Refill the input buffer only when inflate() has consumed all of it;
        // bytes left over after the end of a member belong to the next member.
        if (strm.avail_in == 0) {
            DWORD read = 0;
            if (!ReadFile(InputFile, in.data(), kBufferSize, &read, NULL)) {
                sendToReportWindow(L"read error on input file\n");
                return finish();
            }
            if (read == 0)
                break;                  // end of input file
            strm.avail_in = read;
            strm.next_in = in.data();
        }

        // Decompress everything currently in the input buffer.
        do {
            strm.avail_out = kBufferSize;
            strm.next_out = out.data();
            ret = inflate(&strm, Z_NO_FLUSH);

            // Write whatever inflate() produced, even on an error return.
            const DWORD got = kBufferSize - strm.avail_out;
            DWORD written = 0;
            if (!WriteFile(OutputFile, out.data(), got, &written, NULL) || written != got) {
                sendToReportWindow(L"file write error\n");
                return finish();
            }

            if (ret == Z_STREAM_END) {
                // End of one gzip member: reset for the next one. The call is
                // kept outside assert() so it still runs when NDEBUG is defined.
                const int resetResult = inflateReset(&strm);
                if (resetResult != Z_OK) {
                    sendToReportWindow(L"inflateReset failed:%d\n", resetResult);
                    return finish();
                }
            }
            else if (ret != Z_OK) {
                sendToReportWindow(L"inflate error:%d\n", ret);
                return finish();
            }
        } while (strm.avail_in > 0);
    }

    // ret is still Z_STREAM_END if the last inflate() call completed a member.
    if (ret != Z_STREAM_END)
        sendToReportWindow(L"input ended before the end of the gzip stream\n");

    const ULONGLONG EndTime = GetTickCount64();
    const double InflateTime = (EndTime - StartTime) / 1000.0;   // seconds spent inflating
    sendToReportWindow(L"Inflate Time: %.2f seconds. Done with fileDecompress function.\n", InflateTime);
    return finish();
}

您的编译器难道没有就那条孤立的表达式语句 ret == Z_STREAM_END; 发出警告吗?您需要把它写成 if 语句,并用大括号把与 inflateReset() 相关的语句括起来。

此外还有一个问题:当 strm.avail_in 为零时,您会离开外层循环。除了到达成员末尾的情况之外,这每次都会发生;即使在到达成员末尾时,如果解压该成员恰好用完了输入缓冲区,也同样会发生。只需将外层循环改为 while (true) 即可。

即使在修复了所有这些之后,当您在外循环的顶部进行读取时,您也会丢弃剩余的可用输入。只有当strm.avail_in为零时才读取。

一个更简单的方法是在内层循环中进行重置。像这样(C 语言示例):

// Decompress a gzip file input, potentially with multiple gzip members. Write
// the decompressed data to output. Return Z_STREAM_END on success. Return Z_OK
// if the gzip stream was correct up to where it ended prematurely. Return
// Z_DATA error if the gzip stream is invalid.
int inflate_gzip(FILE *input, FILE *output) {
// Initialize inflate for gzip input.
z_stream strm = {};
int ret = inflateInit2(&strm, 16 + MAX_WBITS);
assert(ret == Z_OK);
// Decompress from input to output.
unsigned char in[CHUNK];
for (;;) {
// Keep reading until the end of the input file or an error.
if (strm.avail_in == 0) {
strm.avail_in = fread(in, 1, CHUNK, input);
if (strm.avail_in == 0)
break;
strm.next_in = in;
}
// Decompress all of what's in the input buffer.
do {
// Decompress as much as possible to the CHUNK output buffer.
unsigned char out[CHUNK];
strm.avail_out = CHUNK;
strm.next_out = out;
ret = inflate(&strm, Z_NO_FLUSH);
// Write to the output file whatever inflate() left in the output
// buffer. Return with an error if the write does not complete.
size_t got = CHUNK - strm.avail_out;
size_t put = fwrite(out, 1, got, output);
if (put != got)
return Z_ERRNO;
// Check for the end of a gzip member, in which case reset inflate
// for the next gzip member.
if (ret == Z_STREAM_END)
assert(inflateReset(&strm) == Z_OK);
// Return on a data error.
else if (ret != Z_OK) {
assert(ret == Z_DATA_ERROR);
(void)inflateEnd(&strm);
return ret;
}
// Continue until everything in the input buffer is consumed.
} while (strm.avail_in > 0);
}
// Successfully decompressed all of the input file. Clean up and return.
assert(inflateEnd(&strm) == Z_OK);
return ret;
}

最新更新