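I need a fast way to send 300 short messages per second between Python multiprocessing processes over zeromq. Each message needs to contain an ID and time.time().
msgpack seems like the best way to serialize the dict before sending it over zeromq, and conveniently msgpack has an example of exactly what I need, except that it uses datetime.datetime.now().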
import datetime
import msgpack

useful_dict = {
    "id": 1,
    "created": datetime.datetime.now(),
}

def decode_datetime(obj):
    if b'__datetime__' in obj:
        obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
    return obj

def encode_datetime(obj):
    if isinstance(obj, datetime.datetime):
        return {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f")}
    return obj

packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime)
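The problem is that their example does not work. I get this error:
    obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
KeyError: 'as_str'
Maybe it is because I am on Python 3.4, but I can't see what is wrong with strptime. Any help is much appreciated.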
Since MessagePack (import msgpack) is good at serializing integers, I created a solution that uses only integers:
import datetime
import gc

import dateutil.tz
import msgpack

_datetime_ExtType = 42

def _unpacker_hook(code, data):
    if code == _datetime_ExtType:
        values = unpack(data)
        if len(values) == 8:  # the last value is the UTC offset in seconds
            return datetime.datetime(*values[:-1], tzinfo=dateutil.tz.tzoffset(None, values[-1]))
        else:
            return datetime.datetime(*values)
    return msgpack.ExtType(code, data)

# This will only get called for unknown types
def _packer_unknown_handler(obj):
    if isinstance(obj, datetime.datetime):
        if obj.tzinfo:
            components = (obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond, int(obj.utcoffset().total_seconds()))
        else:
            components = (obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond)
        # we effectively double pack the values to "compress" them
        data = msgpack.ExtType(_datetime_ExtType, pack(components))
        return data
    raise TypeError("Unknown type: {}".format(obj))

def pack(obj, **kwargs):
    # we don't use a global packer because it wouldn't be re-entrant safe
    return msgpack.packb(obj, use_bin_type=True, default=_packer_unknown_handler, **kwargs)

def unpack(payload):
    try:
        # we temporarily disable gc during unpack to bump up perf: https://pypi.python.org/pypi/msgpack-python
        gc.disable()
        # This must match the pack() parameters above. NOTE: use_list=False is faster.
        # raw=False decodes strings as UTF-8 (older msgpack releases spelled this encoding='utf-8').
        return msgpack.unpackb(payload, use_list=False, raw=False, ext_hook=_unpacker_hook)
    finally:
        gc.enable()
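The integer-components idea can be tried on its own, without msgpack. The following is only a sketch: to_components and from_components are hypothetical helper names, and the standard library's datetime.timezone stands in for dateutil.tz.tzoffset so that the snippet needs no third-party package.

```python
import datetime

def to_components(dt):
    """Flatten a datetime into a tuple of plain integers."""
    parts = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond)
    if dt.tzinfo is not None:
        # Append the UTC offset in whole seconds as an eighth value.
        parts += (int(dt.utcoffset().total_seconds()),)
    return parts

def from_components(values):
    """Rebuild a datetime from the tuple produced by to_components."""
    if len(values) == 8:
        # Eight values means the last one is the UTC offset in seconds.
        offset = datetime.timedelta(seconds=values[-1])
        return datetime.datetime(*values[:-1], tzinfo=datetime.timezone(offset))
    return datetime.datetime(*values)

naive = datetime.datetime(2024, 1, 2, 3, 4, 5, 678901)
aware = naive.replace(tzinfo=datetime.timezone(datetime.timedelta(hours=2)))

naive_again = from_components(to_components(naive))
aware_again = from_components(to_components(aware))
```

Because every element is a small integer, the tuple is the kind of payload MessagePack encodes compactly, which is the motivation given above.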
Python 3 and Python 2 handle string encodings differently (see encoding-and-decoding-strings-in-python-3-x).
So you need to:
- use b'as_str' (instead of 'as_str') as the dictionary key
- use encode and decode on the stored value
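The mismatch behind the KeyError can be reproduced without msgpack. This is a minimal sketch: the unpacked dict is a hand-written, hypothetical stand-in for what the unpacker returned in this setup (bytes keys and bytes values), not actual msgpack output.

```python
import datetime

# Hypothetical stand-in for the unpacked message: keys and values are bytes.
unpacked = {b"__datetime__": True, b"as_str": b"20240102T03:04:05.678901"}

# On Python 3 a str key never matches a bytes key, so this lookup fails.
try:
    unpacked["as_str"]
    lookup_failed = False
except KeyError:
    lookup_failed = True

# The bytes key works, and the value must be decoded before strptime sees it.
as_text = unpacked[b"as_str"].decode("utf-8")
parsed = datetime.datetime.strptime(as_text, "%Y%m%dT%H:%M:%S.%f")
```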
Modifying the code like this works on both Python 2 and Python 3:
import datetime
import msgpack

useful_dict = {
    "id": 1,
    "created": datetime.datetime.now(),
}

def decode_datetime(obj):
    if b'__datetime__' in obj:
        obj = datetime.datetime.strptime(obj[b'as_str'].decode(), "%Y%m%dT%H:%M:%S.%f")
    return obj

def encode_datetime(obj):
    if isinstance(obj, datetime.datetime):
        obj = {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f").encode()}
    return obj

packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime)
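The documentation says that the default encoding for unicode strings in packb (and pack) is utf-8. You can simply look for the unicode string '__datetime__' in decode_datetime, instead of the bytes object b'__datetime__', and add the encoding='utf-8' argument to unpackb. (msgpack 1.0 and later removed the encoding argument; there, strings are decoded as UTF-8 by default.) Like this: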
def decode_datetime(obj):
    if '__datetime__' in obj:
        obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
    return obj

packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime, encoding='utf-8')
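Since the question only asks for an ID and time.time(), it may be possible to skip the datetime hooks entirely: time.time() already returns a float, and MessagePack has a native float type. The sketch below uses only the standard library to show the float form and its conversion to and from a datetime; to_timestamp and from_timestamp are hypothetical helper names, and UTC is an assumed choice of time zone.

```python
import datetime
import time

UTC = datetime.timezone.utc

def to_timestamp(dt):
    """Convert an aware datetime to float seconds since the epoch."""
    return dt.timestamp()

def from_timestamp(ts):
    """Convert float seconds since the epoch to an aware UTC datetime."""
    return datetime.datetime.fromtimestamp(ts, tz=UTC)

# The message from the question: an ID plus a plain float timestamp.
message = {"id": 1, "created": time.time()}
created_at = from_timestamp(message["created"])

# An existing datetime can be carried the same way.
original = datetime.datetime(2024, 1, 2, 3, 4, 5, 678901, tzinfo=UTC)
restored = from_timestamp(to_timestamp(original))
```

A dict shaped like message contains only an int and a float, so it should not need a default or object_hook callback at all.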