Unpickling mid-stream (python)



I am writing a script that processes a (very large) file by repeatedly unpickling objects from it until EOF. I would like to partition the file and have separate processes (in the cloud) unpickle and process separate parts.

However, my partitioner is not intelligent: it does not know about the boundaries between pickled objects in the file (since those boundaries depend on the types of objects being pickled, etc.).

Is there a way to scan the file for a "start of pickled object" sentinel? The naive way would be to attempt unpickling at successive byte offsets until an object is successfully unpickled, but that yields unexpected errors. It seems that for certain combinations of input, the unpickler falls out of sync and returns nothing for the rest of the file (see the code below).

import cPickle
import os
def stream_unpickle(file_obj):
    while True:
        start_pos = file_obj.tell()
        try:
            yield cPickle.load(file_obj)
        except (EOFError, KeyboardInterrupt):
            break
        except (cPickle.UnpicklingError, ValueError, KeyError, TypeError, ImportError):
            file_obj.seek(start_pos+1, os.SEEK_SET)
if __name__ == '__main__':
    import random
    from StringIO import StringIO
    # create some data
    sio = StringIO()
    [cPickle.dump(random.random(), sio, cPickle.HIGHEST_PROTOCOL) for _ in xrange(1000)]
    sio.flush()
    # read from subsequent offsets and find discontinuous jumps in object count
    size = sio.tell()
    last_count = None
    for step in xrange(size):
        sio.seek(step, os.SEEK_SET)
        count = sum(1 for _ in stream_unpickle(sio))
        if last_count is None or count == last_count - 1:
            last_count = count
        elif count != last_count:
            # if successful, these should never print (but they do...)
            print '%d elements read from byte %d' % (count, step)
            print '(%d elements read from byte %d)' % (last_count, step-1)
            last_count = count

The pickletools module has a dis function that shows the opcodes. It shows that there is a STOP opcode that you can scan for:

>>> import pickle, pickletools, StringIO
>>> s = StringIO.StringIO()
>>> pickle.dump('abc', s)
>>> p = s.getvalue()
>>> pickletools.dis(p)
    0: S    STRING     'abc'
    7: p    PUT        0
   10: .    STOP
highest protocol among opcodes = 0

Note that using the STOP opcode is tricky because the opcodes are of variable length, but it may serve as a useful hint about where the cut-offs are.
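As a rough illustration (my sketch, not part of the answer above), you could scan for the '.' STOP byte, treat the position just past each occurrence as a candidate pickle boundary, and validate each candidate by attempting a load. Since '.' can also appear inside an opcode's argument (e.g. inside a float or string), false candidates must be expected and discarded:

import cPickle
from StringIO import StringIO

def candidate_offsets(data):
    # positions just past each '.' byte -- candidate pickle starts;
    # '.' may also occur inside argument data, so these are only hints
    pos = data.find('.')
    while pos != -1:
        yield pos + 1
        pos = data.find('.', pos + 1)

def load_at_candidates(data):
    # try offset 0 plus every STOP candidate; discard false positives
    for offset in [0] + list(candidate_offsets(data)):
        try:
            yield offset, cPickle.loads(data[offset:])
        except Exception:
            pass  # '.' was inside an argument, not a real STOP

if __name__ == '__main__':
    s = StringIO()
    for obj in ('abc', [10, 20], 3.14):
        cPickle.dump(obj, s)     # protocol 0
    for offset, obj in load_at_candidates(s.getvalue()):
        print 'candidate at byte %d: %r' % (offset, obj)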

If you control the pickling step on the other end, then you can improve the situation by adding your own unambiguous separator:

>>> sep = '\xDE\xAD\xBE\xEF'
>>> s = StringIO.StringIO()
>>> pickle.dump('abc', s)
>>> s.write(sep)
>>> pickle.dump([10, 20], s)
>>> s.write(sep)
>>> pickle.dump('def', s)
>>> s.write(sep)
>>> pickle.dump([30, 40], s)
>>> p = s.getvalue()

Before unpickling, split this into individual pickles using the known separator:

>>> for pick in p.split(sep):
        print pickle.loads(pick)
abc
[10, 20]
def
[30, 40]
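Since the question involves very large files, here is a sketch (my addition, not from the original answer) of a chunked reader that yields one object at a time instead of splitting the whole string in memory. Like the split() loop above, it assumes sep never occurs inside the pickled data itself:

import pickle

def iter_pickles(stream, sep, chunk_size=4096):
    """Yield unpickled objects delimited by the known separator."""
    buf = ''
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        buf += chunk
        # split off every complete record in the buffer; leftover bytes
        # (possibly a partial separator) wait for the next chunk
        idx = buf.find(sep)
        while idx != -1:
            yield pickle.loads(buf[:idx])
            buf = buf[idx + len(sep):]
            idx = buf.find(sep)
    if buf:
        yield pickle.loads(buf)  # final record has no trailing separator

After s.seek(0), iterating iter_pickles(s, sep) yields the same four objects as the split() loop above.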

In a pickle file, some opcodes have an argument, a data value that follows the opcode. The data values vary in length and can contain bytes identical to opcodes. Therefore, if you start reading the file from an arbitrary position, you have no way of knowing whether you are looking at an opcode or in the middle of an argument. You have to read the file from the beginning and parse the opcodes.

I wrote this function that skips one pickle from a file, i.e. reads it and parses the opcodes, but does not construct the objects. It seems slightly faster than cPickle.loads on some of my files. You could rewrite it in C for more speed (after testing it properly).

Then you can make a single pass over the whole file to get the seek position of each pickle.

from pickletools import code2op, UP_TO_NEWLINE, TAKEN_FROM_ARGUMENT1, TAKEN_FROM_ARGUMENT4
from marshal import loads as mloads
def skip_pickle(f):
    """Skip one pickle from file.
    'f' is a file-like object containing the pickle.
    """
    while True:
        code = f.read(1)
        if not code:
            raise EOFError
        opcode = code2op[code]
        if opcode.arg is not None:
            n = opcode.arg.n
            if n > 0:
                f.read(n)                    # fixed-length argument
            elif n == UP_TO_NEWLINE:
                f.readline()                 # newline-terminated argument
            elif n == TAKEN_FROM_ARGUMENT1:
                n = ord(f.read(1))           # length in the next byte
                f.read(n)
            elif n == TAKEN_FROM_ARGUMENT4:
                n = mloads('i' + f.read(4))  # length in the next 4 bytes
                f.read(n)
        if code == '.':
            break                            # STOP opcode ends the pickle
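For example (my sketch, building on skip_pickle above), one pass can record the start offset of every pickle; a partitioner can then assign ranges of offsets to workers, each of which seeks to its first offset and calls cPickle.load repeatedly:

def index_pickles(f):
    # single pass over the whole file: start offset of every pickle
    offsets = []
    while True:
        pos = f.tell()
        try:
            skip_pickle(f)
        except EOFError:
            break
        offsets.append(pos)
    return offsets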

Sorry to answer my own question, and thanks to @RaymondHettinger for the idea of adding sentinels.

Here is what worked for me. I created a reader and a writer that use a sentinel, '#S', followed by a data block length, at the beginning of each "record". The writer has to take care to find any occurrences of '#' in the data being written and double them (into '##'). The reader then uses a look-behind regex to find sentinels, distinct from any matching values in the original stream, and also verifies the number of bytes between this sentinel and the subsequent one.

RecordWriter is a context manager (so multiple calls to write() can be encapsulated into a single record, if needed). RecordReader is a generator.

I am not sure how this does on performance. Any faster or more elegant solutions are welcome.

import os
import re
import cPickle
from functools import partial
from cStringIO import StringIO
SENTINEL = '#S'
# when scanning look for #S, but NOT ##S
sentinel_pattern = '(?<!#)#S' # uses negative look-behind
sentinel_re = re.compile(sentinel_pattern)
find_sentinel = sentinel_re.search
# when writing replace single # with double ##
write_pattern = '#'
write_re = re.compile(write_pattern)
fix_write = partial(write_re.sub, '##')
# when reading, replace double ## with single #
read_pattern = '##'
read_re = re.compile(read_pattern)
fix_read = partial(read_re.sub, '#') 
class RecordWriter(object):
    def __init__(self, stream):
        self._stream = stream
        self._write_buffer = None
    def __enter__(self):
        self._write_buffer = StringIO()
        return self
    def __exit__(self, et, ex, tb):
        if self._write_buffer.tell():
            self._stream.write(SENTINEL) # start
            cPickle.dump(self._write_buffer.tell(), self._stream, cPickle.HIGHEST_PROTOCOL) # byte length of user's original data
            self._stream.write(fix_write(self._write_buffer.getvalue()))
            self._write_buffer = None
        return False
    def write(self, data):
        if not self._write_buffer:
            raise ValueError("Must use RecordWriter as a context manager")
        self._write_buffer.write(data)
class BadBlock(Exception): pass
def verify_length(block):
    fobj = StringIO(block)
    try:
        stated_length = cPickle.load(fobj)
    except (ValueError, IndexError, cPickle.UnpicklingError):
        raise BadBlock
    data = fobj.read()
    if len(data) != stated_length:
        raise BadBlock
    return data
def RecordReader(stream):
    ' Read one record '
    accum = StringIO()
    seen_sentinel = False
    data = ''
    while True:
        m = find_sentinel(data)
        if not m:
            if seen_sentinel:
                accum.write(data)
            data = stream.read(80)
            if not data:
                if accum.tell():
                    try: yield verify_length(fix_read(accum.getvalue()))
                    except BadBlock: pass
                return
        else:
            if seen_sentinel:
                accum.write(data[:m.start()])
                try: yield verify_length(fix_read(accum.getvalue()))
                except BadBlock: pass
                accum = StringIO()
            else:
                seen_sentinel = True
            data = data[m.end():] # toss
if __name__ == '__main__':
    import random
    stream = StringIO()
    data = [str(random.random()) for _ in xrange(3)]
    # test with a string containing sentinel and almost-sentinel
    data.append('abc12#jeoht38#SoSooihetS#')
    count = len(data)
    for i in data:
        with RecordWriter(stream) as r:
            r.write(i)
    size = stream.tell()
    start_pos = int(random.random() * size)
    stream.seek(start_pos, os.SEEK_SET)
    read_data = [s for s in RecordReader(stream)]
    print 'Original data: ', data
    print 'After seeking to %d, RecordReader returned: %s' % (start_pos, read_data)
