DeltaLake合并具有空值的列



我使用DeltaLake API使用下面的代码来更新表中的行

DeltaTable.forPath(sparkSession, cleanDataPath)
.as("target")
.merge(df.as("source"), "target.desk_mirror_name = source.desk_mirror_name AND target.price = source.price AND target.valuationdate = source.valuationdate AND target.valuationversion = source.valuationversion")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute();

这应该匹配源表和目标表之间的所有列,除了列valuationtag

合并前,目标表如下所示

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|          Sample|499.97|   2021-06-10|    210611170317|210611170317|
|          Sample|967.93|   2021-06-10|    210611170317|210611170317|
|          Sample| 500.0|   2021-06-10|    210611170317|210611170317|
+----------------+------+-------------+----------------+------------+

源表(应该更新目标表)如下所示

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|          Sample|499.97|   2021-06-10|    210611170317|    OFFICIAL|
|          Sample| 500.0|   2021-06-10|    210611170317|    OFFICIAL|
|          Sample|967.93|   2021-06-10|    210611170317|    OFFICIAL|
+----------------+------+-------------+----------------+------------+

只有valuationtag变成了OFFICIAL。这样,更新后的表为

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|          Sample|499.97|   2021-06-10|    210611170317|    OFFICIAL|
|          Sample| 500.0|   2021-06-10|    210611170317|    OFFICIAL|
|          Sample|967.93|   2021-06-10|    210611170317|    OFFICIAL|
+----------------+------+-------------+----------------+------------+

一切顺利。

当列(在两个表中)包含null时,问题就开始了值。假设在目标表

中将desk_mirror_name列更改为空。
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|            null|499.97|   2021-06-10|    210611170317|210611170317|
|            null|967.93|   2021-06-10|    210611170317|210611170317|
|            null| 500.0|   2021-06-10|    210611170317|210611170317|
+----------------+------+-------------+----------------+------------+

对于具有完全相同数据的源表,除了valuationtag被更改为OFFICIAL外,奇怪的是,更新的表插入了新行,而不是合并。结果如下

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|            null|499.97|   2021-06-10|    210611170317|210611170317|
|            null|967.93|   2021-06-10|    210611170317|210611170317|
|            null| 500.0|   2021-06-10|    210611170317|210611170317|
|            null|967.93|   2021-06-10|    210611170317|    OFFICIAL|
|            null| 500.0|   2021-06-10|    210611170317|    OFFICIAL|
|            null|499.97|   2021-06-10|    210611170317|    OFFICIAL|
+----------------+------+-------------+----------------+------------+

DeltaLake似乎没有正确处理desk_mirror_name,它在源表和目标表中都具有空值。

如何处理这样的具体情况?

这是spark的预期行为。Apache Spark支持>, >=, =, < and <=.等标准比较操作符,当其中一个操作数或两个操作数为未知或NULL时,这些操作符的结果为未知或NULL。为了比较NULL值是否相等,Spark提供了一个NULL安全的相等运算符(<=>),当其中一个操作数为NULL时返回False,当两个操作数都为NULL时返回True。参考链接。

因此,Null被认为是WHEN NOT MATCHED条款,只能具有INSERT的作用。根据指定的列和相应的表达式生成新行。不需要指定目标表中的所有列。对于未指定的目标列,插入NULL

使用null安全相等运算符<=>而不是=应该解决这个问题。我已经在Databricks SQL笔记本上尝试过了,它工作得很好。Spark SQL NULL语义

相关内容

  • 没有找到相关文章

最新更新