在过去的几个小时里,我试图使用Apache Spark将JSON文件转换为Scalacase class
。
JSON 具有以下结构:
{
"12": {
"wordA": 1,
"wordB": 2,
"wordC": 3
},
"13": {
"wordX": 10,
"wordY": 12,
"wordZ": 15
}
}
首次尝试:设置累积架构
我试图人为地构建我的架构:
val schema = new StructType()
.add("",MapType(StringType, new StructType()
.add("", StringType)
.add("", IntegerType)))
val df = session.read
.option("multiline",true)
.option("mode", "PERMISSIVE")
.schema(schema)
.json(filePath)
df.show()
但这显然是不对的,因为我必须给出字段名称。
第二次尝试:映射到案例类
我还尝试创建case class
es,它更优雅一些:
case class KeywordData (keywordsByCode: Map[String, WordAndWeight])
case class WordAndWeight (word: String, weight: Int)
问题:
但无论如何,df.show(( 显示:
+----+
| |
+----+
|null|
+----+
JSON 结构不容易操作,因为我的列没有修复名称。知道吗?
预期成果
以 12 和 13 为键和列表[wordA,...的地图wordC] 分别列出 [wordX, ..., wordZ] 作为值
编辑:地图地图与案例类
case class WordAndWeight(code: Map[String, Map[String, Integer]])
它给了我以下错误:
+-------+----------+
| 12| 13|
+-------+----------+
|[1,2,3]|[10,12,15]|
+-------+----------+
cannot resolve '`code`' given input columns: [12, 13];
org.apache.spark.sql.AnalysisException: cannot resolve '`code`' given input columns: [12, 13];
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
您尝试定义将 MapType 作为根类型的架构。换句话说,您希望每条线都是映射。AFAIK Spark不支持MapType作为根类型。它仅支持结构类型作为根类型。
当您通过案例类和反射定义类型时,如下所示:
val schema = ScalaReflection.schemaFor[KeywordData].dataType.asInstanceOf[StructType]
你得到 StructType 作为根类型:
root
|-- keywordsByCode: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- word: string (nullable = true)
| | |-- weight: integer (nullable = true)
这意味着Spark将创建具有一列的数据帧,该列称为keywordsByCode
。 它会期待像这样的 JSON
{"keywordsByCode":{"12":{"wordA":1,"wordB":2,"wordC":3},"13":{"wordX":10,"wordY":12,"wordZ":15}}}
您需要修改 JSON 或像文本一样读取文件,然后将每一行解析为 JSON。
更新
我没有注意到另一个错误,你的案例类应该看起来像:
case class KeywordData (keywordsByCode: Map[String, Map[String, Int]])
因为你的JSON嵌套了MapType。因此,架构将如下所示:
root
|-- keywordsByCode: map (nullable = true)
| |-- key: string
| |-- value: map (valueContainsNull = true)
| | |-- key: string
| | |-- value: integer (valueContainsNull = true)
我的测试代码:
val df = spark.read
.option("multiline",true)
.option("mode", "PERMISSIVE")
.schema(ScalaReflection.schemaFor[KeywordData].dataType.asInstanceOf[StructType])
.json("test.json")
df.printSchema()
df.explain(true)
df.show(10)