我使用DeltaLake API使用下面的代码来更新表中的行
DeltaTable.forPath(sparkSession, cleanDataPath)
.as("target")
.merge(df.as("source"), "target.desk_mirror_name = source.desk_mirror_name AND target.price = source.price AND target.valuationdate = source.valuationdate AND target.valuationversion = source.valuationversion")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute();
这应该匹配源表和目标表之间的所有列,除了列valuationtag
合并前,目标表如下所示
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| Sample|499.97| 2021-06-10| 210611170317|210611170317|
| Sample|967.93| 2021-06-10| 210611170317|210611170317|
| Sample| 500.0| 2021-06-10| 210611170317|210611170317|
+----------------+------+-------------+----------------+------------+
源表(应该更新目标表)如下所示
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| Sample|499.97| 2021-06-10| 210611170317| OFFICIAL|
| Sample| 500.0| 2021-06-10| 210611170317| OFFICIAL|
| Sample|967.93| 2021-06-10| 210611170317| OFFICIAL|
+----------------+------+-------------+----------------+------------+
只有valuationtag
变成了OFFICIAL。这样,更新后的表为
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| Sample|499.97| 2021-06-10| 210611170317| OFFICIAL|
| Sample| 500.0| 2021-06-10| 210611170317| OFFICIAL|
| Sample|967.93| 2021-06-10| 210611170317| OFFICIAL|
+----------------+------+-------------+----------------+------------+
一切顺利。
当列(在两个表中)包含null时,问题就开始了值。假设在目标表
中将desk_mirror_name
列更改为空。+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| null|499.97| 2021-06-10| 210611170317|210611170317|
| null|967.93| 2021-06-10| 210611170317|210611170317|
| null| 500.0| 2021-06-10| 210611170317|210611170317|
+----------------+------+-------------+----------------+------------+
对于具有完全相同数据的源表,除了valuationtag
被更改为OFFICIAL外,奇怪的是,更新的表插入了新行,而不是合并。结果如下
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| null|499.97| 2021-06-10| 210611170317|210611170317|
| null|967.93| 2021-06-10| 210611170317|210611170317|
| null| 500.0| 2021-06-10| 210611170317|210611170317|
| null|967.93| 2021-06-10| 210611170317| OFFICIAL|
| null| 500.0| 2021-06-10| 210611170317| OFFICIAL|
| null|499.97| 2021-06-10| 210611170317| OFFICIAL|
+----------------+------+-------------+----------------+------------+
DeltaLake似乎没有正确处理desk_mirror_name
,它在源表和目标表中都具有空值。
如何处理这样的具体情况?
这是spark的预期行为。Apache Spark支持>, >=, =, < and <=.
等标准比较操作符,当其中一个操作数或两个操作数为未知或NULL
时,这些操作符的结果为未知或NULL
。为了比较NULL值是否相等,Spark提供了一个NULL安全的相等运算符(<=>)
,当其中一个操作数为NULL时返回False,当两个操作数都为NULL时返回True。参考链接。
因此,Null
被认为是WHEN NOT MATCHED
条款,只能具有INSERT
的作用。根据指定的列和相应的表达式生成新行。不需要指定目标表中的所有列。对于未指定的目标列,插入NULL
。
使用null安全相等运算符<=>而不是=应该解决这个问题。我已经在Databricks SQL笔记本上尝试过了,它工作得很好。Spark SQL NULL语义