火花SQL DataFrame API -build滤波器条件动态条件



我有两个Spark DataFrame,df1df2

+-------+-----+---+
|   name|empNo|age|
+-------+-----+---+
|shankar|12121| 28|
| ramesh| 1212| 29|
| suresh| 1111| 30|
| aarush| 0707| 15|
+-------+-----+---+
+------+-----+---+-----+
| eName|  eNo|age| city|
+------+-----+---+-----+
|aarush|12121| 15|malmo|
|ramesh| 1212| 29|malmo|
+------+-----+---+-----+

我需要基于在另一个文件中指定的许多列中从df1获取非匹配记录

例如,列查找文件如下所示:

df1col,df2col
name,eName
empNo, eNo

预期输出是:

+-------+-----+---+
|   name|empNo|age|
+-------+-----+---+
|shankar|12121| 28|
| suresh| 1111| 30|
| aarush| 0707| 15|
+-------+-----+---+

这个想法是如何在上述情况下动态构建一个条件,因为查找文件是可配置的,因此它可能具有1至n个字段。

您可以使用except DataFrame方法。我假设要使用的列在两个列表中为了简单性。两个列表的顺序都是正确的,将比较列表中同一位置的列(无论列名称如何)。except之后,使用join从第一个数据框中获取丢失的列。

val df1 = Seq(("shankar","12121",28),("ramesh","1212",29),("suresh","1111",30),("aarush","0707",15))
  .toDF("name", "empNo", "age")
val df2 = Seq(("aarush", "12121", 15, "malmo"),("ramesh", "1212", 29, "malmo"))
  .toDF("eName", "eNo", "age", "city")
val df1Cols = List("name", "empNo")
val df2Cols = List("eName", "eNo")
val tempDf = df1.select(df1Cols.head, df1Cols.tail: _*)
  .except(df2.select(df2Cols.head, df2Cols.tail: _*))    
val df = df1.join(broadcast(tempDf), df1Cols)

生成的数据框看起来像是想要的:

+-------+-----+---+
|   name|empNo|age|
+-------+-----+---+
| aarush| 0707| 15|
| suresh| 1111| 30|
|shankar|12121| 28|
+-------+-----+---+

如果您是在SQL查询中执行此操作的,我将使用SQL查询中的列名重新映射,例如通过查询更改SQL列标题之类的内容。您可以在查询中进行简单的文本替换,以将其标准化为DF1或DF2列名。

一旦有了可以使用之类的东西如何获得两个数据帧之间的差异?

如果您需要更多在diff(例如年龄)中不使用的列,则可以根据您的差异结果再次重新选择数据。这可能不是这样做的最佳方法,但可能会起作用。

最新更新