合并具有不同架构的数据帧 - Scala Spark



我正在将JSON转换为数据帧。在第一步中,我创建一个数据框数组,然后创建一个联合。但是我在具有不同模式的 JSON 中执行联合时遇到了问题。

如果 JSON 具有相同的模式,我可以做到这一点,就像您在另一个问题中看到的那样:使用 Spark-Scala 解析列中的 JSON 根

我正在处理以下数据:

val exampleJsonDifferentSchema = spark.createDataset(
"""
{"ITEM1512":
{"name":"Yin",
"address":{"city":"Columbus",
"state":"Ohio"},
"age":28           }, 
"ITEM1518":
{"name":"Yang",
"address":{"city":"Working",
"state":"Marc"}
},
"ITEM1458":
{"name":"Yossup",
"address":{"city":"Macoss",
"state":"Microsoft"},
"age":28
}
}""" :: Nil)

如您所见,不同之处在于一个数据框没有年龄。

val itemsExampleDiff = spark.read.json(exampleJsonDifferentSchema)
itemsExampleDiff.show(false)
itemsExampleDiff.printSchema
+---------------------------------+---------------------------+-----------------------+
|ITEM1458                         |ITEM1512                   |ITEM1518               |
+---------------------------------+---------------------------+-----------------------+
|[[Macoss, Microsoft], 28, Yossup]|[[Columbus, Ohio], 28, Yin]|[[Working, Marc], Yang]|
+---------------------------------+---------------------------+-----------------------+
root
|-- ITEM1458: struct (nullable = true)
|    |-- address: struct (nullable = true)
|    |    |-- city: string (nullable = true)
|    |    |-- state: string (nullable = true)
|    |-- age: long (nullable = true)
|    |-- name: string (nullable = true)
|-- ITEM1512: struct (nullable = true)
|    |-- address: struct (nullable = true)
|    |    |-- city: string (nullable = true)
|    |    |-- state: string (nullable = true)
|    |-- age: long (nullable = true)
|    |-- name: string (nullable = true)
|-- ITEM1518: struct (nullable = true)
|    |-- address: struct (nullable = true)
|    |    |-- city: string (nullable = true)
|    |    |-- state: string (nullable = true)
|    |-- name: string (nullable = true)

我现在的解决方案是以下代码,我在其中制作一个数据帧数组:

val columns:Array[String]       = itemsExample.columns
var arrayOfExampleDFs:Array[DataFrame] = Array()
for(col_name <- columns){
val temp = itemsExample.select(lit(col_name).as("Item"), col(col_name).as("Value"))
arrayOfExampleDFs = arrayOfExampleDFs :+ temp
}
val jsonDF = arrayOfExampleDFs.reduce(_ union _)

但是我有一个具有不同模式的 JSON,当我在联合中减少时,我无法做到这一点,因为数据框需要具有相同的模式。实际上,我有以下错误:

org.apache.spark.sql.AnalysisException:只能对具有兼容列类型的表执行联合。

我正在尝试做我在这个问题中发现的类似事情:如何在 Spark 中具有不同列数量的两个数据帧上执行联合?

具体来说,这部分:

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union
def expr(myCols: Set[String], allCols: Set[String]) = {
allCols.toList.map(x => x match {
case x if myCols.contains(x) => col(x)
case _ => lit(null).as(x)
})
}

但是我无法为列进行设置,因为我需要动态捕获总计和单列。我只能做这样的事情:

for(i <- 0 until arrayOfExampleDFs.length-1) {
val cols1 = arrayOfExampleDFs(i).select("Value").columns.toSet
val cols2 = arrayOfExampleDFs(i+1).select("Value").columns.toSet
val total = cols1 ++ cols2
arrayOfExampleDFs(i).select("Value").printSchema()
print(total)
}

那么,如何成为动态执行此联合的函数呢?

更新:预期输出

在本例中,此数据框和架构:

+--------+---------------------------------+
|Item    |Value                            |
+--------+---------------------------------+
|ITEM1458|[[Macoss, Microsoft], 28, Yossup]|
|ITEM1512|[[Columbus, Ohio], 28, Yin]      |
|ITEM1518|[[Working, Marc], null, Yang]    |
+--------+---------------------------------+
root
|-- Item: string (nullable = false)
|-- Value: struct (nullable = true)
|    |-- address: struct (nullable = true)
|    |    |-- city: string (nullable = true)
|    |    |-- state: string (nullable = true)
|    |-- age: long (nullable = true)
|    |-- name: string (nullable = true)

下面是一个可能的解决方案,它通过在找不到年龄列时添加年龄列来为所有数据帧创建通用架构:

import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.types.{LongType, StructField, StructType}
....
for(col_name <- columns){
val currentDf = itemsExampleDiff.select(col(col_name))
// try to identify if age field is present
val hasAge = currentDf.schema.fields(0)
.dataType
.asInstanceOf[StructType]
.fields
.contains(StructField("age", LongType, true))
val valueCol = hasAge match {
// if not construct a new value column
case false => struct(
col(s"${col_name}.address"), 
lit(null).cast("bigint").as("age"),
col(s"${col_name}.name")
)
case true => col(col_name)
}
arrayOfExampleDFs = arrayOfExampleDFs :+ currentDf.select(lit(col_name).as("Item"), valueCol.as("Value"))
}
val jsonDF = arrayOfExampleDFs.reduce(_ union _)
// +--------+---------------------------------+
// |Item    |Value                            |
// +--------+---------------------------------+
// |ITEM1458|[[Macoss, Microsoft], 28, Yossup]|
// |ITEM1512|[[Columbus, Ohio], 28, Yin]      |
// |ITEM1518|[[Working, Marc],, Yang]         |
// +--------+---------------------------------+

分析:可能要求最高的部分是找出age是否存在。对于查找,我们使用df.schema.fields属性,它允许我们深入研究每列的内部架构。

当找不到年龄时,我们使用struct重新生成列:

struct(
col(s"${col_name}.address"), 
lit(null).cast("bigint").as("age"),
col(s"${col_name}.name")
)

最新更新