Spark Union通过嵌套的JSON DataFrame失败



我有以下两个JSON文件:

{
    "name" : "Agent1",
    "age" : "32",
    "details" : [{
            "d1" : 1,
            "d2" : 2
        }
    ]
}
{
    "name" : "Agent2",
    "age" : "42",
    "details" : []
}

我用火花阅读它们:

val jsonDf1 = spark.read.json(pathToJson1)
val jsonDf2 = spark.read.json(pathToJson2)

使用以下模式创建了两个数据框:

root
 |-- age: string (nullable = true)
 |-- details: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- d1: long (nullable = true)
 |    |    |-- d2: long (nullable = true)
 |-- name: string (nullable = true)
root
|-- age: string (nullable = true)
|-- details: array (nullable = true)
|    |-- element: string (containsNull = true)
|-- name: string (nullable = true)

当我尝试与这两个数据框架执行联合时,我会收到此错误:

jsonDf1.union(jsonDf2)

org.apache.spark.sql.AnalysisException: unresolved operator 'Union;;
'Union
:- LogicalRDD [age#0, details#1, name#2]
+- LogicalRDD [age#7, details#8, name#9]

如何解决这个问题?我有时会在JSON文件中获得空数组,Spark作业将加载,但它仍然必须统一它们,这不是一个问题,因为JSON文件的模式是相同的。

如果您尝试将2个数据框结合在一起,您将获得此信息:

error:org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. ArrayType(StringType,true) <> ArrayType(StructType(StructField(d1,StringType,true), StructField(d2,StringType,true)),true) at the second column of the second table

JSON文件同时到达

解决这个问题,如果您可以同时阅读JSON,我建议:

val jsonDf1 = spark.read.json("json1.json", "json2.json")

这将提供此模式:

jsonDf1.printSchema
 |-- age: string (nullable = true)
 |-- details: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- d1: long (nullable = true)
 |    |    |-- d2: long (nullable = true)
 |-- name: string (nullable = true)

数据输出

jsonDf1.show(10,truncate = false)
+---+-------+------+
|age|details|name  |
+---+-------+------+
|32 |[[1,2]]|Agent1|
|42 |null   |Agent2|
+---+-------+------+

JSON文件在不同时间到达

如果您的JSON到达不同的时间(作为默认解决方案),我建议读取具有完整数组的模板JSON对象,这将使您的数据框架具有可能的空数组对任何联合有效。然后,在输出结果之前,您将使用此假JSON删除:

val df = spark.read.json("jsonWithMaybeAnEmptyArray.json", 
"TemplateFakeJsonWithAFullArray.json")
df.filter($"name" !== "FakeAgent").show(1)

请注意:已打开了JIRA卡以提高合并SQL数据类型的能力:https://issues.apache.org/jira/jira/browse/browse/spark-19536和此类操作在下一个火花版本中应该有可能。

polomarcus的答案使我找到了这个解决方案:我无法一次读取所有文件,因为我有一个文件列表作为输入,并且Spark没有接收路径列表的API,但是显然可以使用Scala来执行此操作:

val files = List("path1", "path2", "path3")
val dataframe = spark.read.json(files: _*)

这样,我得到了一个包含所有三个文件的数据帧。

相关内容

  • 没有找到相关文章

最新更新