I'm new to Scala/Spark.
My dataframe:
# column header #
id A1 B1 C1 A2 B2 C2 A3 B3 C3 ... A40 B40 C40
# data #
1 x y z a b c null null null ...
2 a b c null null null null null null ... m n o
3 x y z null null null d e f ...
Expected output:
1 [{id:1,A:x,B:y,C:z},{id:2,A:a,B:b,C:c}]
2 [{id:1,A:a,B:b,C:c},{id:40,A:m,B:n,C:o}]
3 [{id:1,A:x,B:y,C:z},{id:3,A:d,B:e,C:f}]
Check the code below.
scala> df.show(false)
+---+---+---+---+----+----+----+----+----+----+
|id |A1 |B1 |C1 |A2 |B2 |C2 |A3 |B3 |C3 |
+---+---+---+---+----+----+----+----+----+----+
|1 |x |y |z |a |b |c |null|null|null|
|2 |a |b |c |null|null|null|m |n |o |
|3 |x |y |z |null|null|null|d |e |f |
+---+---+---+---+----+----+----+----+----+----+
import org.apache.spark.sql.functions._

val colExpr = to_json(
  filter(
    array(
      df.columns
        .tail                                             // skip the "id" column
        .map(c => (c.takeWhile(_.isLetter), c.dropWhile(_.isLetter))) // "A40" -> ("A", "40"); a per-character split would mis-group two-digit suffixes
        .groupBy { case (_, suffix) => suffix }           // group A1/B1/C1 together, A2/B2/C2 together, ...
        .map { case (suffix, cols) =>
          val fields = lit(suffix).as("id") +:
            cols.map { case (prefix, _) => col(s"$prefix$suffix").as(prefix) }
          // keep the struct only when all of its A/B/C values are non-null
          when(fields.tail.map(_.isNotNull).reduce(_ and _), struct(fields: _*))
        }
        .toSeq: _*
    ),
    column => column.isNotNull                            // drop the nulls left by `when`
  )
).as("data")
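The grouping step hinges on splitting each column name such as A40 into its letter part and numeric part, then grouping by the numeric suffix. A minimal plain-Scala sketch of that parsing (independent of Spark; the object name is just for illustration) that handles multi-digit suffixes:

```scala
object ColumnNameSplit {
  // Split a column name like "A40" into its letter prefix and numeric suffix.
  // Handles multi-digit suffixes, which a character-by-character split would not.
  def split(name: String): (String, String) =
    (name.takeWhile(_.isLetter), name.dropWhile(_.isLetter))

  def main(args: Array[String]): Unit = {
    val cols = Seq("A1", "B1", "C1", "A40", "B40", "C40")
    // Group the (prefix, suffix) pairs by suffix, mirroring the Spark expression.
    val grouped = cols.map(split).groupBy { case (_, suffix) => suffix }
    println(grouped("40").map(_._1)) // letter prefixes grouped under suffix "40"
  }
}
```

Grouping by the full numeric suffix is what keeps A40/B40/C40 in one group instead of lumping them in with A10/B20/C30.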
Final output:
scala> df.select($"id",colExpr).show(false)
+---+-----------------------------------------------------------------------+
|id |data |
+---+-----------------------------------------------------------------------+
|1 |[{"id":"2","A":"a","B":"b","C":"c"},{"id":"1","A":"x","B":"y","C":"z"}]|
|2 |[{"id":"1","A":"a","B":"b","C":"c"},{"id":"3","A":"m","B":"n","C":"o"}]|
|3 |[{"id":"1","A":"x","B":"y","C":"z"},{"id":"3","A":"d","B":"e","C":"f"}]|
+---+-----------------------------------------------------------------------+
Note: Spark version 3.0.