如何在python中调节从文件流中获得的数据量



我有一个嵌入式系统,我正在编写一个用户应用程序。用户应用程序需要获取固件映像并将其拆分为适合发送到嵌入式系统进行编程的块。我从S-record文件开始,并使用Xmodem进行文件传输(意味着每个主要的"文件"传输都需要以EOF结束),所以对我来说最简单的事情是将图像文件分割成一组完整的S-record文件,这些文件的大小不大于(单线程)嵌入式系统的接收缓冲区的大小。我的用户应用程序是用python编写的,我有一个C程序将固件映像拆分为适当大小的文件,但我认为可能有一种更"python"的方式来实现这一点,也许是通过使用自定义流处理程序。

任何想法吗?

编辑:为了加入讨论,我可以将我的输入文件放入缓冲区。我如何使用范围来设置进入文件大小或完整S-记录行('S'分隔的ASCII文本)的缓冲区的硬限制?

我认为这是一个有趣的问题,S-record格式不是太复杂,所以我写了一个S-record编码器,似乎从我有限的测试工作。

import struct
def s_record_encode(fileobj, recordtype, address, buflen):
    """S-Record encode bytes from file.
    fileobj      file-like object to read data (if any)
    recordtype   'S0' to 'S9'
    address      integer address
    buflen       maximum output buffer size
    """
    # S-type to (address_len, has_data)
    record_address_bytes = {
        'S0':(2, True), 'S1':(2, True), 'S2':(3, True), 'S3':(4, True),
        'S5':(2, False), 'S7':(4, False), 'S8':(3, False), 'S9':(2, False)
    }
    # params for this record type
    address_len, has_data = record_address_bytes[recordtype]
    # big-endian address as string, trimmed to length
    address = struct.pack('>L', address)[-address_len:]
    # read data up to 255 bytes minus address and checksum len
    if has_data:
        data = fileobj.read(0xff - len(address) - 1)
        if not data:
            return '', 0
    else:
        data = ''
    # byte count is address + data + checksum
    count = len(address) + len(data) + 1
    count = struct.pack('B', count)
    # checksum count + address + data
    checksummed_record = count + address + data
    checksum = struct.pack('B', sum(ord(d) for d in checksummed_record) & 0xff ^ 0xff)
    # glue record type to hex encoded buffer
    record = recordtype + (checksummed_record + checksum).encode('hex').upper()
    # return buffer and how much data we read from the file
    return record, len(data)

def s_record_test():
    from cStringIO import StringIO
    # from an example, this should encode to given string
    fake_file = StringIO("x0Ax0Ax0Dx00x00x00x00x00x00x00x00x00x00x00x00x00")
    encode_to = "S1137AF00A0A0D0000000000000000000000000061"
    fake_file.seek(0)
    record, buflen = s_record_encode(fake_file, 'S1', 0x7af0, 80)
    print 'record', record
    print 'encode_to', encode_to
    assert record == encode_to
    fake_file = StringIO()
    for i in xrange(1000):
        fake_file.write(struct.pack('>L', i))
    fake_file.seek(0)
    address = 0
    while True:
        buf, datalen = s_record_encode(fake_file, 'S2', address, 100)
        if not buf:
            break
        print address, datalen, buf
        address += datalen

如果你已经有了一个C程序,那么你很幸运。Python就像基于C的脚本语言,具有大多数相同的函数。请参阅核心工具,了解使用流处理所有熟悉的C I/O函数。然后,您可以通过将方法滚动到类中并使用Python切片表示法之类的东西来使程序更加Python化。

最新更新