如何在Spark中获取join产生的行数



考虑这两个数据帧:

+---+
|id |
+---+
|1  |
|2  |
|3  |
+---+

+---+-----+
|idz|word |
+---+-----+
|1  |bat  |
|1  |mouse|
|2  |horse|
+---+-----+

我正在做Left join on ID=IDZ:

val r = df1.join(df2, (df1("id") === df2("idz")), "left_outer").
withColumn("ID_EMPLOYE_VENDEUR", when(col("word") =!= ("null"), col("word")).otherwise(null)).drop("word")
r.show(false)
+---+----+------------------+
|id |idz |ID_EMPLOYE_VENDEUR|
+---+----+------------------+
|1  |1   |mouse             |
|1  |1   |bat               |
|2  |2   |horse             |
|3  |null|null              |
+---+----+------------------+

但是,如果我只想保留ID只有一个相等IDZ的行呢?如果没有,我希望ID_EMPLOYE_VENDEUR中有null。所需输出为:

+---+----+------------------+
|id |idz |ID_EMPLOYE_VENDEUR|
+---+----+------------------+
|1  |1   |null              | --Because the Join resulted two different lines
|2  |2   |horse             |
|3  |null|null              |
+---+----+------------------+

我应该准确地说,我正在研究一个大型DF。该解决方案在时间上应该不是很昂贵。

谢谢

根据您提到的数据,您的数据太大,因此groupBy不是分组数据和通过以下功能加入Windows的好选择:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
def windowSpec = Window.partitionBy("idz")
val newDF = df1.withColumn("count", count("idz").over(windowSpec)).dropDuplicates("idz").withColumn("word", when(col("count") >=2 , lit(null)).otherwise(col("word"))).drop("count")
val r = df1.join(newDF, (df1("id") === newDF("idz")), "left_outer").withColumn("ID_EMPLOYE_VENDEUR", when(col("word") =!= ("null"), col("word")).otherwise(null)).drop("word") 
r show 
+---+----+------------------+
| id| idz|ID_EMPLOYE_VENDEUR|
+---+----+------------------+
|  1|   1|              null|
|  3|null|              null|
|  2|   2|             horse|
+---+----+------------------+

您可以通过groupBy和join轻松检索多个df2idz与单个df1id匹配的信息。

r.join(
r.groupBy("id").count().as("g"),
$"g.id" === r("id")
)
.withColumn(
"ID_EMPLOYE_VENDEUR",
expr("if(count != 1, null, ID_EMPLOYE_VENDEUR)")
)
.drop($"g.id").drop("count")
.distinct()
.show()

注意:groupBy和join都不会触发任何额外的交换步骤(在网络中混洗(,因为数据帧r已经在id上进行了分区(因为它是id上的join的结果(。

最新更新