如何在aiokafka
中为json消息添加架构?Kafka Connect没有它就无法工作。
import asyncio
import json
import random
import aiokafka
from faker import Faker
def serializer(value):
return json.dumps(value).encode()
async def produce():
fake = Faker(['ru_RU'])
producer = aiokafka.AIOKafkaProducer(bootstrap_servers='localhost:9092', value_serializer=serializer)
await producer.start()
try:
while True:
message = {
'name': fake.first_name(),
'surname': fake.last_name(),
'age': random.randint(20, 30)
}
await producer.send("mytopic", message)
finally:
await producer.stop()
asyncio.run(produce())
Kafka Connect没有就无法工作
如果您有value.converter.schemas.enable=false
,它可以。是否需要架构取决于特定的连接器,而不是Connect本身。
如果您无法将序列化程序更改为confluent_kafka
库中的Avro序列化程序,则需要在您自己的上提供schema
和payload
作为每条消息的一部分
例如
message = { 'schema': { ... },
'payload': {
'name': fake.first_name(),
'surname': fake.last_name(),
'age': random.randint(20, 30)
}
}
更多信息:Kafka Connect Deep Dive-转换器和串行化解释