关于我的一个更大的问题,我遇到了两个小问题:我想每天读取一次JSON数据,并将其保存为Parquet,以便以后进行与数据相关的工作。使用镶木地板要快得多。但我陷入困境的是,在阅读拼花地板时,Spark总是试图从模式文件中获取模式,或者只是从第一个拼花地板文件中获取该模式,并假设所有文件的模式都相同。但也有一些情况,我们在某些列中有几天没有任何数据。
假设我有一个JSON文件,其中包含以下模式的数据:
root
|-- Id: long (nullable = true)
|-- People: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Name: string (nullable = true)
| | |-- Amount: double (nullable = true)
然后我有另一个JSON文件,其中没有"People"列的数据。因此,模式如下:
root
|-- Id: long (nullable = true)
|-- People: array (nullable = true)
| |-- element: string (containsNull = true)
当我将它们与read.json
一起读取时,Spark会遍历所有文件,并从这些文件中推断出合并的模式,更具体地说,从第一个文件中推断,只保留第二个文件中的行为空,但模式是正确的。
但是,当我分别阅读这些内容并分别写入parquet时,我无法将它们一起阅读,因为对于parquet,模式不匹配,我会出错。
我的第一个想法是读取缺少数据的文件,并通过强制转换列类型以匹配第一个模式来手动更改其模式,但这种手动转换是错误的,它可能不同步,我甚至不知道如何将此字符串类型强制转换为数组或结构类型。
另一个问题是,当"Amount"字段只有完整的整数时,Spark会根据需要将它们读取为长整数,而不是双整数。但如果我使用:
val df2 = df.withColumn("People.Amount", col("People.Amount").cast(org.apache.spark.sql.types.ArrayType(org.apache.spark.sql.types.DoubleType,true)))
然后,它不会更改原始列的类型,而是添加一个名为People.Amount
的新列
我认为您可以通过模式合并来调整一些内容(请参阅此处的文档)。如果你拥有的第一块镶木地板有正确的模式,那么你能做这样的事情将该模式应用于新的镶木地板吗?
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
编辑
你说有200多个专栏,你都知道了吗?我看到了两条前进的道路,可能有很多方法可以实现这一点。一种是预先定义所有可以看到的字段。我过去所做的是创建一个带有单个伪记录的json文件,该文件包含我想要的所有字段,并且按我想要的方式键入。然后,您可以始终在"星期一"或"星期二"数据集的同时加载该记录,并在加载后将其剥离。这可能不是最好的练习,但这就是我前进道路上跌跌撞撞的原因。
另一种选择是停止尝试在正确的模式中加载/保存单个数据集,并在加载完所有数据后设置该模式。听起来不像是你想要走的路,但至少这样你就不会有这个特定的问题。