使用 Spark 将 JSON 字典映射到案例类



在过去的几个小时里,我试图使用Apache Spark将JSON文件转换为Scalacase class

JSON 具有以下结构:

{
"12": {
"wordA": 1,
"wordB": 2,
"wordC": 3
},
"13": {
"wordX": 10,
"wordY": 12,
"wordZ": 15
}
}

首次尝试:设置累积架构

我试图人为地构建我的架构:

val schema = new StructType()
.add("",MapType(StringType, new StructType()
.add("", StringType)
.add("", IntegerType)))
val df = session.read
.option("multiline",true)
.option("mode", "PERMISSIVE")
.schema(schema)
.json(filePath)
df.show()

但这显然是不对的,因为我必须给出字段名称。

第二次尝试:映射到案例类

我还尝试创建case classes,它更优雅一些:

case class KeywordData (keywordsByCode: Map[String, WordAndWeight])
case class WordAndWeight (word: String, weight: Int)

问题:

但无论如何,df.show(( 显示:

+----+
|    |
+----+
|null|
+----+

JSON 结构不容易操作,因为我的列没有修复名称。知道吗?

预期成果

以 12 和 13 为键和列表[wordA,...的地图wordC] 分别列出 [wordX, ..., wordZ] 作为值

编辑:地图地图与案例类

case class WordAndWeight(code: Map[String, Map[String, Integer]])

它给了我以下错误:

+-------+----------+
|     12|        13|
+-------+----------+
|[1,2,3]|[10,12,15]|
+-------+----------+

cannot resolve '`code`' given input columns: [12, 13];
org.apache.spark.sql.AnalysisException: cannot resolve '`code`' given input columns: [12, 13];
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)

您尝试定义将 MapType 作为根类型的架构。换句话说,您希望每条线都是映射。AFAIK Spark不支持MapType作为根类型。它仅支持结构类型作为根类型。

当您通过案例类和反射定义类型时,如下所示:

val schema = ScalaReflection.schemaFor[KeywordData].dataType.asInstanceOf[StructType]

你得到 StructType 作为根类型:

root
|-- keywordsByCode: map (nullable = true)
|    |-- key: string
|    |-- value: struct (valueContainsNull = true)
|    |    |-- word: string (nullable = true)
|    |    |-- weight: integer (nullable = true)

这意味着Spark将创建具有一列的数据帧,该列称为keywordsByCode。 它会期待像这样的 JSON

{"keywordsByCode":{"12":{"wordA":1,"wordB":2,"wordC":3},"13":{"wordX":10,"wordY":12,"wordZ":15}}}

您需要修改 JSON 或像文本一样读取文件,然后将每一行解析为 JSON。

更新

我没有注意到另一个错误,你的案例类应该看起来像:

case class KeywordData (keywordsByCode: Map[String, Map[String, Int]])

因为你的JSON嵌套了MapType。因此,架构将如下所示:

root
|-- keywordsByCode: map (nullable = true)
|    |-- key: string
|    |-- value: map (valueContainsNull = true)
|    |    |-- key: string
|    |    |-- value: integer (valueContainsNull = true)

我的测试代码:

val df = spark.read
.option("multiline",true)
.option("mode", "PERMISSIVE")
.schema(ScalaReflection.schemaFor[KeywordData].dataType.asInstanceOf[StructType])
.json("test.json")
df.printSchema()
df.explain(true)
df.show(10)

最新更新