AvroTypeException: Not an enum: MOBILE on DataFileWriter



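When I try to write Avro records using the built-in AvroKeyValueSinkWriter in Flink 1.3.2 with Avro 1.8.2, I get the error AvroTypeException: Not an enum: MOBILE (full stack trace below).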

My schema looks like this:

{"namespace": "com.base.avro",
 "type": "record",
 "name": "Customer",
 "doc": "v6",
 "fields": [
     {"name": "CustomerID", "type": "string"},
     {"name": "platformAgent", "type": {
       "type": "enum",
       "name": "PlatformAgent",
       "symbols": ["WEB", "MOBILE", "UNKNOWN"]
       }, "default":"UNKNOWN"}
 ]
}

I use the following Flink code to write the data:

    import java.util
    import org.apache.avro.Schema
    import org.apache.avro.Schema.Type
    import org.apache.avro.file.DataFileConstants
    import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter
    import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

    // Writer configuration: key/value schemas as JSON strings, plus Snappy compression
    val properties = new util.HashMap[String, String]()
    val stringSchema = Schema.create(Type.STRING)
    val myTypeSchema = Customer.getClassSchema
    val keySchema = stringSchema.toString
    val valueSchema = myTypeSchema.toString
    val compress = true
    properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema)
    properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema)
    properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, compress.toString)
    properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC)

    // Sink that buckets (key, Customer) pairs by time under the S3 base path
    val sink = new BucketingSink[org.apache.flink.api.java.tuple.Tuple2[String, Customer]]("s3://test/flink")
    sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/mm/"))
    sink.setInactiveBucketThreshold(120000) // 2 minutes
    sink.setBatchSize(1024 * 1024 * 64) // 64 MB
    sink.setPendingSuffix(".avro")
    val writer = new AvroKeyValueSinkWriter[String, Customer](properties)
    sink.setWriter(writer.duplicate())

However, it throws the following error:

Caused by: org.apache.avro.AvroTypeException: Not an enum: MOBILE
    at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:177)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:119)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:302)
    ... 10 more

Any advice would be appreciated!

Update 1: According to this ticket, this appears to be a bug in Avro 1.8+: https://issues-test.apache.org/jira/browse/AVRO-1810

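It turns out this is an issue with Avro 1.8+. I had to override the Avro version that Flink pulls in by adding this to my sbt build:

    dependencyOverrides += "org.apache.avro" % "avro" % "1.7.3"

The bug is tracked here: https://issues-test.apache.org/jira/browse/AVRO-1810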
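To confirm that the override actually took effect, one option is to print the Avro version that ends up on the runtime classpath. The following is a minimal sketch, not part of the original fix: the object name AvroVersionCheck and the helper implementationVersion are made up for illustration, and it assumes the Avro jar's manifest carries an Implementation-Version entry (in a shaded or fat jar it may be missing, in which case "unknown" is printed).

```scala
import scala.util.Try

// Hypothetical helper (not from the original post): reports which version
// of a library is actually loaded, based on the jar manifest.
object AvroVersionCheck {

  // Returns the Implementation-Version of the package containing `className`,
  // or None if the class is not on the classpath or the manifest has no version.
  def implementationVersion(className: String): Option[String] =
    Try(Class.forName(className)).toOption
      .flatMap(cls => Option(cls.getPackage))
      .flatMap(pkg => Option(pkg.getImplementationVersion))

  def main(args: Array[String]): Unit = {
    val version = implementationVersion("org.apache.avro.Schema")
    println(s"Avro version on classpath: ${version.getOrElse("unknown")}")
  }
}
```

Running this inside the same JVM as the job (for example from a small test in the same sbt project) shows whether an Avro 1.8.x jar is still being picked up ahead of the overridden one.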
