Apache Flink read Avro byte[] from Kafka



在回顾示例时,我看到了很多这样的内容:

FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

我看到他们在这里已经知道架构。

直到我将 byte[] 读入通用记录,我才知道架构 然后获取架构。(因为它可能会因记录而异)

有人可以指出我进入一个从byte[]读取到地图过滤器的FlinkKafkaConsumer08,以便我可以删除一些前导位,然后将该byte[]加载到通用记录中?

如果您使用Confluent的模式注册表,我认为首选的解决方案是使用Confluent提供的Avro serde。这样,我们只需调用deserialize(),要使用的最新版本 Avro 架构的分辨率将在后台自动完成,无需字节操作。

它归结为这样的东西(scala中的示例代码,java解决方案将非常相似):

import io.confluent.kafka.serializers.KafkaAvroDeserializer
...
val valueDeserializer = new KafkaAvroDeserializer()
valueDeserializer.configure(
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava, 
false)
...
override def deserialize(messageKey: Array[Byte], message: Array[Byte], 
topic: String, partition: Int, offset: Long): KafkaKV = {
val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord]
val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
KafkaKV(key, value)
}
...

此方法要求消息创建器也与架构注册表集成,并在其中发布架构。这可以通过与上述非常类似的方式完成,使用 Confluent 的KafkaAvroSerializer

我在这里发布了一个详细的解释:如何将 Flink 与 Confluent 的模式注册表集成。

我正在做类似的事情(我正在使用 09 消费者)

在自定义反序列化程序的主代码传递中:

FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
parameterTool.getRequired("topic"), new MyDeserializationSchema<>(),
parameterTool.getProperties());

自定义反序列化架构读取字节,找出架构和/或从架构注册表中检索它,反序列化为 GenericRecord 并返回 GenericRecord 对象。

public class MyDeserializationSchema<T> implements DeserializationSchema<T> {

private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class;
@Override
public T deserialize(byte[] arg0) throws IOException {
//do your stuff here, strip off your bytes
//deserialize and create your GenericRecord 
return (T) (myavroevent);
}
@Override
public boolean isEndOfStream(T nextElement) {
return false;
}
@Override
public TypeInformation<T> getProducedType() {
return TypeExtractor.getForClass(avrotype);
}
}

最新更新