Compare two Spark DataFrames and generate a third DataFrame with remarks



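I have two Spark DataFrames, one with 70k rows and the other with 60k rows. I am trying to compare the column strings of each row against the other DataFrame and generate a new DataFrame with a Remark column. If a row's column string from df1 also exists in df2, its remark should be "duplicate". I tried a full outer join, but it did not give me the expected output.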

df1:

```
colA    colB
A       d4f488bef2
B       c8a91953fc
C       62026fd921
D       e88480226d
E       8335195031
```

df2:

```
ColA    ColB
W       411c78854c9
X       0bfeb09d6cf
C       62026fd9211
E       8335195031e
```

**Expected output: df3**

```
ColA    ColB            Remark
A       d4f488bef2d2    From df1
B       c8a91953fc52    From df1
D       e88480226d3b    From df1
W       411c78854c9c    From df2
X       0bfeb09d6cfb    From df2
C       62026fd921      duplicate(In Both)
E       8335195031      duplicate(In Both)
```
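What I tried:

```scala
import org.apache.spark.sql.functions.{col, lit, when}
val df1Remark = df1.withColumn("df1_remark", lit("DF1")) // tag every row coming from df1
val df2Remark = df2.withColumn("df2_remark", lit("DF2")) // tag every row coming from df2
val res = df1Remark.join(df2Remark, Seq("colA", "colB"), "outer") // full outer join on all columns
  .withColumn("remark", when(col("df1_remark").isNotNull && col("df2_remark").isNotNull, lit("Duplicate"))
    .when(col("df1_remark").isNotNull, lit("From DF1"))
    .otherwise(lit("FROM DF2")))
```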

This can be done with the spark-extension package in three simple lines:

```scala
import uk.co.gresearch.spark.diff._
val opts = DiffOptions("Remarks", "df1", "df2", "From df2", "changed", "From df1", "duplicate(In Both)")
df1.diff(df2, opts).show
```

```
+------------------+----+-----------+
|           Remarks|colA|       colB|
+------------------+----+-----------+
|          From df2|   X|0bfeb09d6cf|
|          From df1|   B| c8a91953fc|
|duplicate(In Both)|   E|8335195031e|
|          From df1|   A| d4f488bef2|
|          From df1|   D| e88480226d|
|duplicate(In Both)|   C|62026fd9211|
|          From df2|   W|411c78854c9|
+------------------+----+-----------+
```

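The `diff` transformation computes the difference between `df1` and `df2`, and `opts: DiffOptions` formats the output the way you defined it in your question. The positional arguments are the name of the diff column (`Remarks`), the column prefixes for the left and right side (`df1`, `df2`), and then the labels for inserted rows (only in df2), changed rows, deleted rows (only in df1) and unchanged rows (in both).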
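To make the three-way classification concrete without a Spark cluster, here is a minimal, Spark-free sketch in plain Scala. It assumes no id columns are given, so a row counts as "in both" only when every column matches, and it treats each input as a set of rows (duplicates collapse). `RowDiffSketch`, `Record` and `classify` are hypothetical names for illustration, not part of spark-extension; the default labels simply mirror the values passed to `DiffOptions` above.

```scala
// Spark-free sketch of a whole-row comparison between two inputs.
// Assumption: every column is compared, and each input is treated as a set of rows.
object RowDiffSketch {
  // Hypothetical stand-in for one (colA, colB) row
  final case class Record(colA: String, colB: String)

  // Default labels mirror the delete / insert / no-change values given to DiffOptions
  def classify(
      left: Seq[Record],
      right: Seq[Record],
      fromLeft: String = "From df1",
      fromRight: String = "From df2",
      inBoth: String = "duplicate(In Both)"
  ): Seq[(String, Record)] = {
    val leftSet = left.toSet
    val rightSet = right.toSet
    // Rows only in the left input ("deleted" in diff terminology)
    val onlyLeft = left.distinct.filterNot(rightSet.contains).map(r => (fromLeft, r))
    // Rows only in the right input ("inserted")
    val onlyRight = right.distinct.filterNot(leftSet.contains).map(r => (fromRight, r))
    // Identical rows on both sides ("no change")
    val both = left.distinct.filter(rightSet.contains).map(r => (inBoth, r))
    onlyLeft ++ onlyRight ++ both
  }
}
```

Because whole rows are compared in this sketch, `Record("E", "8335195031")` on the left and `Record("E", "8335195031e")` on the right are reported once as "From df1" and once as "From df2", not as a duplicate.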

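The default output format looks like this, where `I` marks rows only in df2 (inserted), `D` marks rows only in df1 (deleted) and `N` marks rows present in both (no change):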

```scala
df1.diff(df2).show
```

```
+----+----+-----------+
|diff|colA|       colB|
+----+----+-----------+
|   I|   X|0bfeb09d6cf|
|   D|   B| c8a91953fc|
|   N|   E|8335195031e|
|   D|   A| d4f488bef2|
|   D|   D| e88480226d|
|   N|   C|62026fd9211|
|   I|   W|411c78854c9|
+----+----+-----------+
```
