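I am trying to split a Spark dataframe by the timestamp column update_database_time and write it to HDFS with a defined Avro schema. However, after calling the partitionBy method, I get the following exception: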
Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type StructType(StructField(random_pk,DecimalType(38,0),true), StructField(random_string,StringType,true), StructField(code,StringType,true), StructField(random_bool,BooleanType,true), StructField(random_int,IntegerType,true), StructField(random_float,DoubleType,true), StructField(random_double,DoubleType,true), StructField(random_enum,StringType,true), StructField(random_date,DateType,true), StructField(random_decimal,DecimalType(4,2),true), StructField(update_database_time_tz,TimestampType,true), StructField(random_money,DecimalType(19,4),true)) to Avro type {"type":"record","name":"TestData","namespace":"DWH","fields":[{"name":"random_pk","type":["null",{"type":"bytes","logicalType":"decimal","precision":38,"scale":0}]},{"name":"random_string","type":["string","null"]},{"name":"code","type":["string","null"]},{"name":"random_bool","type":["boolean","null"]},{"name":"random_int","type":["int","null"]},{"name":"random_float","type":["double","null"]},{"name":"random_double","type":["double","null"]},{"name":"random_enum","type":["null",{"type":"enum","name":"enumType","symbols":["VAL_1","VAL_2","VAL_3"]}]},{"name":"random_date","type":["null",{"type":"int","logicalType":"date"}]},{"name":"random_decimal","type":["null",{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}]},{"name":"update_database_time","type":["null",{"type":"long","logicalType":"timestamp-millis"}]},{"name":"update_database_time_tz","type":["null",{"type":"long","logicalType":"timestamp-millis"}]},{"name":"random_money","type":["null",{"type":"bytes","logicalType":"decimal","precision":19,"scale":4}]}]}.
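I assume the column used for partitioning disappears from the result. How can I redefine the operation so that this doesn't happen?
This is the code I'm using: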
dataDF.write
  .partitionBy("update_database_time")
  .format("avro")
  .option(
    "avroSchema",
    SchemaRegistry.getSchema(
      schemaRegistryConfig.url,
      schemaRegistryConfig.dataSchemaSubject,
      schemaRegistryConfig.dataSchemaVersion))
  .save(s"${hdfsURL}${pathToSave}")
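Based on the exception you provided, the error seems to stem from an incompatibility between the fetched Avro schema and Spark's schema. At a quick glance, the most worrying parts are probably these:
1. Catalyst possibly doesn't know how to convert a string into an enum type:
Spark schema: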
StructField(random_enum,StringType,true)
Avro schema:
{
"name": "random_enum",
"type": [
"null",
{
"type": "enum",
"name": "enumType",
"symbols": [
"VAL_1",
"VAL_2",
"VAL_3"
]
}
]
}
2. update_database_time_tz appears only once in the dataframe's schema but twice in the Avro schema:
Spark schema:
StructField(update_database_time_tz,TimestampType,true)
Avro schema:
{
"name": "update_database_time",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
]
},
{
"name": "update_database_time_tz",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
]
}
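I suggest reconciling the schemas first and getting rid of that exception before moving on to any other possible partitioning problems.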
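To check a suspected mismatch like this mechanically, one option is to diff the field names of the two schemas. The sketch below is plain Scala with no Spark dependency; the object SchemaFieldDiff and its members are hypothetical helpers, and the field lists are copied by hand from the exception in the question. It compares names only, so a type-level problem such as the string-to-enum mapping in point 1 would not show up here.

```scala
// Hypothetical helper: compares the field NAMES of a Catalyst struct and an Avro record.
object SchemaFieldDiff {
  /** Field names that are present on only one side of the comparison. */
  final case class Diff(onlyInCatalyst: Seq[String], onlyInAvro: Seq[String]) {
    def isEmpty: Boolean = onlyInCatalyst.isEmpty && onlyInAvro.isEmpty
  }

  /** Names only: types, nullability and field order are not checked. */
  def diff(catalystFields: Seq[String], avroFields: Seq[String]): Diff = {
    val avroNames = avroFields.toSet
    val catalystNames = catalystFields.toSet
    Diff(
      onlyInCatalyst = catalystFields.filterNot(avroNames.contains),
      onlyInAvro = avroFields.filterNot(catalystNames.contains)
    )
  }

  // Catalyst field names as listed in the exception (what Spark tried to write).
  val catalystFromError: Seq[String] = Seq(
    "random_pk", "random_string", "code", "random_bool", "random_int", "random_float",
    "random_double", "random_enum", "random_date", "random_decimal",
    "update_database_time_tz", "random_money")

  // Avro record field names as listed in the exception.
  val avroFromError: Seq[String] = Seq(
    "random_pk", "random_string", "code", "random_bool", "random_int", "random_float",
    "random_double", "random_enum", "random_date", "random_decimal",
    "update_database_time", "update_database_time_tz", "random_money")

  def main(args: Array[String]): Unit =
    println(diff(catalystFromError, avroFromError))
}
```

Run against the lists from the exception, onlyInCatalyst is empty and onlyInAvro contains just update_database_time: the two timestamp fields are distinct names, and the record Spark tried to write is one field short of the Avro schema.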
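Edit: Regarding point 2, I missed that the Avro schema uses a different name there, which leads to the real problem: the column update_database_time is missing from the dataframe.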
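A common workaround, sketched here under assumptions rather than as a definitive fix, is to partition by a copy of the column instead of the column itself. partitionBy still moves the copy into the directory names, but update_database_time itself stays in the data files, so each written record again has all 13 fields of the Avro schema. The object PartitionedAvroWrite and the copy's name update_database_time_partition are hypothetical; avroSchemaJson stands for the string returned by SchemaRegistry.getSchema in the question, and the spark-avro module is assumed to be on the classpath. Because some Spark versions may match Avro fields by position rather than by name, the sketch also selects the columns in the Avro schema's field order, passed in as avroFieldOrder (another assumption).

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: keeps update_database_time in the files by partitioning on a copy.
object PartitionedAvroWrite {
  // Hypothetical name for the copy that partitionBy turns into directory names.
  val PartitionCopy: String = "update_database_time_partition"

  /** Orders columns like the Avro record, then appends a copy of sourceCol for partitioning. */
  def prepare(df: DataFrame, avroFieldOrder: Seq[String], sourceCol: String, partitionCol: String): DataFrame =
    df.select(avroFieldOrder.map(col): _*)       // same order as the Avro record's fields
      .withColumn(partitionCol, col(sourceCol))  // copy that partitionBy will consume

  /** Writes partitioned Avro files in which update_database_time remains a regular field. */
  def write(df: DataFrame, avroFieldOrder: Seq[String], avroSchemaJson: String, path: String): Unit =
    prepare(df, avroFieldOrder, "update_database_time", PartitionCopy)
      .write
      .partitionBy(PartitionCopy)  // only the copy is removed from the data and moved into the path
      .format("avro")
      .option("avroSchema", avroSchemaJson)
      .save(path)
}
```

With this approach the directories are named after the copy, while every file still carries update_database_time. Partitioning on a raw timestamp creates one directory per distinct value; if that turns out to be too fine-grained, the copy could be derived with to_date(col("update_database_time")) instead.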