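I have two Spark DataFrames, one with 70k rows and the other with 60k rows. I am trying to compare each column string against the columns of the other DataFrame and produce a new DataFrame with a Remark column. If a column string from df1 also exists in df2, the remark should be "duplicate". I tried an outer join, but it did not give me the expected output.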
df1
colA colB
A d4f488bef2
B c8a91953fc
C 62026fd921
D e88480226d
E 8335195031
df2
ColA ColB
W 411c78854c9
X 0bfeb09d6cf
C 62026fd9211
E 8335195031e
**Expected output : df3**
ColA ColB Remark
A d4f488bef2d2 From df1
B c8a91953fc52 From df1
D e88480226d3b From df1
W 411c78854c9c From df2
X 0bfeb09d6cfb From df2
C 62026fd921 duplicate(In Both)
E 8335195031 duplicate(In Both)
import org.apache.spark.sql.functions.{col, lit, when}
// Tag each side so its origin is still visible after the join
val df1Remark = df1.withColumn("df1_remark", lit("DF1"))
val df2Remark = df2.withColumn("df2_remark", lit("DF2"))
// Join df1 against df2 (not against itself)
val res = df1Remark.join(df2Remark, Seq("colA", "colB"), "outer")
  .withColumn("remark", when(col("df1_remark").isNotNull && col("df2_remark").isNotNull, lit("Duplicate")).otherwise(when(col("df1_remark").isNotNull, lit("From DF1")).otherwise(lit("FROM DF2"))))
This can be done with the spark-extension package and three simple lines:
import uk.co.gresearch.spark.diff._
val opts = DiffOptions("Remarks", "df1", "df2", "From df2", "changed", "From df1", "duplicate(In Both)")
df1.diff(df2, opts).show
+------------------+----+-----------+
| Remarks|colA| colB|
+------------------+----+-----------+
| From df2| X|0bfeb09d6cf|
| From df1| B| c8a91953fc|
|duplicate(In Both)| E|8335195031e|
| From df1| A| d4f488bef2|
| From df1| D| e88480226d|
|duplicate(In Both)| C|62026fd9211|
| From df2| W|411c78854c9|
+------------------+----+-----------+
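The `diff` transformation computes the difference between `df1` and `df2`, and `opts: DiffOptions` formats the output the way you defined it in your question.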
The default output format looks like this:
df1.diff(df2).show
+----+----+-----------+
|diff|colA| colB|
+----+----+-----------+
| I| X|0bfeb09d6cf|
| D| B| c8a91953fc|
| N| E|8335195031e|
| D| A| d4f488bef2|
| D| D| e88480226d|
| N| C|62026fd9211|
| I| W|411c78854c9|
+----+----+-----------+
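If adding the extra dependency is not an option, roughly the same Remark column can be sketched in plain Spark with a full outer join on both columns. This is only a minimal sketch meant for `spark-shell`: the sample rows, the marker column names `in_df1` / `in_df2`, and the local `SparkSession` settings are assumptions made for illustration, not part of the question's data or of the spark-extension API.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, when}

// Local session for this sketch only; in spark-shell getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[1]").appName("remark-sketch").getOrCreate()
import spark.implicits._

// Small made-up sample (assumption), not the data from the question.
val df1 = Seq(("A", "a1"), ("B", "b1"), ("C", "c1")).toDF("colA", "colB")
val df2 = Seq(("C", "c1"), ("W", "w1")).toDF("colA", "colB")

// Tag each side so we can tell after the join where a row came from.
val left  = df1.withColumn("in_df1", lit(true))
val right = df2.withColumn("in_df2", lit(true))

val df3 = left
  .join(right, Seq("colA", "colB"), "full_outer")
  .withColumn("Remark",
    when(col("in_df1").isNotNull && col("in_df2").isNotNull, lit("duplicate(In Both)"))
      .when(col("in_df1").isNotNull, lit("From df1"))
      .otherwise(lit("From df2")))
  .drop("in_df1", "in_df2")

df3.show()
```

Note that a row only counts as a duplicate here when both `colA` and `colB` match exactly; if the values differ by even one character, the row shows up once per side.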