如何使用kafka-python发送自定义有效载荷?(在同一WIFI网络的两台ubuntu机器之间)



如何使用kafka-python发送自定义有效载荷?

我有两台ubuntu机器,并且都在同一个WIFI网络下(一个地址是172.20.10.2,另一个是172.20.10.7),我可以使用deepstream test4 python脚本通过kafka使用上述ip成功传输检测到的边界框信息。但我想要定制的有效载荷…因此,我尝试了一些kafka-python脚本。生产商:

from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['172.20.10.7:9092'])
for e in range(100):
data = {'number' : e}
producer.send('demo01', value=data)
sleep(1)

对消费者:

from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
'demo01',
bootstrap_servers=['172.20.10.7:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
print(message.value)

不工作…所以需要一些建议或可执行代码,如果可能的话!

如果你想要json.loads,你需要将生产者数据序列化为JSON

producer.send('demo01', value=json.dumps(data).encode('utf8'))

最新更新