I am trying out a simple Kafka producer for the first time. It should read records from a JSON file and send them, but I get an error.
My JSON file (test.json):
{
    "states": [
        {
            "name": "Alabama",
            "abbreviation": "AL"
        },
        {
            "name": "Alaska",
            "abbreviation": "AK"
        }
    ]
}
My producer script:
import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
print('Producer created..............')
with open('/home/ravi/test.json') as f:
    data = json.load(f)
for state in data['states']:
    producer.send('ECJson', json.dump(state))
But I get this error:
Producer created..............
Traceback (most recent call last):
  File "prodECJson.py", line 10, in <module>
    producer.send('ECJson', json.dump(state))
TypeError: dump() missing 1 required positional argument: 'fp'
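json.dump writes the serialized JSON to a file object, which is why it requires the fp argument. What you want is json.dumps, which returns a string built from the given object. KafkaProducer's send function expects bytes unless a serializer is configured, so you also have to encode the string. Alternatively, you can specify the serializer directly when constructing the KafkaProducer object.
For example: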
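import json
from kafka import KafkaProducer

# value_serializer turns every value passed to send() into UTF-8 encoded JSON bytes
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda m: json.dumps(m).encode("utf-8"))
print('Producer created..............')

with open('/home/ravi/test.json') as f:
    data = json.load(f)

for state in data['states']:
    producer.send('ECJson', state)

# send() is asynchronous; flush() blocks until the buffered records have been sent
producer.flush()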
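The difference between json.dump and json.dumps, and the encoding step, can be checked without a running broker. The following is a minimal sketch using only the standard library; the variable names (buffer, text, payload) are illustrative and not part of the original code.

```python
import io
import json

state = {"name": "Alabama", "abbreviation": "AL"}

# json.dump needs a file-like object to write into and returns None.
buffer = io.StringIO()
result = json.dump(state, buffer)

# json.dumps takes no file argument and returns the JSON text as a str.
text = json.dumps(state)

# Encoding the str yields bytes, the form send() accepts when no
# value_serializer is configured on the producer.
payload = text.encode("utf-8")

print(result)                     # None
print(buffer.getvalue() == text)  # True
print(payload)                    # b'{"name": "Alabama", "abbreviation": "AL"}'
```

With a producer created without value_serializer, the equivalent call inside the loop would be producer.send('ECJson', json.dumps(state).encode("utf-8")).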