分区列在结果集数据帧 Spark 中消失



我试图按时间戳列update_database_time拆分Spark数据帧,并将其写入具有定义的Avro模式的HDFS。但是,在调用重新分区方法后,我得到以下异常:

Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type StructType(StructField(random_pk,DecimalType(38,0),true), StructField(random_string,StringType,true), StructField(code,StringType,true), StructField(random_bool,BooleanType,true), StructField(random_int,IntegerType,true), StructField(random_float,DoubleType,true), StructField(random_double,DoubleType,true), StructField(random_enum,StringType,true), StructField(random_date,DateType,true), StructField(random_decimal,DecimalType(4,2),true), StructField(update_database_time_tz,TimestampType,true), StructField(random_money,DecimalType(19,4),true)) to Avro type {"type":"record","name":"TestData","namespace":"DWH","fields":[{"name":"random_pk","type":["null",{"type":"bytes","logicalType":"decimal","precision":38,"scale":0}]},{"name":"random_string","type":["string","null"]},{"name":"code","type":["string","null"]},{"name":"random_bool","type":["boolean","null"]},{"name":"random_int","type":["int","null"]},{"name":"random_float","type":["double","null"]},{"name":"random_double","type":["double","null"]},{"name":"random_enum","type":["null",{"type":"enum","name":"enumType","symbols":["VAL_1","VAL_2","VAL_3"]}]},{"name":"random_date","type":["null",{"type":"int","logicalType":"date"}]},{"name":"random_decimal","type":["null",{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}]},{"name":"update_database_time","type":["null",{"type":"long","logicalType":"timestamp-millis"}]},{"name":"update_database_time_tz","type":["null",{"type":"long","logicalType":"timestamp-millis"}]},{"name":"random_money","type":["null",{"type":"bytes","logicalType":"decimal","precision":19,"scale":4}]}]}.

我假设用于分区的列在结果中消失。如何重新定义操作,使其不会发生?

这是我使用的代码:

    dataDF.write
      .partitionBy("update_database_time")
      .format("avro")
      .option(
        "avroSchema",
        SchemaRegistry.getSchema(
          schemaRegistryConfig.url,
          schemaRegistryConfig.dataSchemaSubject,
          schemaRegistryConfig.dataSchemaVersion))
  .save(s"${hdfsURL}${pathToSave}")

根据您提供的例外情况,该错误似乎源于获取的 AVRO 架构与 Spark 的架构之间的不兼容模式。快速浏览一下,最令人担忧的部分可能是这些:

  1. (可能催化剂不知道如何将字符串转换为枚举类型(

火花架构:

StructField(random_enum,StringType,true)

AVRO 架构:

{
      "name": "random_enum",
      "type": [
        "null",
        {
          "type": "enum",
          "name": "enumType",
          "symbols": [
            "VAL_1",
            "VAL_2",
            "VAL_3"
          ]
        }
      ]
    }
  1. (update_databse_time_tz在数据帧的架构中只出现一次,但在 AVRO 架构中出现两次(

火花架构:

StructField(update_database_time_tz,TimestampType,true)

AVRO 架构:

{
      "name": "update_database_time",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        }
      ]
    },
    {
      "name": "update_database_time_tz",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        }
      ]
    }

我建议先合并架构并摆脱该异常,然后再进入其他可能的分区问题。

编辑:关于数字2,我错过了AVRO架构中有不同的名称,这导致了数据帧中缺少列update_database_time的问题。

最新更新