Spark Scala: creating a schema based on a target JSON structure



I'm at my wit's end trying to generate a Spark schema based on a JSON structure I know I want. I have a JSON structure that looks like this:

{
  "key1": "value1",
  "key2": "value2",
  "key3": "value3",
  "key4": "value4",
  "key5": {
    "key6": "value6",
    "key7": [
      { "key8": "value8" },
      { "key8": "value9" }
    ]
  }
}

I'm trying to recreate that structure with the following schema, written in Scala and running on Spark 2.4.8:

import org.apache.spark.sql.types._

val targetSchemaSO = StructType(
  List(
    StructField("key1", StringType, true),
    StructField("key2", StringType, true),
    StructField("key3", StringType, true),
    StructField("key4", StringType, true),
    StructField("key5", StructType(
      List(
        StructField("key6", StringType, true),
        StructField("key7", ArrayType(StructType(
          List(
            StructField("key8", StringType, true)
          ))), true)
      )), true)
  )
)

However, when I try to format each row as a Spark Row with the following code:

import org.apache.spark.sql.Row

val outputDictSO = scala.collection.mutable.LinkedHashMap[String, Any](
  "key1" -> "value1",
  "key2" -> "value2",
  "key3" -> "value3",
  "key4" -> "value4",
  "key5" -> (
    "key6" -> "value6",
    "key7" -> (
      "key8" -> "value8",
      "key8" -> "value9"
    )
  )
)
Row.fromSeq(outputDictSO.values.toSeq)

I get the following error when mapping it against the schema above:

Error while encoding: java.lang.RuntimeException: scala.Tuple2 is not a valid external
type for schema of struct<key6:string,key7:array<struct<key8:string>>>

The program I'm basing this on used this exact schema in PySpark and the DataFrame was created just fine; do StructTypes work differently between PySpark and Spark Scala? What is the correct schema that makes the nested array possible?

You can recover a valid schema from the data itself by parsing a column of JSON strings and printing the inferred schema (create a DataFrame from a set of JSON strings in your data if needed). Note that from_json needs an explicit schema, so the sketch below first infers one with spark.read.json (it assumes spark.implicits._ is in scope):

import org.apache.spark.sql.functions.{col, from_json}
val inferred = spark.read.json(df.select("json_string_column").as[String]).schema // schema inferred from the data
val df1 = df.withColumn("json", from_json(col("json_string_column"), inferred))
df1.printSchema()
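
For a quick one-off check, you can also infer from a single hard-coded sample and compare the result with the target schema (a small sketch, again assuming spark.implicits._; sampleJson is an illustrative name):

val sampleJson = """{"key1":"value1","key5":{"key6":"value6","key7":[{"key8":"value8"}]}}"""
spark.read.json(Seq(sampleJson).toDS).printSchema() // should line up with targetSchemaSO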

There is no difference between the two languages in how a DataFrame schema is created. If you want to create a DataFrame with the specified schema, the correct way to build the Rows, according to the schema you provided, would be:

val outputDictSO =
  Row("value1", "value2", "value3", "value4",
    Row("value6", Array(
      Row("value8"))))
val df0 =
  spark.createDataFrame(
    spark.sparkContext.parallelize(Seq(outputDictSO)), targetSchemaSO)
df0.printSchema()

The schema of the DataFrame is as expected:

root
|-- key1: string (nullable = true)
|-- key2: string (nullable = true)
|-- key3: string (nullable = true)
|-- key4: string (nullable = true)
|-- key5: struct (nullable = true)
|    |-- key6: string (nullable = true)
|    |-- key7: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- key8: string (nullable = true)

And if you look at the contents of the DataFrame:

df0.show()

it shows:

+------+------+------+------+--------------------+
|  key1|  key2|  key3|  key4|                key5|
+------+------+------+------+--------------------+
|value1|value2|value3|value4|[value6, [[value8]]]|
+------+------+------+------+--------------------+

You can select the nested keys:

df0.select("key5.key6").show()

Spark returns:

+------+
|  key6|
+------+
|value6|
+------+
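
Fields inside the array of structs can be reached too; one possible sketch uses explode to get one row per array element (the alias k7 is just illustrative):

import org.apache.spark.sql.functions.{col, explode}

// key7 is an array of structs, so explode yields one row per element
df0.select(explode(col("key5.key7")).as("k7")).select("k7.key8").show()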

The error occurs because the specified schema and the data do not match. If you take the data:

val outputDictSOMap = Map(
  "key1" -> "value1",
  "key2" -> "value2",
  "key3" -> "value3",
  "key4" -> "value4",
  "key5" -> (
    "key6" -> "value6",
    "key7" -> (
      "key8" -> "value8",
      "key9" -> "value9"
    )
  ))

and convert it to JSON:

import org.json4s.jackson.Serialization
import spark.implicits._ // for .toDS

implicit val formats = org.json4s.DefaultFormats
val json = Serialization.write(outputDictSOMap)
val df1 = spark.read.json(Seq(json).toDS)
df1.printSchema()

the schema you get is:

root
|-- key1: string (nullable = true)
|-- key2: string (nullable = true)
|-- key3: string (nullable = true)
|-- key4: string (nullable = true)
|-- key5: struct (nullable = true)
|    |-- _1: struct (nullable = true)
|    |    |-- key6: string (nullable = true)
|    |-- _2: struct (nullable = true)
|    |    |-- key7: struct (nullable = true)
|    |    |    |-- _1: struct (nullable = true)
|    |    |    |    |-- key8: string (nullable = true)
|    |    |    |-- _2: struct (nullable = true)
|    |    |    |    |-- key9: string (nullable = true)

This is why you were getting the error: the nested tuples serialize as anonymous _1/_2 structs rather than the named fields of your target schema.
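
For completeness, a hedged sketch of how the same data could be shaped so it serializes to the intended structure, using nested Maps for the structs and a List for the array instead of tuples (fixedMap and fixedJson are illustrative names):

val fixedMap = Map(
  "key1" -> "value1",
  "key2" -> "value2",
  "key3" -> "value3",
  "key4" -> "value4",
  "key5" -> Map(
    "key6" -> "value6",
    "key7" -> List(
      Map("key8" -> "value8"),
      Map("key8" -> "value9"))))
val fixedJson = Serialization.write(fixedMap)
// key5 now comes back as struct<key6:string, key7:array<struct<key8:string>>>
spark.read.json(Seq(fixedJson).toDS).printSchema()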
