好吧,假设我有以下代码,并且它完全按照预期工作:
from enum import IntEnum
from construct import *
class Char(IntEnum):
    """Reserved byte values used by the framing protocol in this example."""

    START = 0xAB  # first byte of a frame
    STOP = 0xBC   # last byte of a frame
    ESC = 0xCD    # prefix announcing an escaped (mapped) byte
# Escape table: when building, each reserved byte is replaced by its
# value + 1 (e.g. 0xAB -> 0xAC).
MAPPING = Mapping(Byte, {reserved: reserved + 1 for reserved in Char})

# Byte-stuffing codec.  For every element, Select first tries the escaped
# form (ESC marker followed by the mapped byte) and falls back to a plain
# Byte when that alternative fails.
SLIP = GreedyRange(
    Select(
        FocusedSeq("x", Const(Char.ESC, Byte), "x" / MAPPING),
        Byte,
    ),
)
示例:
>>> buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])
>>> SLIP.build(buffer)
b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
和:
>>> from operator import eq
>>> all(map(eq, SLIP.parse(SLIP.build(buffer)), buffer))
True
现在我需要将编码/解码封装在另一个结构中:
# Frame layout: START byte, SLIP-encoded payload, STOP byte.  FocusedSeq
# exposes only the "message" field to the caller.
PROTOCOL = FocusedSeq(
    "message",
    Const(Char.START, Byte),
    "message" / SLIP,
    Const(Char.STOP, Byte),
)
构建完全符合预期:
>>> PROTOCOL.build(buffer)
b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc'
但是,解析时,GreedyRange
消耗的字节太多:
>>> PROTOCOL.parse(b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc')
construct.core.StreamError: stream read less than specified amount, expected 1, found 0
如何让GreedyRange
退还一个字节(即不要消耗末尾的STOP字节)?
在您的情况下,您可以简单地重新排列PROTOCOL
的字段,并将SLIP
放在末尾。
# Field order used by this variant: START byte, STOP byte, then the
# SLIP-encoded payload last, so the greedy SLIP field is followed by nothing.
PROTOCOL = FocusedSeq(
    "message",
    Const(Char.START, Byte),
    Const(Char.STOP, Byte),
    "message" / SLIP,
)
这样GreedyRange
就不会把后续字段所需的字节也消耗掉,从而避免如下流解析错误:construct.core.StreamError: stream read less than specified amount, expected 1, found 0
。
这是一个修改后的示例:
from construct import Byte, Const, FocusedSeq, GreedyRange, Mapping, Renamed, Select
from enum import IntEnum
class Char(IntEnum):
    """Reserved byte values used by the framing protocol in this example."""

    START = 0xAB  # frame start marker
    STOP = 0xBC   # frame stop marker
    ESC = 0xCD    # prefix announcing an escaped (mapped) byte
# Escape table: when building, each reserved byte is replaced by its
# value + 1 (e.g. 0xAB -> 0xAC).
MAPPING = Mapping(Byte, {reserved: reserved + 1 for reserved in Char})

# Byte-stuffing codec.  For every element, Select first tries the escaped
# form (ESC marker followed by the mapped byte) and falls back to a plain
# Byte when that alternative fails.
SLIP = GreedyRange(
    Select(
        FocusedSeq("x", Const(Char.ESC, Byte), "x" / MAPPING),
        Byte,
    ),
)
buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])

# Each reserved byte (0xAB, 0xBC, 0xCD) is emitted as ESC followed by its
# value + 1; the other bytes pass through unchanged.
slip_build = SLIP.build(buffer)
assert slip_build == b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
slip_parsed = SLIP.parse(b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff')

# The greedy SLIP field is placed last, after both constant marker bytes,
# so nothing has to be parsed after it.
PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    Const(Char.STOP, Byte),
    Renamed(SLIP, 'message')
)
protocol_build = PROTOCOL.build(buffer)
assert protocol_build == b'\xab\xbc\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
protocol_parsed = PROTOCOL.parse(protocol_build)
assert protocol_parsed == slip_parsed
解决方案是NullTerminated(..., term=STOP)
,它在内部缓冲底层流,并在必要时返回。
# Frame layout: START byte, then the escaped payload terminated by STOP.
# NullTerminated handles the terminator, so no trailing Const is needed.
PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    message=NullTerminated(
        # NOTE build consumes entire stream and appends STOP
        # NOTE parse consumes stream until STOP and passes buffer to GreedyRange
        GreedyRange(
            Select(
                FocusedSeq(
                    'x',
                    Const(Char.ESC, Byte),
                    x=MAPPING  # NOTE intentionally raises MappingError
                ),
                Byte  # NOTE fallback for MappingError
            )
        ),
        term=Byte.build(Char.STOP)
    )
)
还有一种方法是使用 construct 库的 Adapter 类来修改字节序列。
这是另一个代码示例:
from construct import Byte, Const, FocusedSeq, GreedyRange, \
    If, Mapping, Renamed, Select, this, Adapter
from enum import IntEnum
class Char(IntEnum):
    """Reserved byte values used by the framing protocol in this example."""

    START = 0xAB  # byte emitted at the beginning of a frame
    STOP = 0xBC   # byte emitted at the end of a frame
    ESC = 0xCD    # prefix announcing an escaped (mapped) byte
# Escape table: when building, each reserved byte is replaced by its
# value + 1 (e.g. 0xAB -> 0xAC).
MAPPING = Mapping(Byte, {reserved: reserved + 1 for reserved in Char})

# Byte-stuffing codec.  For every element, Select first tries the escaped
# form (ESC marker followed by the mapped byte) and falls back to a plain
# Byte when that alternative fails.
SLIP = GreedyRange(
    Select(
        FocusedSeq("x", Const(Char.ESC, Byte), "x" / MAPPING),
        Byte,
    ),
)
buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])

# Each reserved byte (0xAB, 0xBC, 0xCD) is emitted as ESC followed by its
# value + 1; the other bytes pass through unchanged.
slip_build = SLIP.build(buffer)
assert slip_build == b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
slip_parsed = SLIP.parse(b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff')
class ProtocolAdapter(Adapter):
    """Adapter that drops the first and last element of the parsed sequence.

    Decoding mutates the parsed sequence in place and returns that same
    object; encoding passes the object through unchanged.
    """

    def _decode(self, obj, context, path):
        # Remove the first and last byte of the parsed sequence.
        obj.pop(0)
        obj.pop(-1)
        return obj

    def _encode(self, obj, context, path):
        # Nothing to adjust when building; hand the payload on as-is.
        return obj
# The START/STOP constants take part only when building.  When parsing,
# the two If fields are skipped and SLIP reads the whole frame, marker
# bytes included; ProtocolAdapter then drops the first and last element.
PROTOCOL = FocusedSeq(
    "message",
    If(this._building == True, Const(Char.START, Byte)),
    "message" / SLIP,
    If(this._building == True, Const(Char.STOP, Byte))
)
ADAPTED_PROTOCOL = ProtocolAdapter(PROTOCOL)

protocol_build = ADAPTED_PROTOCOL.build(buffer)
assert protocol_build == b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc'
protocol_parsed = ADAPTED_PROTOCOL.parse(protocol_build)
assert protocol_parsed == slip_parsed