Scala Spark - performing multiple levels of grouping with arrays as input

In my Scala program I am working on combining the results of several levels of GroupBy. The dataset I am working with is quite large; as a small example, I have a dataframe that looks like this:

+---+---+----+-----+-----+
|  F|  L| Loy|Email|State|
+---+---+----+-----+-----+
| f1| l1|loy1| null|   s1|
| f1| l1|loy1|   e1|   s1|
| f2| l2|loy2|   e2|   s2|
| f2| l2|loy2|   e3| null|
| f1| l1|null|   e1|   s3|
+---+---+----+-----+-----+

For the first grouping level, I use the following script to get the results grouped by identical (F, L, Loy) columns:

df.groupBy("F", "L", "Loy").agg(collect_set($"Email").alias("Email"), collect_set($"State").alias("State")).show

The result looks like this:

+---+---+----+--------+-----+
|  F|  L| Loy|   Email|State|
+---+---+----+--------+-----+
| f1| l1|null|    [e1]| [s3]|
| f2| l2|loy2|[e2, e3]| [s2]|
| f1| l1|loy1|    [e1]| [s1]|
+---+---+----+--------+-----+

The problem I am working on is how to perform a second-level groupBy based on the columns (F, L, Email), taking F and L as input as Strings and the Email column as an Array[String]. This groupBy should return a result like the following:

+---+---+----+--------+---------+
|  F|  L| Loy|   Email|    State|
+---+---+----+--------+---------+
| f1| l1|loy1|    [e1]| [s3, s1]|
| f2| l2|loy2|[e2, e3]|     [s2]|
+---+---+----+--------+---------+

The main goal is to reduce the number of entries as much as possible by applying groupBy at different levels. I am quite new to Scala, and any help would be appreciated :)

Just use concat_ws() with an empty separator: it collapses each single-element State array into a plain string, and collect_set then gathers those strings back into an array. Check this out.
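
To see the trick in isolation, here is a minimal sketch of my own (assuming a spark-shell session, where spark.implicits._ is already in scope):

scala> Seq(Seq("s1"), Seq("s2")).toDF("state").select(concat_ws("", 'state).as("state_str")).show
+---------+
|state_str|
+---------+
|       s1|
|       s2|
+---------+

Keep in mind this only behaves as intended when each array holds at most one element; with an empty separator, a multi-element array such as [s1, s2] would be merged into the single string "s1s2".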

scala> val df = Seq( ("f1","l1","loy1",null,"s1"),("f1","l1","loy1","e1","s1"),("f2","l2","loy2","e2","s2"),("f2","l2","loy2","e3",null),("f1","l1",null,"e1","s3")).toDF("F","L","loy","email","state")
df: org.apache.spark.sql.DataFrame = [F: string, L: string ... 3 more fields]
scala> df.show(false)
+---+---+----+-----+-----+
|F  |L  |loy |email|state|
+---+---+----+-----+-----+
|f1 |l1 |loy1|null |s1   |
|f1 |l1 |loy1|e1   |s1   |
|f2 |l2 |loy2|e2   |s2   |
|f2 |l2 |loy2|e3   |null |
|f1 |l1 |null|e1   |s3   |
+---+---+----+-----+-----+

scala> val df2 = df.groupBy("F", "L", "Loy").agg(collect_set($"Email").alias("Email"), collect_set($"State").alias("State"))
df2: org.apache.spark.sql.DataFrame = [F: string, L: string ... 3 more fields]
scala> df2.show(false)
+---+---+----+--------+-----+
|F  |L  |Loy |Email   |State|
+---+---+----+--------+-----+
|f1 |l1 |null|[e1]    |[s3] |
|f2 |l2 |loy2|[e2, e3]|[s2] |
|f1 |l1 |loy1|[e1]    |[s1] |
+---+---+----+--------+-----+

scala> df2.groupBy("F","L","email").agg(max('loy).as("loy"),collect_set(concat_ws("",'state)).as("state")).show
+---+---+--------+----+--------+
|  F|  L|   email| loy|   state|
+---+---+--------+----+--------+
| f2| l2|[e2, e3]|loy2|    [s2]|
| f1| l1|    [e1]|loy1|[s3, s1]|
+---+---+--------+----+--------+

scala>
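A couple of notes on the last step: max('loy) simply recovers the non-null Loy value within each group, since max ignores nulls. Also, the empty-separator trick relies on each State array holding at most one element, which happens to be true in this sample. A more general sketch of my own (assuming Spark 2.2+ for explode_outer, run in the same spark-shell session) explodes the State arrays first and then re-collects the distinct values, so groups whose arrays hold several states are merged correctly as well:

scala> // one row per state value; explode_outer also keeps rows whose State array is empty
scala> val df3 = df2.withColumn("s", explode_outer('State)).groupBy("F", "L", "Email").agg(max('Loy).as("Loy"), collect_set('s).as("State"))
scala> df3.show(false)

This produces the same result as above, up to the (nondeterministic) element order inside the collected arrays.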
