如何使用 FlinkKafkaConsumer 单独解析密钥 <K> <T>而不是



据我所知,使用Flink的AVRO反序列化,您可以创建AVRO对象流,这很好,但似乎存在一个问题,Flink的kafka使用者只创建单个对象流:FlinkKafkaConsumerBase<T>,而不是带有KafkaConsumer的默认Kafkaneneneba API。

在我的情况下,Key和Value都是独立的AVRO模式兼容对象,合并它们的模式可能是一场噩梦。。。

此外,使用Flink API似乎无法检索ConsumerRecorded信息?。。。

基于Flink Kafka Consumer,有一个构造函数:

public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}

第二个参数KeyedDeserializationSchema用于取消对Kafka记录的序列化。它包括消息密钥、消息值、偏移量、主题等。因此,您可以将自己的类型MyKafkaRecord实现为T,其中包含Avro密钥和Avro值。然后将MyKafkaRecord作为T传递给KeyedDeserializationSchema的实现。以TypeInformationKeyValueSerializationSchema为例。

例如,从Kafka读取额外信息:

class KafkaRecord<K, V> {
private K key;
private V value;
private long offset;
private int partition;
private String topic;
...
}
class MySchema<K, V> implements KeyedDeserializationSchema<KafkaRecord<K, V>> {
KafkaRecord<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
KafkaRecord<K, V> rec = new KafkaRecord<>();
rec.key = KEY_DESERIaLISER.deserialize(messageKey);
rec.value = ...;
rec.topic = topic;
...
}
}

相关内容

  • 没有找到相关文章

最新更新