如何让 construct.GreedyRange 退还一个字节?



好吧,假设我有下面这段代码,它完全按预期工作:

from enum import IntEnum

from construct import (
    Byte, Const, FocusedSeq, GreedyRange, Mapping, Renamed, Select,
)


class Char(IntEnum):
    """Reserved byte values used by the framing protocol."""

    START = 0xAB
    STOP = 0xBC
    ESC = 0xCD


# Maps every reserved value to (value + 1); this is the byte that follows
# ESC on the wire when a reserved value occurs inside the payload.
MAPPING = Mapping(Byte, {x: x+1 for x in Char})

# Payload codec: each element is either an escaped reserved value
# (ESC followed by the mapped byte) or, when that alternative fails,
# a plain byte.  GreedyRange repeats this until the alternatives stop
# matching or the stream ends.
SLIP = GreedyRange(
    Select(
        FocusedSeq(
            'x',
            Const(Char.ESC, Byte),
            Renamed(MAPPING, 'x')
        ),
        Byte
    )
)

示例:

>>> buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])
>>> SLIP.build(buffer)
b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'

和:

>>> from operator import eq
>>> all(map(eq, SLIP.parse(SLIP.build(buffer)), buffer))
True

现在我需要将编码/解码封装在另一个结构中:

# Frame layout: START byte, SLIP-escaped payload, STOP byte.
# Only the 'message' field is exposed to the caller.
# Building this works; parsing raises StreamError because the greedy
# payload parser also consumes the trailing STOP byte, leaving nothing
# for the final Const to read.
PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    'message' / SLIP,
    Const(Char.STOP, Byte),
)

构建完全符合预期:

>>> PROTOCOL.build(buffer)
b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc'

但是,解析时,GreedyRange消耗的字节太多:

>>> PROTOCOL.parse(b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc')
construct.core.StreamError: stream read less than specified amount, expected 1, found 0

如何让GreedyRange退还一个字节?

在您的情况下,您可以简单地重新排列PROTOCOL的字段,并将SLIP放在末尾。

# Variant with the greedy payload moved to the end of the sequence, so
# nothing has to be parsed after GreedyRange has run.
# NOTE: this changes the byte layout -- STOP now directly follows START
# and precedes the payload.
PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    Const(Char.STOP, Byte),
    'message' / SLIP,
)

这样GreedyRange就不会因为消耗掉所有字节而导致流解析错误:construct.core.StreamError: stream read less than specified amount, expected 1, found 0

这是一个修改后的示例:

from construct import Byte, Const, FocusedSeq, GreedyRange, Mapping, Renamed, Select
from enum import IntEnum


class Char(IntEnum):
    """Reserved byte values used by the framing protocol."""

    START = 0xAB
    STOP = 0xBC
    ESC = 0xCD


# Maps every reserved value to (value + 1), the byte sent after ESC.
MAPPING = Mapping(Byte, {x: x+1 for x in Char})

# Payload codec: an escaped reserved value (ESC + mapped byte) or,
# when that alternative fails, a plain byte -- repeated greedily.
SLIP = GreedyRange(
    Select(
        FocusedSeq(
            'x',
            Const(Char.ESC, Byte),
            Renamed(MAPPING, 'x')
        ),
        Byte
    )
)

buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])

# Round-trip the bare payload codec first.
slip_build = SLIP.build(buffer)
assert slip_build == b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
slip_parsed = SLIP.parse(b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff')

# The greedy payload is placed last so that no field has to be parsed
# after it.  NOTE: STOP therefore precedes the payload on the wire.
PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    Const(Char.STOP, Byte),
    Renamed(SLIP, 'message')
)

protocol_build = PROTOCOL.build(buffer)
assert protocol_build == b'\xab\xbc\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
protocol_parsed = PROTOCOL.parse(protocol_build)
assert protocol_parsed == slip_parsed

解决方案是NullTerminated(..., term=STOP),它在内部缓冲底层流,并在必要时将数据退还。

# Frame layout: START byte, escaped payload, STOP terminator.
# NullTerminated bounds the payload, so the inner GreedyRange only sees
# the bytes that precede STOP.
PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    message=NullTerminated(
        # NOTE build consumes entire stream and appends STOP
        # NOTE parse consumes stream until STOP and passes buffer to GreedyRange
        GreedyRange(
            Select(
                FocusedSeq(
                    'x',
                    Const(Char.ESC, Byte),
                    x=MAPPING  # NOTE intentionally raises MappingError
                ),
                Byte  # NOTE fallback for MappingError
            )
        ),
        term=Byte.build(Char.STOP)
    )
)

还有一种方法是使用 construct 的 Adapter 类来修改字节序列。

这是另一个代码示例:

from construct import (
    Byte, Const, FocusedSeq, GreedyRange,
    If, Mapping, Renamed, Select, this, Adapter,
)
from enum import IntEnum


class Char(IntEnum):
    """Reserved byte values used by the framing protocol."""

    START = 0xAB
    STOP = 0xBC
    ESC = 0xCD


# Maps every reserved value to (value + 1), the byte sent after ESC.
MAPPING = Mapping(Byte, {x: x+1 for x in Char})

# Payload codec: an escaped reserved value (ESC + mapped byte) or,
# when that alternative fails, a plain byte -- repeated greedily.
SLIP = GreedyRange(
    Select(
        FocusedSeq(
            'x',
            Const(Char.ESC, Byte),
            Renamed(MAPPING, 'x')
        ),
        Byte
    )
)

buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])

# Round-trip the bare payload codec first.
slip_build = SLIP.build(buffer)
assert slip_build == b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
slip_parsed = SLIP.parse(b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff')


class ProtocolAdapter(Adapter):
    """Strip the framing bytes from the parsed message list."""

    def _decode(self, obj, context, path):
        """Drop the first and last parsed element and return the rest.

        When parsing, the START/STOP constants are skipped by PROTOCOL,
        so both framing bytes end up inside the greedy payload list.
        The removed values are not checked.
        """
        # remove first and last byte
        obj.pop(0)
        obj.pop(-1)
        return obj

    def _encode(self, obj, context, path):
        """Pass the payload through unchanged when building."""
        return obj


# The START/STOP constants are only emitted while building; the
# `this._building == True` comparison builds a construct expression
# that is evaluated against the context, not a Python bool.
PROTOCOL = FocusedSeq(
    "message",
    If(this._building == True, Const(Char.START, Byte)),
    "message" / SLIP,
    If(this._building == True, Const(Char.STOP, Byte))
)
ADAPTED_PROTOCOL = ProtocolAdapter(PROTOCOL)

protocol_build = ADAPTED_PROTOCOL.build(buffer)
assert protocol_build == b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc'
protocol_parsed = ADAPTED_PROTOCOL.parse(protocol_build)
assert protocol_parsed == slip_parsed

最新更新