我正在尝试为 kafka avro 序列化主题创建一个 flink 消费者。 我有卡夫卡主题流式传输 avro 序列化数据。我可以通过 avroconsoleconsumer 看到它。
Flink 1.6.0 增加了一个AvroDeserializationSchema
但我找不到它的完整用法示例。是的,有一些似乎在 1.6.0 添加该类之前生成了一个 avrodeserialization 类。
我有一个通过 avro 工具生成的 avro 类。
现在我一直在尝试遵循存在的例子,但它们足够不同,以至于我无法让事情继续下去。(我不经常用Java编程(
大多数使用以下形式的某种形式
Myclass mc = new MyClass();
AvroDeserializationSchema<Myclass> ads = new AvroDeserializationSchema<> (Myclass.class);
FlinkKafkaConsumer010<Myclass> kc = new FlinkKafkaConsumer010<>(topic,ads,properties);
其中Myclass是通过avro-tools jar生成的avro类。这是正确的方法吗?执行此操作并利用内部 flink 1.6.0 avrodeserializationschema 类时,我遇到了一些私有/公共访问问题。我是否必须创建一个新类并扩展 avrodeserializationschema?
好的,我深入研究了 kafka consumer javadocs,并找到了一个示例来提取 using avro 流。我仍然需要将 kafka 消费转换为 flinkKafkaConsumer,但下面的代码有效。
对于io.confluent的工作引用,我必须添加一个存储库和一个对pom文件的依赖关系。
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.1.1</version>
</dependency>
public class StreamingJob {
// static DeserializationSchema<pendingsv> avroSchema = new AvroDeserializationSchema<pendingsv>(pendingsv.class);
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "opssupport.alarms");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
String topic = "pendingSVs_";
final Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
for (ConsumerRecord<String, GenericRecord> record : records) {
System.out.printf("offset = %d, key = %s, value = %s n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
// execute program
//env.execute("Flink Streaming Java API Skeleton");
}
}