我正在尝试编写一个Flink流应用程序,该应用程序有一个KafkaSource从一个主题中读取,该主题为其数据定义了AVRO模式。
我想知道本地模式的自动缓存在这种情况下是如何工作的,类似于Confluent的文档。
基本上,用例是消费者不应该事先知道模式。一旦消费者被实例化,模式注册中心URL应该作为一个参数,消费者应该读取特定主题的模式。
这可能吗?任何提示都很感激!
一旦消费者被实例化,模式注册中心URL应该作为一个参数,消费者应该读取特定主题的模式。
它会被缓存。这就是所谓的"作家模式"。
消费者不应该事先知道模式
它需要,因为Avro需要一个"阅读器模式"。反序列化由"写入器模式"定义的数据。
如果没有阅读器模式,就只能处理AvroGenericRecord
类型
用于Glue的AWS SerDe库使用一种有线格式,其中包含序列化消息的模式(版本)的uid。消费应用程序从消息中读取模式id,并从Glue模式注册表加载它(如果它不在本地缓存中)。你可以在这个javascript serde库的自述文件底部找到对wire格式的描述:https://github.com/meinestadt/glue-schema-registry。
这应该是可能的。
你可以用Kafka的CLI工具,比如kcat
,像这样测试:
kcat -b mybroker -t ledger -s avro -r http://schema-registry-url:8080
如果您正在使用kafka-avro-console-consumer
:
kafka-avro-console-consumer --topic topicX --bootstrap-server kafka:9092 --property schema.registry.url="http://schema-registry:8081"