In my Scala program I'm trying to combine the results of groupBy operations at multiple levels. The real dataset is very large. As a small example, I have a DataFrame like this:
+---+---+----+-----+-----+
|  F|  L| Loy|Email|State|
+---+---+----+-----+-----+
| f1| l1|loy1| null|   s1|
| f1| l1|loy1|   e1|   s1|
| f2| l2|loy2|   e2|   s2|
| f2| l2|loy2|   e3| null|
| f1| l1|null|   e1|   s3|
+---+---+----+-----+-----+
For the first-level grouping I use the following script, which groups rows that share the same (F, L, Loy) values:
df.groupBy("F", "L", "Loy").agg(collect_set($"Email").alias("Email"), collect_set($"State").alias("State")).show
The result looks like this:
+---+---+----+--------+-----+
|  F|  L| Loy|   Email|State|
+---+---+----+--------+-----+
| f1| l1|null|    [e1]| [s3]|
| f2| l2|loy2|[e2, e3]| [s2]|
| f1| l1|loy1|    [e1]| [s1]|
+---+---+----+--------+-----+
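To make the first-level semantics concrete, here is a small model written with plain Scala collections (no Spark), meant only as an illustration. Rows are grouped on the (F, L, Loy) tuple, with a null Loy forming its own group, and null values are dropped before each set is built, which is how collect_set treats nulls. That is why the loy1 row ends up with [e1] and no null entry. The names FirstLevelModel, Rec, and firstLevel are hypothetical.

```scala
object FirstLevelModel {
  // One input row; null marks a missing value, as in the DataFrame above.
  final case class Rec(f: String, l: String, loy: String, email: String, state: String)

  // Models df.groupBy("F", "L", "Loy").agg(collect_set(Email), collect_set(State)):
  // group on the (F, L, Loy) tuple (a null Loy is its own key) and keep the
  // distinct non-null values of Email and State for each group.
  def firstLevel(rows: Seq[Rec]): Map[(String, String, String), (Set[String], Set[String])] =
    rows.groupBy(r => (r.f, r.l, r.loy)).map { case (key, group) =>
      val emails = group.map(_.email).filter(_ != null).toSet
      val states = group.map(_.state).filter(_ != null).toSet
      (key, (emails, states))
    }

  // The same five sample rows as the DataFrame in the question.
  val sample: Seq[Rec] = Seq(
    Rec("f1", "l1", "loy1", null, "s1"),
    Rec("f1", "l1", "loy1", "e1", "s1"),
    Rec("f2", "l2", "loy2", "e2", "s2"),
    Rec("f2", "l2", "loy2", "e3", null),
    Rec("f1", "l1", null, "e1", "s3")
  )

  def main(args: Array[String]): Unit =
    firstLevel(sample).foreach(println)
}
```

The model yields the same three groups as the Spark output above; only the printing order may differ, since neither a Scala Map nor a Spark groupBy guarantees row order.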
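What I'm stuck on is how to perform a second-level groupBy keyed on (F, L, Email), where F and L are strings and the Email column is an Array[String]. This groupBy should return a result like this: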
+---+---+----+--------+---------+
|  F|  L| Loy|   Email|    State|
+---+---+----+--------+---------+
| f1| l1|loy1|    [e1]| [s3, s1]|
| f2| l2|loy2|[e2, e3]|     [s2]|
+---+---+----+--------+---------+
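The main goal is to reduce the number of entries as much as possible by applying groupBy at successive levels. I'm quite new to Scala, so any help would be greatly appreciated :)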
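Just use concat_ws() with an empty separator: it turns each State array into a plain string, and collect_set then gathers those strings back into a State array. max('loy) keeps loy1 rather than null because max ignores nulls. Note that this relies on each State array holding at most one element; concat_ws("", ...) would turn [s1, s2] into the single string "s1s2". Take a look: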
scala> val df = Seq( ("f1","l1","loy1",null,"s1"),("f1","l1","loy1","e1","s1"),("f2","l2","loy2","e2","s2"),("f2","l2","loy2","e3",null),("f1","l1",null,"e1","s3")).toDF("F","L","loy","email","state")
df: org.apache.spark.sql.DataFrame = [F: string, L: string ... 3 more fields]
scala> df.show(false)
+---+---+----+-----+-----+
|F  |L  |loy |email|state|
+---+---+----+-----+-----+
|f1 |l1 |loy1|null |s1   |
|f1 |l1 |loy1|e1   |s1   |
|f2 |l2 |loy2|e2   |s2   |
|f2 |l2 |loy2|e3   |null |
|f1 |l1 |null|e1   |s3   |
+---+---+----+-----+-----+
scala> val df2 = df.groupBy("F", "L", "Loy").agg(collect_set($"Email").alias("Email"), collect_set($"State").alias("State"))
df2: org.apache.spark.sql.DataFrame = [F: string, L: string ... 3 more fields]
scala> df2.show(false)
+---+---+----+--------+-----+
|F  |L  |Loy |Email   |State|
+---+---+----+--------+-----+
|f1 |l1 |null|[e1]    |[s3] |
|f2 |l2 |loy2|[e2, e3]|[s2] |
|f1 |l1 |loy1|[e1]    |[s1] |
+---+---+----+--------+-----+
scala> df2.groupBy("F","L","email").agg(max('loy).as("loy"),collect_set(concat_ws("",'state)).as("state")).show
+---+---+--------+----+--------+
|  F|  L|   email| loy|   state|
+---+---+--------+----+--------+
| f2| l2|[e2, e3]|loy2|    [s2]|
| f1| l1|    [e1]|loy1|[s3, s1]|
+---+---+--------+----+--------+
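The concat_ws trick only works while every State array has at most one element. Below is a hedged alternative, assuming Spark 2.4 or later (where flatten and array_distinct were added), that merges the State arrays element by element instead of gluing them into strings. It also sorts the Email array with sort_array before grouping, on the assumption that collect_set may return the same set of emails in a different order, which would otherwise split one logical group into two. The object and method names (TwoLevelGroupBy, firstLevel, secondLevel, sampleData) are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array_distinct, col, collect_set, flatten, max, sort_array}

object TwoLevelGroupBy {
  // First level: one row per (F, L, Loy); collect_set drops null values.
  def firstLevel(df: DataFrame): DataFrame =
    df.groupBy("F", "L", "Loy")
      .agg(collect_set(col("Email")).as("Email"), collect_set(col("State")).as("State"))

  // Second level: one row per (F, L, Email). flatten + array_distinct merges
  // the State arrays element-wise, so [s1, s2] is not turned into "s1s2".
  def secondLevel(first: DataFrame): DataFrame =
    first
      // collect_set gives no ordering guarantee; sorting makes equal email sets equal keys.
      .withColumn("Email", sort_array(col("Email")))
      .groupBy("F", "L", "Email")
      .agg(
        max(col("Loy")).as("Loy"), // max ignores nulls, so loy1 wins over null
        array_distinct(flatten(collect_set(col("State")))).as("State")
      )

  // The sample rows from the question, with an explicit tuple type so null fits String.
  def sampleData(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq[(String, String, String, String, String)](
      ("f1", "l1", "loy1", null, "s1"),
      ("f1", "l1", "loy1", "e1", "s1"),
      ("f2", "l2", "loy2", "e2", "s2"),
      ("f2", "l2", "loy2", "e3", null),
      ("f1", "l1", null, "e1", "s3")
    ).toDF("F", "L", "Loy", "Email", "State")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("two-level-groupby").getOrCreate()
    try secondLevel(firstLevel(sampleData(spark))).show(false)
    finally spark.stop()
  }
}
```

On the sample data this should produce one row per (F, L, Email) group: f1/l1/[e1] with Loy loy1 and a State array containing s1 and s3, and f2/l2/[e2, e3] with Loy loy2 and State [s2]. As with collect_set, the order of elements inside State is not guaranteed.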
scala>