据我所知,使用Flink的AVRO反序列化,您可以创建AVRO对象流,这很好,但似乎存在一个问题,Flink的kafka使用者只创建单个对象流:FlinkKafkaConsumerBase<T>
,而不是带有KafkaConsumer的默认Kafkaneneneba API。
在我的情况下,Key和Value都是独立的AVRO模式兼容对象,合并它们的模式可能是一场噩梦。。。
此外,使用Flink API似乎无法检索ConsumerRecorded信息?。。。
基于Flink Kafka Consumer,有一个构造函数:
public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}
第二个参数KeyedDeserializationSchema
用于取消对Kafka记录的序列化。它包括消息密钥、消息值、偏移量、主题等。因此,您可以将自己的类型MyKafkaRecord
实现为T,其中包含Avro密钥和Avro值。然后将MyKafkaRecord
作为T
传递给KeyedDeserializationSchema
的实现。以TypeInformationKeyValueSerializationSchema
为例。
例如,从Kafka读取额外信息:
class KafkaRecord<K, V> {
private K key;
private V value;
private long offset;
private int partition;
private String topic;
...
}
class MySchema<K, V> implements KeyedDeserializationSchema<KafkaRecord<K, V>> {
KafkaRecord<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
KafkaRecord<K, V> rec = new KafkaRecord<>();
rec.key = KEY_DESERIaLISER.deserialize(messageKey);
rec.value = ...;
rec.topic = topic;
...
}
}