我有一个给定的JSON,它取自HDFS,有数千条这样的记录:
{
"01": {
"created": "2020-12-28 02-15-01",
"entity_id": "s.m_free",
"old_state_id": null,
"state": "1498.7"
},
"02": {
"created": "2020-12-28 02-15-31",
"entity_id": "s.m_free",
"old_state_id": 58100,
"state": "1498.9"
},
...}
不幸的是,DataFrame显示为数千列,只有4行,如下所示:
| 01 | 02|..................|
created |2020-12-28 02-15-01 | 2020-12-28 02-15-31|..................|
entity_id | s.m_free | s.m_free|..................|
old_state_id | null | 58100|..................|
state | 1498.7 | 1498.9|..................|
我需要它与4列和数千记录作为:
| created| entity_id| old_state_id| state|
01 | 2020-12-28 02-15-01| s.m.free| null| 1498.7|
02 | 2020-12-28 02-15-31| s.m.free| 58100| 1498.9|
我为PySpark找到了一个选项,可以使用Pandas更改数据帧的方向,但由于我必须使用Scala来完成任务,我找不到类似的选项。
还有一种方法可以将名称放在第一列(记录01、02等(上,因为它似乎是json文件中值的键。
如果你能帮助我,我将非常高兴。
此部分模拟原始数据帧的生成
与此示例类似,请确保在实际场景中也使用option("primitivesAsString",true)
这是为了解决由于Spark默认类型为null(字符串(而导致的不匹配类型的问题
例如,如果没有option("primitivesAsString",true)
,对于"old_state_id": 58100
,old_state_id
将被推断为长,而对于"old_state_id": null
,它将被推断成字符串。
import spark.implicits._
val json_str = """
{
"01": {
"created": "2020-12-28 02-15-01",
"entity_id": "s.m_free",
"old_state_id": null,
"state": "1498.7"
},
"02": {
"created": "2020-12-28 02-15-31",
"entity_id": "s.m_free",
"old_state_id": 58100,
"state": "1498.9"
}
}"""
val df = spark.read.option("primitivesAsString",true).json(Seq(json_str).toDS)
df.printSchema()
root
|-- 01: struct (nullable = true)
| |-- created: string (nullable = true)
| |-- entity_id: string (nullable = true)
| |-- old_state_id: string (nullable = true)
| |-- state: string (nullable = true)
|-- 02: struct (nullable = true)
| |-- created: string (nullable = true)
| |-- entity_id: string (nullable = true)
| |-- old_state_id: string (nullable = true)
| |-- state: string (nullable = true)
df.show(false)
+---------------------------------------------+----------------------------------------------+
|01 |02 |
+---------------------------------------------+----------------------------------------------+
|{2020-12-28 02-15-01, s.m_free, null, 1498.7}|{2020-12-28 02-15-31, s.m_free, 58100, 1498.9}|
+---------------------------------------------+----------------------------------------------+
这是数据转换部分,基于堆栈
df.createOrReplaceTempView("t")
val cols_num = df.columns.size // 2
val cols_names_and_vals = (for (c <- df.columns) yield s"'$c',`$c`").mkString(",") // "'01',`01`,'02',`02`"
val sql_query = s"select id,val.* from (select stack($cols_num,$cols_names_and_vals) as (id,val) from t)" // select id,val.* from (select stack(2,'01',`01`,'02',`02`) as (id,val) from t)
val df_unpivot = spark.sql(sql_query)
df_unpivot.printSchema()
root
|-- id: string (nullable = true)
|-- created: string (nullable = true)
|-- entity_id: string (nullable = true)
|-- old_state_id: string (nullable = true)
|-- state: string (nullable = true)
df_unpivot.show(truncate = false)
+---+-------------------+---------+------------+------+
|id |created |entity_id|old_state_id|state |
+---+-------------------+---------+------------+------+
|01 |2020-12-28 02-15-01|s.m_free |null |1498.7|
|02 |2020-12-28 02-15-31|s.m_free |58100 |1498.9|
+---+-------------------+---------+------------+------+