I'm at my wits' end trying to generate a Spark schema for a JSON structure I already know I want. I have a JSON structure that looks like this:
{
  "key1": "value1",
  "key2": "value2",
  "key3": "value3",
  "key4": "value4",
  "key5": [
    "key6": "value6",
    "key7": [
      "key8": "value8",
      "key8": "value9"
    ]
  ]
}
I'm trying to recreate that structure by building the following schema in Scala, running on Spark 2.4.8:
val targetSchemaSO = StructType(
List(
StructField("key1", StringType, true),
StructField("key2", StringType, true),
StructField("key3", StringType, true),
StructField("key4", StringType, true),
StructField("key5", StructType(
List(
StructField("key6", StringType, true),
StructField("key7", ArrayType(StructType(
List(
StructField("key8", StringType, true)
))), true)
)), true)
)
)
However, when I try to format each row as a Spark Row with the following code:
val outputDictSO = scala.collection.mutable.LinkedHashMap[String, Any](
"key1" -> "value1",
"key2" -> "value2",
"key3" -> "value3",
"key4" -> "value4",
"key5" -> (
"key6" -> "value6",
"key7" -> (
"key8" -> "value8",
"key8" -> "value9"
)
)
)
Row.fromSeq(outputDictSO.values.toSeq)
I get the following error when mapping it to the schema above:
Error while encoding: java.lang.RuntimeException: scala.Tuple2 is not a valid external
type for schema of struct<key6:string,key7:array<struct<key8:string>>>
The program I'm basing this on uses this exact schema in PySpark and the DataFrame is created just fine; do StructTypes work differently between PySpark and Scala Spark? What is the correct schema to make nested arrays like this possible?
You can retrieve a valid schema from the data itself: read a sample of the JSON strings into a DataFrame with spark.read.json (building a Dataset[String] from the JSON string column, if needed) and print its schema:
import spark.implicits._
val df1 = spark.read.json(df.select("json_string_column").as[String])
df1.printSchema()
There is no difference between the two languages when it comes to DataFrame schemas. If you want to create a DataFrame with a specified schema, the correct way to build the Rows for the schema you provided is:
val outputDictSO =
Row("value1", "value2", "value3", "value4",
Row("value6", Array(
Row("value8"))))
val df0 =
spark.createDataFrame(
spark.sparkContext.parallelize(Seq(outputDictSO)), targetSchemaSO)
df0.printSchema()
The schema of the DataFrame is as expected:
root
|-- key1: string (nullable = true)
|-- key2: string (nullable = true)
|-- key3: string (nullable = true)
|-- key4: string (nullable = true)
|-- key5: struct (nullable = true)
| |-- key6: string (nullable = true)
| |-- key7: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- key8: string (nullable = true)
If you look at the contents of the DataFrame:
df0.show()
it gives:
+------+------+------+------+--------------------+
| key1| key2| key3| key4| key5|
+------+------+------+------+--------------------+
|value1|value2|value3|value4|[value6, [[value8]]]|
+------+------+------+------+--------------------+
You can select the nested keys:
df0.select("key5.key6").show()
Spark returns:
+------+
| key6|
+------+
|value6|
+------+
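To reach the structs inside the key7 array, explode is the usual tool; a short sketch against the df0 built above (one output row per array element):

```scala
import org.apache.spark.sql.functions.{col, explode}

// Expand the key5.key7 array into one row per element,
// then project the key8 field out of each struct.
df0.select(explode(col("key5.key7")).as("k7"))
  .select(col("k7.key8"))
  .show()
```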
The error occurs because the specified schema and the data do not match. If you take your data:
val outputDictSOMap = Map(
"key1" -> "value1",
"key2" -> "value2",
"key3" -> "value3",
"key4" -> "value4",
"key5" -> (
"key6" -> "value6",
"key7" -> (
"key8" -> "value8",
"key9" -> "value9"
)
))
and convert it to JSON:
import org.json4s.jackson.Serialization
implicit val formats = org.json4s.DefaultFormats
val json = Serialization.write(outputDictSOMap)
import spark.implicits._
val df1 = spark.read.json(Seq(json).toDS)
df1.printSchema()
the resulting schema is:
root
|-- key1: string (nullable = true)
|-- key2: string (nullable = true)
|-- key3: string (nullable = true)
|-- key4: string (nullable = true)
|-- key5: struct (nullable = true)
| |-- _1: struct (nullable = true)
| | |-- key6: string (nullable = true)
| |-- _2: struct (nullable = true)
| | |-- key7: struct (nullable = true)
| | | |-- _1: struct (nullable = true)
| | | | |-- key8: string (nullable = true)
| | | |-- _2: struct (nullable = true)
| | | | |-- key9: string (nullable = true)
Note the _1/_2 struct fields: the nested tuples were serialized as structs of tuple members, not as the struct and array you declared. That is why you get the error.
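As an alternative to assembling Rows by hand, nested case classes let Spark derive a matching schema automatically; a sketch (the class names here are made up for illustration):

```scala
// Hypothetical case classes mirroring the target structure.
case class Key7(key8: String)
case class Key5(key6: String, key7: Seq[Key7])
case class Record(key1: String, key2: String, key3: String,
                  key4: String, key5: Key5)

import spark.implicits._
val dfCC = Seq(
  Record("value1", "value2", "value3", "value4",
    Key5("value6", Seq(Key7("value8"))))
).toDF()
dfCC.printSchema()  // same shape as targetSchemaSO (nullability may differ)
```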