在多列上合并Delta表



我有一个表,它的主键有多个列,所以我需要在多个列上执行合并逻辑


DeltaTable.forPath(spark, "path")
.as("data")
.merge(
finalDf1.as("updates"),
"data.column1 = updates.column1 AND data.column2 = updates.column2 AND data.column3 = updates.column3 AND data.column4 = updates.column4 AND data.column5 = updates.column5")
.whenMatched
.updateAll()
.whenNotMatched
.insertAll()
.execute()

当我检查数据计数时,它没有按预期更新。

有人能帮我一下吗?

请尝试以下示例:https://docs.databricks.com/_static/notebooks/merge-in-cdc.html创建一个包含附加列的更改表,您将注意这些列

  • 如果一行是新的(被插入)
  • 旧的(主键存在),没有任何改变
  • 旧的(主键存在),但其他字段需要更新然后在合并时使用附加条件,例如:
.whenMatched("s.new = true")
.insert()
.whenMatched("s.updated = true")
.updateExpr(Map("key" -> "s.key", "value" -> "s.newValue"))

如何计算行数?

要记住的一点是直接从Delta Lake生成的parquet文件中读取和计数可能会得到与通过Delta表接口读取行不同的结果。请记住,delta保留了一个日志,并支持时间旅行,因此它会存储随时间变化的行副本。

精确计算增量表中当前行的方法:

deltaTable = DeltaTable.forPath(spark,<path to your delta table>)
deltaTable.toDF().count()

相关内容

  • 没有找到相关文章

最新更新