来自contuent-kafka-python repo中的AvroProducer
示例,看来键/值架构是从文件加载的。也就是说,从此代码中:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
value_schema = avro.load('ValueSchema.avsc')
key_schema = avro.load('KeySchema.avsc')
value = {"name": "Value"}
key = {"name": "Key"}
avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schem_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value=value, key=key)
看来文件ValueSchema.avsc
和KeySchema.avsc
独立于AVRO架构注册表。
这是对的吗?引用AVRO模式注册表的URL的意义是什么,但是从磁盘上加载架构的键/值?
请澄清。
我遇到了同一问题,最初不清楚本地文件的意义是什么。如其他答案所述,第一个写入AVRO主题或对主题模式的更新,您需要架构字符串 - 您可以在此处从Kafka Rest文档中看到此内容。
在注册表中使用了架构后,您可以在这种情况下读取它(在这种情况下,我使用了请求Python模块(,然后使用Avro.loads((方法获取它。我发现这很有用,因为prody((函数要求您为AVROPORODUCER具有值模式,并且此代码将在不存在该本地文件的情况下使用:
get_schema_req_data = requests.get("http://1.2.3.4:8081/subjects/sample_value_schema/versions/latest")
get_schema_req_data.raise_for_status()
schema_string = get_schema_req_data.json()['schema']
value_schema = avro.loads(schema_string)
avroProducer = AvroProducer({'bootstrap.servers': '1.2.3.4:9092', 'schema.registry.url': 'http://1.2.3.4:8081'}, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value={"data" : "that matches your schema" })
希望这会有所帮助。
首先是在模式注册表中创建键和值架构的一种方法。您可以首先使用SR REST API在SR中创建它,也可以通过发布新消息来创建SR中现有模式的新模式或新版本。这完全是您选择哪种方法的选择。
查看代码,并考虑消费者而不是生产者需要注册表中的架构。MessageSerializer寄存器架构架构中的模式注册表为您:)