我有一个使用 org.apache.kafka.common.serialization.LongSerializer
作为密钥序列化程序的 Java Kafka 生产者,我正在尝试使用 Python Kafka 消费者来消费来自该主题的消息。
我认为由于LongSerializer
是org.apache.kafka
的一部分,因此其他语言的所有官方Kafka客户端中都可以使用等效的序列化程序和解序列化程序,以促进互操作性。但是,我找不到它。
那么,人们是否应该只对纯JVM的项目使用org.apache.kafka.common.serialization
,或者是否有其他方法可以使用Python反序列化这些对象?
我觉得我错过了一些东西,因为我发现很难相信 Kafka 提供了开箱即用的序列化程序和解序列化程序,它们不会促进用不同语言编写的进程之间的通信......
如果有人在被问到一年后仍然需要答案,你可以在Python中重新实现Java的LongSerializer:
def long_deserializer(data: bytes):
if not data:
return None
if len(data) != 8:
raise Exception(f"Size of data received by long_deserializer is not 8. Received {len(data)}")
# 0xF...FFFFFFFF is always -1
if data == b'xffxffxffxffxffxffxffxff':
return -1
value = 0
for b in data:
value <<= 8
value |= b & 0xFF
return value
示例用法如下:
- 输入:
b'x00x00x00x00x00x00x04xd2
- 解码部分:
x04xd2
- 解码为二进制:
0x04=100, d=1010 , 2=0010
- 结果:
10010100010
- 转换为 int(返回值(:
10010100010 = 1234
另外,如果你想要一种更"pythonic"的方式,你可以使用内置函数:
int('0x' + data.hex().lstrip('0'), 0)