Swap columns and rows in a Spark DataFrame built from JSON



I have a JSON file taken from HDFS with thousands of records like these:

{
  "01": {
    "created": "2020-12-28 02-15-01",
    "entity_id": "s.m_free",
    "old_state_id": null,
    "state": "1498.7"
  },
  "02": {
    "created": "2020-12-28 02-15-31",
    "entity_id": "s.m_free",
    "old_state_id": 58100,
    "state": "1498.9"
  },
  ...
}

Unfortunately, the DataFrame comes out with thousands of columns and only 4 rows, like this:

              |                 01 |                   02|..................|
created       |2020-12-28 02-15-01 |  2020-12-28 02-15-31|..................|
entity_id     |           s.m_free |             s.m_free|..................|
old_state_id  |               null |                58100|..................|
state         |             1498.7 |               1498.9|..................|

I need it with 4 columns and thousands of records, like this:

       |             created| entity_id| old_state_id|  state|
01     | 2020-12-28 02-15-01|  s.m_free|         null| 1498.7|
02     | 2020-12-28 02-15-31|  s.m_free|        58100| 1498.9|

I found an option for PySpark that changes the orientation of the DataFrame using Pandas, but since the task has to be done in Scala, I could not find a similar option.

Is there also a way to put those names (records 01, 02, etc.) into the first column, since they appear to be the keys of the values in the JSON file?

I would be very happy if you could help me.

This part simulates the generation of the original DataFrame.
As in this example, make sure to also use option("primitivesAsString", true) in the real scenario.
This avoids mismatched types caused by Spark's default inference, which types a JSON null as string.
For example, without option("primitivesAsString", true), old_state_id would be inferred as long for "old_state_id": 58100, but as string for "old_state_id": null.
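
For reference, a minimal sketch of what the real HDFS read could look like, assuming one multi-line JSON document per file (the path is hypothetical):

val df_hdfs = spark.read
  .option("primitivesAsString", true)
  .option("multiLine", true)            // each file holds a single multi-line JSON document
  .json("hdfs:///path/to/records.json") // hypothetical path

The simulated version used throughout this answer: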

import spark.implicits._

val json_str = """
{
  "01": {
    "created": "2020-12-28 02-15-01",
    "entity_id": "s.m_free",
    "old_state_id": null,
    "state": "1498.7"
  },
  "02": {
    "created": "2020-12-28 02-15-31",
    "entity_id": "s.m_free",
    "old_state_id": 58100,
    "state": "1498.9"
  }
}"""

// primitivesAsString forces every JSON primitive (including 58100) to string,
// so both structs end up with identical schemas and can later be stacked
val df = spark.read.option("primitivesAsString", true).json(Seq(json_str).toDS)

df.printSchema()
root
|-- 01: struct (nullable = true)
|    |-- created: string (nullable = true)
|    |-- entity_id: string (nullable = true)
|    |-- old_state_id: string (nullable = true)
|    |-- state: string (nullable = true)
|-- 02: struct (nullable = true)
|    |-- created: string (nullable = true)
|    |-- entity_id: string (nullable = true)
|    |-- old_state_id: string (nullable = true)
|    |-- state: string (nullable = true)
df.show(false)
+---------------------------------------------+----------------------------------------------+
|01                                           |02                                            |
+---------------------------------------------+----------------------------------------------+
|{2020-12-28 02-15-01, s.m_free, null, 1498.7}|{2020-12-28 02-15-31, s.m_free, 58100, 1498.9}|
+---------------------------------------------+----------------------------------------------+
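
For comparison, reading the same string without option("primitivesAsString", true) makes the two structs diverge on old_state_id, and the stack step below would then fail with a type mismatch. A quick sketch, with the expected schema shown as comments:

val df_raw = spark.read.json(Seq(json_str).toDS)
df_raw.printSchema()
// root
//  |-- 01: struct (nullable = true)
//  |    |-- created: string (nullable = true)
//  |    |-- entity_id: string (nullable = true)
//  |    |-- old_state_id: string (nullable = true)  <- only null was seen
//  |    |-- state: string (nullable = true)
//  |-- 02: struct (nullable = true)
//  |    |-- created: string (nullable = true)
//  |    |-- entity_id: string (nullable = true)
//  |    |-- old_state_id: long (nullable = true)    <- 58100 was seen
//  |    |-- state: string (nullable = true)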

This is the data transformation part, based on the SQL stack function:

df.createOrReplaceTempView("t")
val cols_num = df.columns.size // 2
val cols_names_and_vals = (for (c <- df.columns) yield s"'$c',`$c`").mkString(",") // "'01',`01`,'02',`02`"
val sql_query = s"select id,val.* from (select stack($cols_num,$cols_names_and_vals) as (id,val) from t)" // select id,val.* from (select stack(2,'01',`01`,'02',`02`) as (id,val) from t)
val df_unpivot = spark.sql(sql_query)

df_unpivot.printSchema()
root
|-- id: string (nullable = true)
|-- created: string (nullable = true)
|-- entity_id: string (nullable = true)
|-- old_state_id: string (nullable = true)
|-- state: string (nullable = true)

df_unpivot.show(truncate = false)
+---+-------------------+---------+------------+------+
|id |created            |entity_id|old_state_id|state |
+---+-------------------+---------+------------+------+
|01 |2020-12-28 02-15-01|s.m_free |null        |1498.7|
|02 |2020-12-28 02-15-31|s.m_free |58100       |1498.9|
+---+-------------------+---------+------------+------+
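
The same unpivot can also be written without a temp view by calling selectExpr directly on the DataFrame; a sketch using the variables defined above:

val df_unpivot2 = df
  .selectExpr(s"stack($cols_num,$cols_names_and_vals) as (id,val)")
  .select($"id", $"val.*")

On Spark 3.4+ the built-in Dataset.unpivot (alias melt) is another option, but the stack query above also works on older versions.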
