如何处理Apache Spark中更改的镶木模式



我遇到了一个问题,我在S3中将parquet数据作为每日块(以s3://bucketName/prefix/YYYY/MM/DD/的形式),但我无法从不同日期中读取AWS EMR Spark中的数据类型不匹配,我得到了许多例外之一,例如:

java.lang.ClassCastException: optional binary element (UTF8) is not a group

出现在某些文件中时有一个具有值的数组类型,但同一列可能在其他文件中具有null值,然后将其推断为字符串类型。

org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 42.0 failed 4 times, most recent failure: Lost task 23.3 in stage 42.0 (TID 2189, ip-172-31-9-27.eu-west-1.compute.internal):
org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Id,LongType,true), StructField(Name,StringType,true), StructField(Type,StringType,true)),true)

我以JSON格式具有S3的原始数据,我的最初计划是创建一个自动工作,该工作启动EMR群集,在上一个日期的JSON数据中读取,然后将其写入parquet返回S3。P>

JSON数据也分为日期,即密钥具有日期前缀。阅读JSON工作正常。无论目前正在读取多少数据,都可以从数据中推断出模式。

但是,当木板文件编写时,问题就会上升。据我了解,当我编写带有元数据文件的Parquet时,这些文件包含用于镶木quet文件的所有零件/分区的架构。对我来说,这似乎也可以使用不同的模式。当我禁用写作元数据时,据说Spark可以从给定的镶木素路径中的第一个文件中推断整个架构,并假定它通过其他文件保持不变。

当某些应为double类型的列仅具有一定一天的整数值时,从JSON中读取它们(这些数字作为整数,没有浮动点)使Spark认为这是具有long类型的列。即使我可以在编写Parquet文件之前将这些列施加一倍,但这仍然不好,因为模式可能会更改,可以添加新列,并且跟踪这是不可能的。

我看到有些人有同样的问题,但我还没有找到足够好的解决方案。

对此的最佳实践或解决方案是什么?

这些是我将parquet写入S3的选项;关闭架构合并增强作者的写入性能 - 它也可能解决您的问题

val PARQUET_OPTIONS = Map(
 "spark.sql.parquet.mergeSchema" -> "false",
 "spark.sql.parquet.filterPushdown" -> "true")

当我从JSON中读取数据并在每日S3文件夹中写入Parquet时,在读取JSON或将易于错误的列转换为正确类型之前,请在写入parquet之前写入parquet。Parquet,Spark可能会根据数据实例中的值推断出不同日期的数据,并使用冲突的模式编写Parquet文件。

这可能不是完美的解决方案,但是我发现使用不断发展的模式解决问题的唯一方法是:

在我每天(更具体地说是每晚)CRON的工作之前,请批处理处理前一天的数据,我正在创建一个大多数空值的虚拟对象。

我确保ID是可识别的,例如,由于真实数据具有唯一的ID-S,因此我将"虚拟"字符串添加为虚拟数据对象的ID。

然后,我将给出具有易于错误类型的属性的预期值,例如,我将给出浮点/双倍的非零值,因此,当与JSON编辑时,它们肯定会具有十进制分离器,例如" 0.2",而不是"而不是"0"(在编组JSON时,带有0个值的双打/浮子显示为" 0"不是" 0.0")。

字符串,布尔值和整数工作正常,但是除了双打/浮子外,我还需要将数组作为其他类/构造的空数字和对象的空数,以相应的空对象,以便它们不会为" null" -s,s,s,当Spark读取Null-S时,AS弦。


然后,如果我填补了所有必要字段,我将把对象汇总到JSON并将文件写入S3。

然后,我将在我的Scala批处理处理脚本中使用这些文件来读取它们,将模式保存到变量上,并在我在真实的JSON数据中阅读时将该架构作为参数,以避免使用自己的模式推断出自己的模式。

这样,我知道所有字段始终是相同类型的,而架构合并仅在添加新字段时加入模式。

当然,它添加了当添加新的容易出现错误类型的新字段时手动更新虚拟对象创建的缺点,但这是当前的小缺点,因为这是我发现的唯一解决方案。

只需制作一个rdd [string],其中每个字符串是json,当将rdd作为dataframe使用primitiveasString选项时,将所有datatypes用于字符串

 val binary_zip_RDD = sc.binaryFiles(batchHolder.get(i), minPartitions = 50000)
 // rdd[String]  each string is a json ,lowercased json
    val TransformedRDD = binary_zip_RDD.flatMap(kv => ZipDecompressor.Zip_open_hybrid(kv._1, kv._2, proccessingtimestamp))
 // now the schema of dataframe would be consolidate schema of all json strings
    val jsonDataframe_stream = sparkSession.read.option("primitivesAsString", true).json(TransformedRDD)
    println(jsonDataframe_stream.printSchema())

    jsonDataframe_stream.write.mode(SaveMode.Append).partitionBy(GetConstantValue.DEVICEDATE).parquet(ApplicationProperties.OUTPUT_DIRECTORY)

最新更新