使用 ConfluentSchemaRegistry 反序列化 avro 数据时的异常



我是flink和Kafka的新手。我正在尝试使用融合模式注册表反序列化 avro 数据。我已经在 ec2 机器上安装了 flink 和 Kafka。此外,"测试"主题是在运行代码之前创建的。

代码路径:https://gist.github.com/mandar2174/5dc13350b296abf127b92d0697c320f2

该代码在实现过程中执行以下操作:

1) Create a flink DataStream object using a list of user element. (User class is avro generated class)
2) Write the Datastream source to Kafka using AvroSerializationSchema.
3) Read the data from Kafka using ConfluentRegistryAvroDeserializationSchema by reading the schema from Confluent Schema registry.

运行 flink 可执行 jar 的命令:

./bin/flink run -c com.streaming.example.ConfluentSchemaRegistryExample /opt/flink-1.7.2/kafka-flink-stream-processing-assembly-0.1.jar

运行代码时出现异常:

java.io.IOException: Unknown data format. Magic number does not match
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:55)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:66)
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

我用于用户类的 Avro 架构如下:

{
  "type": "record",
  "name": "User",
  "namespace": "com.streaming.example",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "favorite_color",
      "type": [
        "string",
        "null"
      ]
    }
  ]
}

有人可以指出我缺少哪些步骤作为使用融合的 Kafka 模式注册表反序列化 avro 数据的一部分?

编写 Avro 数据的方式也需要使用注册表,以便依赖于它的反序列化程序正常工作。

但这是 Flink 中的一个开放 PR,仍然用于添加ConfluentRegistryAvroSerializationSchema

我相信解决方法是使用 AvroDeserializationSchema ,它不依赖于注册表。

如果你确实想在生产者代码中使用注册表,那么你必须在 Flink 之外这样做,直到该 PR 被合并。

相关内容

  • 没有找到相关文章

最新更新