aiokafka中json消息的模式



如何在aiokafka中为json消息添加架构?Kafka Connect没有它就无法工作。

import asyncio
import json
import random
import aiokafka
from faker import Faker

def serializer(value):
return json.dumps(value).encode()

async def produce():
fake = Faker(['ru_RU'])
producer = aiokafka.AIOKafkaProducer(bootstrap_servers='localhost:9092', value_serializer=serializer)
await producer.start()
try:
while True:
message = {
'name': fake.first_name(),
'surname': fake.last_name(),
'age': random.randint(20, 30)
}
await producer.send("mytopic", message)
finally:
await producer.stop()

asyncio.run(produce())

Kafka Connect没有就无法工作

如果您有value.converter.schemas.enable=false,它可以。是否需要架构取决于特定的连接器,而不是Connect本身。


如果您无法将序列化程序更改为confluent_kafka库中的Avro序列化程序,则需要在您自己的上提供schemapayload作为每条消息的一部分

例如

message = { 'schema': { ... }, 
'payload': {
'name': fake.first_name(),
'surname': fake.last_name(),
'age': random.randint(20, 30)
}
}

更多信息:Kafka Connect Deep Dive-转换器和串行化解释

最新更新