Filtering array values during aggregation in a Spark DataFrame



I am performing an aggregation on the following DataFrame to get a list of advertisers, each with an array of brands:

+------------+------+
|advertiser  |brand |
+------------+------+
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
+------------+------+
Here is my code:

import org.apache.spark.sql.functions.collect_list
df2
.groupBy("advertiser")
.agg(collect_list("brand").as("brands"))

This gives me the following DataFrame:

+------------+----------------+
|advertiser  |brands          |
+------------+----------------+
|Advertiser 1|[Brand1, Brand2]|
|Advertiser 2|[Brand3, Brand4]|
|Advertiser 3|[Brand5, Brand6]|
+------------+----------------+

During the aggregation, I want to filter the brand list using the following brands table:

+------+------------+
|brand |brand name  |
+------+------------+
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
+------+------------+

so that I get:

+------------+--------+
|advertiser  |brands  |
+------------+--------+
|Advertiser 1|[Brand1]|
|Advertiser 2|[Brand3]|
|Advertiser 3|null    |
+------------+--------+

I see two solutions to your problem, which I will call the Collect solution and the Join solution.

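Collect solution

If you can collect your brands DataFrame, you can use the collected values to keep only the wanted brands while performing collect_list. Then flatten the resulting array of arrays and replace empty arrays with null, as follows: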

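import org.apache.spark.sql.functions.{array, col, collect_list, flatten, size, when}
// Collect the (small) brands DataFrame to the driver as a local array of brand ids
val filteredBrands = brands.select("brand").collect().map(_.getString(0))
val finalDataframe = df2
  .groupBy("advertiser")
  // Wrap each allowed brand in a one-element array and every other brand in an empty array
  .agg(collect_list(when(col("brand").isin(filteredBrands: _*), array(col("brand"))).otherwise(array())).as("brands"))
  // Flatten the array of arrays into a plain array of brands
  .withColumn("brands", flatten(col("brands")))
  // Advertisers with no allowed brand get null instead of an empty array
  .withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))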
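To see what this pipeline computes without a Spark cluster, here is a minimal plain-Scala sketch that models it with ordinary collections. It assumes df2 is a Seq of (advertiser, brand) pairs and that the collected brands are a Set[String]. The object CollectSolutionModel and its aggregate helper are hypothetical names, and None plays the role of Spark's null. The sketch models the result only, not how Spark executes the query.

```scala
// Plain-Scala model of the Collect solution (no Spark involved).
// Hypothetical names: CollectSolutionModel, aggregate. None stands in for Spark's null.
object CollectSolutionModel {
  // Sample rows copied from the df2 table: (advertiser, brand)
  val df2: Seq[(String, String)] = Seq(
    "Advertiser 1" -> "Brand1", "Advertiser 1" -> "Brand2",
    "Advertiser 2" -> "Brand3", "Advertiser 2" -> "Brand4",
    "Advertiser 3" -> "Brand5", "Advertiser 3" -> "Brand6"
  )

  // Stand-in for brands.select("brand").collect(): the allowed brand ids, held locally
  val filteredBrands: Set[String] = Set("Brand1", "Brand3")

  // Group by advertiser, keep only allowed brands (like isin), and turn an empty list into None
  def aggregate(rows: Seq[(String, String)], allowed: Set[String]): Map[String, Option[Seq[String]]] =
    rows.groupBy(_._1).map { case (advertiser, group) =>
      val kept = group.map(_._2).filter(allowed.contains)
      advertiser -> (if (kept.isEmpty) None else Some(kept))
    }

  def main(args: Array[String]): Unit =
    // Sort by advertiser because Map iteration order is unspecified
    aggregate(df2, filteredBrands).toSeq.sortBy(_._1).foreach(println)
}
```

The Set lookup mirrors the isin filter. As in the real Spark code, the allowed brands must fit in driver memory, which is why this approach only suits a small brands table.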

Join solution

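If your brands DataFrame does not fit in memory, you can first left-join df2 with brands. This gives you a column containing the brand if it is in the brands DataFrame, and null otherwise. Then do your groupBy. Finally, replace the empty arrays with null; these belong to advertisers that have none of the brands you want to filter by: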

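import org.apache.spark.sql.functions.{col, collect_list, size, when}
val finalDataframe = df2
  // Left outer join: filtered_brand holds the brand if it is in brands, null otherwise
  .join(brands.select(col("brand").as("filtered_brand")), col("filtered_brand") === col("brand"), "left_outer")
  // collect_list skips nulls, so advertisers with no matching brand get an empty array
  .groupBy("advertiser").agg(collect_list(col("filtered_brand")).as("brands"))
  // Replace empty arrays with null
  .withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))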
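The three steps walked through in the Details section can be modeled the same way. The following is a minimal plain-Scala sketch, not Spark code. The object JoinSolutionModel and its helpers leftJoin, groupAndCollect and emptyToNull are hypothetical names. Option[String] stands in for the nullable filtered_brand column, and flatMap over Options mimics how collect_list skips nulls.

```scala
// Plain-Scala model of the Join solution's three steps (no Spark involved).
// Hypothetical names: JoinSolutionModel, leftJoin, groupAndCollect, emptyToNull.
// Option[String] stands in for the nullable filtered_brand column.
object JoinSolutionModel {
  // Sample rows copied from the df2 table: (advertiser, brand)
  val df2: Seq[(String, String)] = Seq(
    "Advertiser 1" -> "Brand1", "Advertiser 1" -> "Brand2",
    "Advertiser 2" -> "Brand3", "Advertiser 2" -> "Brand4",
    "Advertiser 3" -> "Brand5", "Advertiser 3" -> "Brand6"
  )

  // Sample rows copied from the brands table: (brand, brand name)
  val brands: Seq[(String, String)] = Seq("Brand1" -> "Brand_name_1", "Brand3" -> "Brand_name_3")

  // Step 1: left outer join on brand; unmatched rows keep a None filtered_brand
  def leftJoin(rows: Seq[(String, String)], brandTable: Seq[(String, String)]): Seq[(String, String, Option[String])] =
    rows.flatMap { case (advertiser, brand) =>
      val matches = brandTable.collect { case (b, _) if b == brand => b }
      if (matches.isEmpty) Seq((advertiser, brand, Option.empty[String]))
      else matches.map(m => (advertiser, brand, Option(m)))
    }

  // Step 2: group by advertiser; flatMap drops None, as collect_list drops nulls
  def groupAndCollect(joined: Seq[(String, String, Option[String])]): Map[String, Seq[String]] =
    joined.groupBy(_._1).map { case (advertiser, group) => advertiser -> group.flatMap(_._3) }

  // Step 3: replace empty lists with None (Spark's null)
  def emptyToNull(grouped: Map[String, Seq[String]]): Map[String, Option[Seq[String]]] =
    grouped.map { case (advertiser, bs) => advertiser -> (if (bs.isEmpty) None else Some(bs)) }

  def main(args: Array[String]): Unit = {
    val joined = leftJoin(df2, brands)
    joined.foreach(println)
    // Sort by advertiser because Map iteration order is unspecified
    emptyToNull(groupAndCollect(joined)).toSeq.sortBy(_._1).foreach(println)
  }
}
```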

Details

So, if we start with the df2 DataFrame as follows:

+------------+------+
|advertiser  |brand |
+------------+------+
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
+------------+------+

and the brands DataFrame as follows:

+------+------------+
|brand |brand name  |
+------+------------+
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
+------+------------+

After the left outer join between df2 and brands (the .join line of the code), you get the following DataFrame:

+------------+------+--------------+
|advertiser  |brand |filtered_brand|
+------------+------+--------------+
|Advertiser 1|Brand1|Brand1        |
|Advertiser 1|Brand2|null          |
|Advertiser 2|Brand3|Brand3        |
|Advertiser 2|Brand4|null          |
|Advertiser 3|Brand5|null          |
|Advertiser 3|Brand6|null          |
+------------+------+--------------+

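When you group this DataFrame by advertiser and collect the list of filtered brands, you get the following DataFrame. Because collect_list skips null values, Advertiser 3, none of whose brands matched, gets an empty array. Row order after a groupBy is not guaranteed: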

+------------+--------+
|advertiser  |brands  |
+------------+--------+
|Advertiser 2|[Brand3]|
|Advertiser 3|[]      |
|Advertiser 1|[Brand1]|
+------------+--------+

Finally, applying the last line, which replaces empty arrays with null, gives the expected result:

+------------+--------+
|advertiser  |brands  |
+------------+--------+
|Advertiser 2|[Brand3]|
|Advertiser 3|null    |
|Advertiser 1|[Brand1]|
+------------+--------+

Conclusion

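The Collect solution creates only one expensive shuffle step (during the groupBy). It should be preferred if your brands DataFrame is small. The Join solution also works when your brands DataFrame is large, but it creates more expensive shuffle steps: one for the groupBy and one for the join.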
