我有两个Spark DataFrame,df1
和df2
:
+-------+-----+---+
| name|empNo|age|
+-------+-----+---+
|shankar|12121| 28|
| ramesh| 1212| 29|
| suresh| 1111| 30|
| aarush| 0707| 15|
+-------+-----+---+
+------+-----+---+-----+
| eName| eNo|age| city|
+------+-----+---+-----+
|aarush|12121| 15|malmo|
|ramesh| 1212| 29|malmo|
+------+-----+---+-----+
我需要基于在另一个文件中指定的许多列中从df1
获取非匹配记录。
例如,列查找文件如下所示:
df1col,df2col
name,eName
empNo, eNo
预期输出是:
+-------+-----+---+
| name|empNo|age|
+-------+-----+---+
|shankar|12121| 28|
| suresh| 1111| 30|
| aarush| 0707| 15|
+-------+-----+---+
这个想法是如何在上述情况下动态构建一个条件,因为查找文件是可配置的,因此它可能具有1至n个字段。
您可以使用except
DataFrame方法。我假设要使用的列在两个列表中为了简单性。两个列表的顺序都是正确的,将比较列表中同一位置的列(无论列名称如何)。except
之后,使用join
从第一个数据框中获取丢失的列。
val df1 = Seq(("shankar","12121",28),("ramesh","1212",29),("suresh","1111",30),("aarush","0707",15))
.toDF("name", "empNo", "age")
val df2 = Seq(("aarush", "12121", 15, "malmo"),("ramesh", "1212", 29, "malmo"))
.toDF("eName", "eNo", "age", "city")
val df1Cols = List("name", "empNo")
val df2Cols = List("eName", "eNo")
val tempDf = df1.select(df1Cols.head, df1Cols.tail: _*)
.except(df2.select(df2Cols.head, df2Cols.tail: _*))
val df = df1.join(broadcast(tempDf), df1Cols)
生成的数据框看起来像是想要的:
+-------+-----+---+
| name|empNo|age|
+-------+-----+---+
| aarush| 0707| 15|
| suresh| 1111| 30|
|shankar|12121| 28|
+-------+-----+---+
如果您是在SQL查询中执行此操作的,我将使用SQL查询中的列名重新映射,例如通过查询更改SQL列标题之类的内容。您可以在查询中进行简单的文本替换,以将其标准化为DF1或DF2列名。
一旦有了可以使用之类的东西如何获得两个数据帧之间的差异?
如果您需要更多在diff(例如年龄)中不使用的列,则可以根据您的差异结果再次重新选择数据。这可能不是这样做的最佳方法,但可能会起作用。