KafkaAvroSerializer,用于在没有schema.registry.url的情况下序列化Avro



我是卡夫卡和阿夫罗的无名小卒。所以我一直在努力让生产者/消费者运行。到目前为止,我已经能够使用以下内容生成和使用简单的字节和字符串:生产者配置:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);
byte[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);
Thread.sleep(250);
}
producer.close();
}

现在一切都很好,当我试图序列化POJO时,问题就来了。因此,我能够使用Avro提供的实用程序从POJO获得AvroSchema。对模式进行硬编码,然后尝试创建一个Generic Record以通过KafkaProducer发送生产商现在设置为:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA); // this is the Generated AvroSchema
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

这就是问题所在:当我使用KafkaAvroSerializer时,制作人没有出现,原因是:缺少必需参数:schema.registry.url

我读到了为什么需要这样做,这样我的消费者就可以破译生产者发给我的任何信息。但是这个模式不是已经嵌入到AvroMessage中了吗?如果有人能与KafkaAvroSerializer共享一个使用KafkaProducer的工作示例,而不必指定schema.registry.url ,那将是非常棒的

也非常感谢任何关于模式注册表实用性的见解/资源。

谢谢!

首先注意:在vanilla apache kafka中没有提供KafkaAvroSerializer,它是由Confluent Platform提供的。(https://www.confluent.io/),作为其开源组件的一部分(http://docs.confluent.io/current/platform.html#confluent-模式注册表)

快速回答:不,如果使用KafkaAvroSerializer,则需要一个模式注册表。请在此处查看一些示例:http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html

模式注册表的基本思想是,每个主题都将引用一个avro模式(即,您只能发送相互一致的数据。但一个模式可以有多个版本,因此您仍然需要为每个记录识别模式)

我们不想像你暗示的那样为每个数据编写模式——通常,模式比你的数据大!每次读取时解析它都会浪费时间,也会浪费资源(网络、磁盘、cpu)

相反,模式注册表实例将执行绑定avro schema <-> int schemaId,然后序列化程序将在从注册表获取数据(并缓存数据以备将来使用)之后,仅在数据之前写入此id。

因此,在kafka中,您的记录将是[<id> <bytesavro>](出于技术原因是魔术字节),这是一个只有5个字节的开销(与您的模式大小相比)当阅读时,你的消费者会找到与id对应的模式,并对其进行反序列化avro字节

如果您真的想为每个记录编写模式,那么您将需要另一个序列化程序(我认为是编写自己的序列化程序,但它很容易,只需重用https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java并移除模式注册表部分以将其替换为模式,读取时相同)。但如果你使用avro,我真的不鼓励这样做——总有一天,你需要实现类似avro注册表的东西来管理版本控制

虽然检查的答案都是正确的,但还应该提到的是,可以禁用模式注册

只需将auto.register.schemas设置为false

您可以创建自定义Avro serializer,然后即使没有Schema注册表,您也可以生成主题的记录。查看下面的文章。

https://codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html

在这里,他们使用了Kafkatemplate。我试过使用

KafkaProducer<String, User> UserKafkaProducer

它运行良好但是,如果您想使用KafkaAvroSerializer,您需要给Schema registryURL

正如其他人所指出的,KafkaAvroSerializer需要作为Confluent平台一部分的Schema Registry,并且使用需要许可。

使用模式注册表的主要优点是,与为每条消息编写带有模式的二进制有效负载相比,连线上的字节会更小。

我写了一篇博客文章详细介绍的优势

您可以始终使您的值类手动实现Serialiser<T>Deserialiser<T>(以及用于Kafka Streams的Serde<T>)。Java类通常是从Avro文件生成的,所以直接编辑它不是一个好主意,但包装可能是一种冗长但可行的方式。

另一种方法是调整用于Java类生成的Arvo生成器模板,并自动生成所有这些接口的实现。Avro maven和gradle插件都支持自定义模板,因此配置起来应该很容易。

我已经创建https://github.com/artemyarulin/avro-kafka-deserializable已经更改了模板文件和可用于文件生成的简单CLI工具

最新更新