FlinkKafkaConsumer / KafkaSource with AWS Glue Schema Regist



我正在尝试编写一个Flink流应用程序,该应用程序有一个KafkaSource从一个主题中读取,该主题为其数据定义了AVRO模式。

我想知道本地模式的自动缓存在这种情况下是如何工作的,类似于Confluent的文档。

基本上,用例是消费者不应该事先知道模式。一旦消费者被实例化,模式注册中心URL应该作为一个参数,消费者应该读取特定主题的模式。

这可能吗?任何提示都很感激!

一旦消费者被实例化,模式注册中心URL应该作为一个参数,消费者应该读取特定主题的模式。

它会被缓存。这就是所谓的"作家模式"。

消费者不应该事先知道模式

它需要,因为Avro需要一个"阅读器模式"。反序列化由"写入器模式"定义的数据。

如果没有阅读器模式,就只能处理AvroGenericRecord类型

用于Glue的AWS SerDe库使用一种有线格式,其中包含序列化消息的模式(版本)的uid。消费应用程序从消息中读取模式id,并从Glue模式注册表加载它(如果它不在本地缓存中)。你可以在这个javascript serde库的自述文件底部找到对wire格式的描述:https://github.com/meinestadt/glue-schema-registry。

这应该是可能的。

你可以用Kafka的CLI工具,比如kcat,像这样测试:

kcat -b mybroker -t ledger -s avro -r http://schema-registry-url:8080

如果您正在使用kafka-avro-console-consumer:

kafka-avro-console-consumer --topic topicX --bootstrap-server kafka:9092 --property schema.registry.url="http://schema-registry:8081"

最新更新