考虑这两个数据帧:
+---+
|id |
+---+
|1 |
|2 |
|3 |
+---+
+---+-----+
|idz|word |
+---+-----+
|1 |bat |
|1 |mouse|
|2 |horse|
+---+-----+
我正在做Left join on ID=IDZ
:
val r = df1.join(df2, (df1("id") === df2("idz")), "left_outer").
withColumn("ID_EMPLOYE_VENDEUR", when(col("word") =!= ("null"), col("word")).otherwise(null)).drop("word")
r.show(false)
+---+----+------------------+
|id |idz |ID_EMPLOYE_VENDEUR|
+---+----+------------------+
|1 |1 |mouse |
|1 |1 |bat |
|2 |2 |horse |
|3 |null|null |
+---+----+------------------+
但是,如果我只想保留ID只有一个相等IDZ的行呢?如果没有,我希望ID_EMPLOYE_VENDEUR中有null。所需输出为:
+---+----+------------------+
|id |idz |ID_EMPLOYE_VENDEUR|
+---+----+------------------+
|1 |1 |null | --Because the Join resulted two different lines
|2 |2 |horse |
|3 |null|null |
+---+----+------------------+
我应该准确地说,我正在研究一个大型DF。该解决方案在时间上应该不是很昂贵。
谢谢
根据您提到的数据,您的数据太大,因此groupBy
不是分组数据和通过以下功能加入Windows的好选择:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
def windowSpec = Window.partitionBy("idz")
val newDF = df1.withColumn("count", count("idz").over(windowSpec)).dropDuplicates("idz").withColumn("word", when(col("count") >=2 , lit(null)).otherwise(col("word"))).drop("count")
val r = df1.join(newDF, (df1("id") === newDF("idz")), "left_outer").withColumn("ID_EMPLOYE_VENDEUR", when(col("word") =!= ("null"), col("word")).otherwise(null)).drop("word")
r show
+---+----+------------------+
| id| idz|ID_EMPLOYE_VENDEUR|
+---+----+------------------+
| 1| 1| null|
| 3|null| null|
| 2| 2| horse|
+---+----+------------------+
您可以通过groupBy和join轻松检索多个df2
的idz
与单个df1
的id
匹配的信息。
r.join(
r.groupBy("id").count().as("g"),
$"g.id" === r("id")
)
.withColumn(
"ID_EMPLOYE_VENDEUR",
expr("if(count != 1, null, ID_EMPLOYE_VENDEUR)")
)
.drop($"g.id").drop("count")
.distinct()
.show()
注意:groupBy和join都不会触发任何额外的交换步骤(在网络中混洗(,因为数据帧r
已经在id
上进行了分区(因为它是id
上的join的结果(。