为Databricks delta合并设置标志I或U



当我执行delta合并逻辑时,有没有一种方法可以设置标志列(I-inserted,U-updated(。我很想知道在日常的delta合并逻辑中插入了多少记录,更新了多少记录。

我的示例数据帧:

df_latest = spark.createDataFrame(
[
('Java', "20000"),  # create your data here, be consistent in the types.
('Scala', '90000'),
('Python', '100000')
],
["language", "users_count"]  # add your column names here
)

当我执行下面的delta合并逻辑时,我将需要一个名为flag(i或U(的多列,在delta表的版本02上描述插入了多少行以及如何更新行。

test_delta.alias("h")
.merge(df_latest.alias("df"), "h.language = df.language")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()

任何帮助都将不胜感激,我自己无法解决。。!!

我在寻找类似的技术,却偶然发现了你的问题。幸运的是,我能够使用下面描述的方法来解决它:

1-(创建一个增量表并定义它的模式。

from delta import *
DeltaTable.createIfNotExists(spark) 
.addColumn("language",StringType())
.addColumn("users_count",IntegerType())
.addColumn("Flag",StringType())
.property("description", "Testing Flag Logic") 
.location("/mnt/output/TestingFlagLogic") 
.execute()

可以在下面的链接上看到该表的快照DataFrame 的快照

2-(使用以下命令插入表格。

from delta.tables import *
deltaTable = DeltaTable.forPath(spark,"/mnt/output/TestingFlagLogic")
deltaTable.alias("Destination")
.merge(
df.alias("Updates"),
"Destination.language = Updates.language")
.whenMatchedUpdate(set =
{
"language": "Updates.language",
"users_count": "Updates.users_count",
"Flag": F.lit("U")
}) 
.whenNotMatchedInsert( values =
{
"language": "Updates.language",
"users_count": "Updates.users_count",
"Flag": F.lit("I")
}) 
.execute()

3-(在第一次插入之后,你会得到下面的DataFrame,它会有一个用值"填充的标志列;我";。第一次插入后的增量表

4-(您定义了一个新的DataFrame,其中包含您想要更新的值。在这里,我将语言的用户计数加倍;Python";以及";C++";。

df_updated = spark.createDataFrame(
[
('Python', '200000'),
('C++', '300000'),
],
["language", "users_count"]  # add your column names here
)

具有更新值的数据帧快照

5-(现在使用步骤2中描述的相同逻辑插入。只需使用df_updated更改df即可。

from delta.tables import *
deltaTable = DeltaTable.forPath(spark,"/mnt/output/TestingFlagLogic")
deltaTable.alias("Destination")
.merge(
df_updated.alias("Updates"),
"Destination.language = Updates.language")
.whenMatchedUpdate(set =
{
"language": "Updates.language",
"users_count": "Updates.users_count",
"Flag": F.lit("U")
}) 
.whenNotMatchedInsert( values =
{
"language": "Updates.language",
"users_count": "Updates.users_count",
"Flag": F.lit("I")
}) 
.execute()

6-(祝贺您成功实现了上述功能。现在查询您的delta并显示它以进行视觉验证。

df_new = spark.read.format("delta").load("/mnt/output/TestingFlagLogic")
display(df_new)

更新后的delta表的快照可以在下面的链接上看到。更新后的表快照

你可以看到";Python";以及";C++";具有一个更新的用户计数值;标志";值为";U〃;因为这是一个Upsert(更新+插入(操作。

如果您只需要度量,那么您可以从表的历史记录中检索该信息(通过DESCRIBE history SQL命令或通过历史记录函数(。它们都返回包含操作度量的数据帧(作为映射的operationMetrics列(,对于MERGE操作,有一些度量描述了插入/更新/删除的行数:numTargetRowsInsertednumTargetRowsUpdatednumTargetRowsDeleted

类似这样的东西(仅适用于上一个版本-如果您需要所有版本,那么只需从历史调用中删除1(:

from delta.tables import *
deltaTable = DeltaTable.forName(spark, "mytable")
df = deltaTable.history(1)
f.select(df["operationMetrics"]["numTargetRowsInserted"],
df["operationMetrics"]["numTargetRowsUpdated"],
df["operationMetrics"]["numTargetRowsDeleted"])

相关内容

最新更新