kafka-python中的Json生成器



我第一次尝试一个简单的kafka生产者,它将从json文件记录中获取数据。但是我得到错误。

我的Json文件(test.json):
{
"states": 
[
{
"name": "Alabama",
"abbreviation": "AL"
},
{
"name": "Alaska",
"abbreviation": "AK"
}
]
}

我的制作人类:

import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
print('Producer created..............')
with open('/home/ravi/test.json') as f:
data = json.load(f)
for state in data['states']:
producer.send('ECJson', json.dump(state))

但是我得到错误:

Producer created..............
Traceback (most recent call last):
File "prodECJson.py", line 10, in <module>
producer.send('ECJson', json.dump(state))
TypeError: dump() missing 1 required positional argument: 'fp'

json.dump将写入磁盘。你想要的是json.dumps,它将从给定的对象创建一个字符串。KafkaProducer的send函数需要字节,所以你也必须对字符串进行编码。你也可以直接在KafkaProducer对象中指定序列化器

例如:

import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092', 
value_serializer=lambda m: json.dumps(m).encode("utf-8"))
print('Producer created..............')
with open('/home/ravi/test.json') as f:
data = json.load(f)
for state in data['states']:
producer.send('ECJson', state)

相关内容

  • 没有找到相关文章

最新更新