给定主键,比较两个数据框的其他列,并以垂直方式输出差异列



我想比较具有相同架构且具有主键列的两个数据帧。

对于每个主键,如果其他列有任何差异(可能是多个列,因此需要使用一些动态方式扫描所有其他列),我想输出两个数据帧的列名和值。

另外,如果一个主键在另一个数据帧中不存在,我想输出结果(因此将使用"完全外部连接")。下面是一些示例:

数据帧 1:

+-----------+------+------+
|primary_key|book  |number|
+-----------+------+------+
|1          |book1 | 1    | 
|2          |book2 | 2    |
|3          |book3 | 3    |
|4          |book4 | 4    |
+-----------+------+------+

数据帧 2:

+-----------+------+------+
|primary_key|book  |number|
+-----------+------+------+
|1          |book1 | 1    | 
|2          |book8 | 8    |
|3          |book3 | 7    |
|5          |book5 | 5    |
+-----------+------+------+

结果将是:

+-----------+------+----------+------------+------------*
|primary_key|diff_column_name | dataframe1 | dataframe2 |
+-----------+------+----------+------------+------------*
|2          |book             | book2      | book8      |
|2          |number           | 2          | 8          |
|3          |number           | 3          | 7          |
|4          |book             | book4      | null       |
|4          |number           | 4          | null       |
|5          |book             | null       | book5      |
|5          |number           | null       | 5          |
+-----------+------+----------+------------+------------*

我知道第一步是在主键上连接两个数据帧:

// joining the two DFs on primary_key
val result = df1.as("l")
.join(df2.as("r"), "primary_key", "fullouter") 

但我不确定如何进行。有人可以给我一些建议吗?谢谢

数据:

val df1 = Seq(
(1, "book1", 1), (2, "book2", 2), (3, "book3", 3), (4, "book4", 4)
).toDF("primary_key", "book", "number")
val df2 = Seq(
(1, "book1", 1), (2, "book8", 8), (3, "book3", 7), (5, "book5", 5)
).toDF("primary_key", "book", "number")

进口

import org.apache.spark.sql.functions._

定义列列表:

val cols = Seq("book", "number")

立即加入:

val joined = df1.as("l").join(df2.as("r"), Seq("primary_key"), "fullouter") 

定义:

val comp = explode(array(cols.map(c => struct(
lit(c).alias("diff_column_name"), 
// Value left
col(s"l.${c}").cast("string").alias("dataframe1"),  
// Value right
col(s"r.${c}").cast("string").alias("dataframe2"),
// Differs
not(col(s"l.${c}") <=> col(s"r.${c}")).alias("diff")
)): _*))

选择并过滤:

joined
.withColumn("comp", comp)
.select($"primary_key", $"comp.*")
// Filter out mismatches and get rid of obsolete diff
.where($"diff").drop("diff")
.orderBy("primary_key").show
// +-----------+----------------+----------+----------+
// |          2|            book|     book2|     book8|
// |          2|          number|         2|         8|
// |          3|          number|         3|         7|
// |          4|            book|     book4|      null|
// |          4|          number|         4|      null|
// |          5|            book|      null|     book5|
// |          5|          number|      null|         5|
// +-----------+----------------+----------+----------+

相关内容

最新更新