I am writing a script that processes a (very large) file by repeatedly unpickling objects from it until EOF. I would like to partition the file and have separate processes (in the cloud) unpickle and process separate portions of it.
However, my partitioner is not intelligent; it knows nothing about the boundaries between the pickled objects in the file (since those boundaries depend on the types of objects being pickled, etc.).
Is there a way to scan the file for a "start of a pickled object" sentinel? The naive way would be to attempt unpickling at successive byte offsets until an object is successfully unpickled, but that yields unexpected errors. It seems that for certain combinations of input, the unpickler falls out of sync and returns nothing for the rest of the file (see the code below).
import cPickle
import os

def stream_unpickle(file_obj):
    while True:
        start_pos = file_obj.tell()
        try:
            yield cPickle.load(file_obj)
        except (EOFError, KeyboardInterrupt):
            break
        except (cPickle.UnpicklingError, ValueError, KeyError, TypeError, ImportError):
            file_obj.seek(start_pos + 1, os.SEEK_SET)
if __name__ == '__main__':
    import random
    from StringIO import StringIO

    # create some data
    sio = StringIO()
    for _ in xrange(1000):
        cPickle.dump(random.random(), sio, cPickle.HIGHEST_PROTOCOL)
    sio.flush()

    # read from successive offsets and find discontinuous jumps in object count
    size = sio.tell()
    last_count = None
    for step in xrange(size):
        sio.seek(step, os.SEEK_SET)
        count = sum(1 for _ in stream_unpickle(sio))
        if last_count is None or count == last_count - 1:
            last_count = count
        elif count != last_count:
            # if successful, these should never print (but they do...)
            print '%d elements read from byte %d' % (count, step)
            print '(%d elements read from byte %d)' % (last_count, step - 1)
            last_count = count
The pickletools module has a dis function that shows the opcodes. It shows that there is a STOP opcode that you may be able to scan for:
>>> import pickle, pickletools, StringIO
>>> s = StringIO.StringIO()
>>> pickle.dump('abc', s)
>>> p = s.getvalue()
>>> pickletools.dis(p)
    0: S    STRING     'abc'
    7: p    PUT        0
   10: .    STOP
highest protocol among opcodes = 0
Note, using the STOP opcode is a bit tricky because the opcodes are variable length, but it may serve as a useful hint about where the cutoff points are.
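One way around the variable-length problem is to let pickletools do the parsing. Here is a minimal sketch, assuming the stream contains nothing but back-to-back pickles (the helper name find_pickle_boundaries is mine): pickletools.genops parses one pickle's opcodes from its start, so it never mistakes argument bytes for a STOP, and it leaves the file positioned just past the pickle it consumed.

import pickle, pickletools
from StringIO import StringIO

def find_pickle_boundaries(f, size):
    'Yield the start offset of each pickle in stream f (hypothetical helper).'
    while f.tell() < size:
        yield f.tell()
        # genops parses exactly one pickle and stops right after its
        # STOP ('.') opcode, leaving f at the start of the next pickle
        for opcode, arg, pos in pickletools.genops(f):
            pass

s = StringIO()
for obj in ('abc', [10, 20], 'def'):
    pickle.dump(obj, s)
size = s.tell()
s.seek(0)
print list(find_pickle_boundaries(s, size))   # start offset of each pickle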
If you control the pickling step on the other end, then you can improve the situation by adding your own unambiguous alternative separator:
>>> sep = '\xDE\xAD\xBE\xEF'
>>> s = StringIO.StringIO()
>>> pickle.dump('abc', s)
>>> s.write(sep)
>>> pickle.dump([10, 20], s)
>>> s.write(sep)
>>> pickle.dump('def', s)
>>> s.write(sep)
>>> pickle.dump([30, 40], s)
>>> p = s.getvalue()
Before unpickling, break the input into the individual pickles using the known separator. (This relies on the separator bytes never occurring inside a pickle; for binary pickle protocols you would need escaping, which is what the sentinel-based answer further down does.)
>>> for pick in p.split(sep):
...     print pickle.loads(pick)
...
abc
[10, 20]
def
[30, 40]
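Since the point is to partition a very large file, the partitioner itself only needs the separator offsets, not the objects. A hedged sketch of that scan, assuming a multi-byte separator (the name find_separator_offsets and the 1 MB chunk size are mine):

import os

SEP = '\xDE\xAD\xBE\xEF'

def find_separator_offsets(f, sep=SEP, chunk_size=1 << 20):
    'Yield the byte offset just past each separator in file f (hypothetical helper).'
    f.seek(0, os.SEEK_SET)
    tail = ''     # carry-over so a separator spanning two chunks is still found
    base = 0      # file offset of the start of the current chunk
    while True:
        chunk = f.read(chunk_size)
        if not chunk:
            return
        data = tail + chunk
        start = 0
        while True:
            i = data.find(sep, start)
            if i < 0:
                break
            yield base - len(tail) + i + len(sep)
            start = i + len(sep)
        tail = data[-(len(sep) - 1):]   # assumes len(sep) > 1
        base += len(chunk)

Each worker can then seek to one of the yielded offsets (plus offset 0 for the first pickle) and unpickle up to the next one.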
In a pickle file, some opcodes have an argument: a data value that follows the opcode. The data values vary in length and can contain bytes identical to opcodes. Therefore, if you start reading the file from an arbitrary position, you have no way of knowing whether you are looking at an opcode or at the middle of an argument. You must read the file from the beginning and parse the opcodes.
I cooked up this function that skips one pickle from a file, i.e. reads it and parses the opcodes, but does not construct the objects. It seems slightly faster than cPickle.loads on some of my files. You could rewrite this in C for more speed (after testing it properly).
Then, you can do a single pass over the whole file to get the seek position of each pickle (see the sketch after skip_pickle below).
from pickletools import code2op, UP_TO_NEWLINE, TAKEN_FROM_ARGUMENT1, TAKEN_FROM_ARGUMENT4
from marshal import loads as mloads

def skip_pickle(f):
    """Skip one pickle from file.

    'f' is a file-like object containing the pickle.
    """
    while True:
        code = f.read(1)
        if not code:
            raise EOFError
        opcode = code2op[code]
        if opcode.arg is not None:
            n = opcode.arg.n
            if n > 0:
                f.read(n)
            elif n == UP_TO_NEWLINE:
                f.readline()
            elif n == TAKEN_FROM_ARGUMENT1:
                n = ord(f.read(1))
                f.read(n)
            elif n == TAKEN_FROM_ARGUMENT4:
                n = mloads('i' + f.read(4))
                f.read(n)
        if code == '.':
            break
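For instance, a minimal sketch of the indexing pass, assuming the file is nothing but consecutive pickles (the offsets list and the test data are mine):

import cPickle
from cStringIO import StringIO

stream = StringIO()
for obj in (1, 'two', [3.0, 4]):
    cPickle.dump(obj, stream, cPickle.HIGHEST_PROTOCOL)
stream.seek(0)

offsets = []
while True:
    pos = stream.tell()
    try:
        skip_pickle(stream)
    except EOFError:
        break
    offsets.append(pos)

print offsets   # start offset of each pickle in the stream

A worker can then stream.seek(offsets[i]) and cPickle.load from there.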
Sorry to answer my own question, and thanks to @RaymondHettinger for the idea of adding sentinels.
Here is what worked for me. I created readers and writers that use a sentinel '#S', followed by a data-block length, at the beginning of each "record". The writer has to take care to find any occurrences of '#' in the data being written and double them (into '##'). The reader then uses a look-behind regex to find sentinels, distinct from any doubled ones in the original stream, and to verify the number of bytes between this sentinel and the subsequent one.
RecordWriter is a context manager (so multiple calls to write() can be combined into a single record if desired). RecordReader is a generator.
Not sure how this does on performance. Any faster/more elegant solutions are welcome.
import os
import re
import cPickle
from functools import partial
from cStringIO import StringIO

SENTINEL = '#S'

# when scanning, look for #S, but NOT ##S
sentinel_pattern = '(?<!#)#S'    # uses a negative look-behind
sentinel_re = re.compile(sentinel_pattern)
find_sentinel = sentinel_re.search

# when writing, replace a single # with a double ##
write_pattern = '#'
write_re = re.compile(write_pattern)
fix_write = partial(write_re.sub, '##')

# when reading, replace a double ## with a single #
read_pattern = '##'
read_re = re.compile(read_pattern)
fix_read = partial(read_re.sub, '#')
class RecordWriter(object):
    def __init__(self, stream):
        self._stream = stream
        self._write_buffer = None

    def __enter__(self):
        self._write_buffer = StringIO()
        return self

    def __exit__(self, et, ex, tb):
        if self._write_buffer.tell():
            self._stream.write(SENTINEL)    # start of record
            # byte length of the user's original data
            cPickle.dump(self._write_buffer.tell(), self._stream, cPickle.HIGHEST_PROTOCOL)
            self._stream.write(fix_write(self._write_buffer.getvalue()))
        self._write_buffer = None
        return False

    def write(self, data):
        if self._write_buffer is None:
            raise ValueError('Must use RecordWriter as a context manager')
        self._write_buffer.write(data)
class BadBlock(Exception): pass

def verify_length(block):
    fobj = StringIO(block)
    try:
        stated_length = cPickle.load(fobj)
    except (ValueError, IndexError, cPickle.UnpicklingError):
        raise BadBlock
    data = fobj.read()
    if len(data) != stated_length:
        raise BadBlock
    return data
def RecordReader(stream):
    'Yield each complete record found in the stream.'
    accum = StringIO()
    seen_sentinel = False
    data = ''
    while True:
        m = find_sentinel(data)
        if not m:
            if seen_sentinel:
                accum.write(data)
            data = stream.read(80)
            if not data:
                if accum.tell():
                    try: yield verify_length(fix_read(accum.getvalue()))
                    except BadBlock: pass
                return
        else:
            if seen_sentinel:
                accum.write(data[:m.start()])
                try: yield verify_length(fix_read(accum.getvalue()))
                except BadBlock: pass
                accum = StringIO()
            else:
                seen_sentinel = True
            data = data[m.end():]   # toss
if __name__ == '__main__':
    import random

    stream = StringIO()
    data = [str(random.random()) for _ in xrange(3)]
    # test with a string containing the sentinel and almost-sentinels
    data.append('abc12#jeoht38#SoSooihetS#')
    count = len(data)
    for i in data:
        with RecordWriter(stream) as r:
            r.write(i)
    size = stream.tell()
    start_pos = int(random.random() * size)
    stream.seek(start_pos, os.SEEK_SET)
    read_data = [s for s in RecordReader(stream)]
    print 'Original data:', data
    print 'After seeking to %d, RecordReader returned: %s' % (start_pos, read_data)