我得到了一个压缩文件,其中包含多个单独的压缩XML流。压缩后的文件为833mb。
如果我尝试将其作为单个对象解压缩,我只得到第一个流(大约19 kb)。
我修改了下面的代码,作为一个老问题的答案,以解压缩每个流并将其写入文件:
import zlib
outfile = open('output.xml', 'w')
def zipstreams(filename):
"""Return all zip streams and their positions in file."""
with open(filename, 'rb') as fh:
data = fh.read()
i = 0
print "got it"
while i < len(data):
try:
zo = zlib.decompressobj()
dat =zo.decompress(data[i:])
outfile.write(dat)
zo.flush()
i += len(data[i:]) - len(zo.unused_data)
except zlib.error:
i += 1
outfile.close()
zipstreams('payload')
infile.close()
该代码运行并产生所需的结果(所有XML数据解压缩到单个文件)。问题是,它需要几天的工作!
即使在压缩文件中有成千上万的流,它仍然看起来应该是一个更快的过程。大约8天来解压缩833mb(估计3gb的原始文件),这表明我做了一些非常错误的事情。
是否有另一种更有效的方法,或者速度慢是我所遇到的读-解压缩-写-重复瓶颈的结果?
谢谢你的任何指示或建议!
如果不更具体地了解您实际处理的文件格式,很难说得太多,但很明显,您的算法对子字符串的处理是二次型的——当您有数万个子字符串时,这不是一件好事。让我们看看我们知道什么:
你说供应商说他们是
使用标准的zlib压缩库。这些是构建gzip实用程序的相同压缩例程。
从这里我们可以得出结论,组件流是原始zlib格式,并且不是封装在gzip包装器中(或PKZIP归档,或其他)。ZLIB格式的权威文档在这里:https://www.rfc-editor.org/rfc/rfc1950
那么让我们假设您的文件完全如您所描述的那样:一个32字节的头文件,后面跟着连接在一起的原始ZLIB流,中间没有任何其他内容。 (编辑:情况并非如此,毕竟)。
Python的zlib文档提供了一个Decompress
类,它实际上非常适合遍历文件。它包括一个属性unused_data
,它的文档明确地声明:
所以,这就是你可以做的:写一个循环,读取确定压缩数据字符串在哪里结束的唯一方法是实际解压缩它。这意味着当压缩数据包含较大文件的一部分时,您只能通过读取数据并将其与一些非空字符串一起提供给解压缩对象的decompress()方法来找到它的结尾,直到unused_data属性不再是空字符串为止。
data
,比如说,一次一个块(甚至不需要把整个800MB的文件读到内存中)。将每个块推到Decompress
对象,并检查unused_data
属性。当它变成非空时,你就得到了一个完整的对象。将其写入磁盘,创建一个新的解压缩对象,并使用最后一个解压缩对象的unused_data
初始化它。这可能会起作用(未经测试,请检查正确性)。
Edit:由于您的数据流中确实有其他数据,因此我添加了一个与下一个ZLIB开始对齐的例程。您需要找到并填充在数据中标识ZLIB流的两字节序列。(请随意使用您的旧代码来发现它。)虽然一般来说没有固定的ZLIB头,但每个流的头应该是相同的,因为它由协议选项和标志组成,它们在整个运行过程中应该是相同的。
import zlib
# FILL IN: ZHEAD is two bytes with the actual ZLIB settings in the input
ZHEAD = CMF+FLG
def findstart(header, buf, source):
"""Find `header` in str `buf`, reading more from `source` if necessary"""
while buf.find(header) == -1:
more = source.read(2**12)
if len(more) == 0: # EOF without finding the header
return ''
buf += more
offset = buf.find(header)
return buf[offset:]
然后可以进入下一个流的开始。我已经添加了try
/except
对,因为相同的字节序列可能出现在流之外:
source = open(datafile, 'rb')
skip_ = source.read(32) # Skip non-zlib header
buf = ''
while True:
decomp = zlib.decompressobj()
# Find the start of the next stream
buf = findstart(ZHEAD, buf, source)
try:
stream = decomp.decompress(buf)
except zlib.error:
print "Spurious match(?) at output offset %d." % outfile.tell(),
print "Skipping 2 bytes"
buf = buf[2:]
continue
# Read until zlib decides it's seen a complete file
while decomp.unused_data == '':
block = source.read(2**12)
if len(block) > 0:
stream += decomp.decompress(block)
else:
break # We've reached EOF
outfile.write(stream)
buf = decomp.unused_data # Save for the next stream
if len(block) == 0:
break # EOF
outfile.close()
PS 1。如果我是你,我会把每个XML流写进一个单独的文件。
PS 2。您可以在文件的第一个MB上测试您所做的任何操作,直到您获得足够的性能。
在现代处理器(例如2 GHz的i7)上解压缩833 MB应该需要大约30秒。所以,是的,你做错了什么。尝试在每个字节偏移处解压缩以查看是否出现错误是问题的一部分,尽管不是全部。有更好的方法来查找压缩数据。理想情况下,你应该找到或弄清楚格式。或者,您可以使用RFC 1950规范搜索有效的zlib头,尽管您可能会得到误报。
更重要的可能是,您一次将整个833 MB读取到内存中,并将3gb解压缩到内存中,可能每次都是大块的。你的机器有多少内存?
您可能正在切换到虚拟内存。如果您显示的代码有效,则数据没有被压缩。Zip是一种特定的文件格式,通常具有. Zip扩展名,它将原始压缩数据封装在本地和中央目录信息的结构中,用于重建文件系统中的目录。您必须有一些相当不同的东西,因为您的代码正在寻找并明显地找到zlib流。你有什么格式?你在哪里买的?它是如何记录的?您能否提供一个转储,比如前100个字节?
应该这样做的方法是而不是,将整个内容读入内存并立即解压缩整个流,也解压缩到内存中。相反,可以使用zlib.decompressobj
接口,它允许您一次提供一个片段,并获得结果可用的解压缩数据。您可以以更小的块读取输入文件,通过使用文档格式或查找zlib (RFC 1950头文件)找到解压缩的数据流,然后通过解压缩对象一次运行这些数据块,在需要的地方写出解压缩的数据。decomp.unused_data
可用于检测压缩流的结束(如您找到的示例中所示)。
从您在评论中描述的情况来看,听起来他们将单独发送给您的单个文件连接在一起。这意味着每一个都有一个32字节的头,你需要跳过。
如果你不跳过这些报头,它可能会有你描述的行为:如果你幸运的话,你会得到32个无效报头错误,然后成功解析下一个流。如果不走运,32字节的垃圾看起来就像真正流的开始,并且您将浪费大量时间解析任意数量的字节,直到最终得到解码错误。(如果你的真的不走运,它实际上会成功解码,给你一大块垃圾,并吃掉一个或多个后续流。)
所以,尝试在每个流结束后跳过32个字节。
或者,如果您有一种更可靠的方法来检测下一个流的开始(这就是为什么我告诉您打印出偏移量并查看十六进制编辑器中的数据,而alexis告诉您查看zlib规范),请这样做。