How to add complex updateExpr logic in a Delta table



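I am updating a Delta table with incremental records. Two of the fields only need a plain update, but a third field is a collection of maps, and I would like to concatenate the existing and incoming values instead of updating/replacing them.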

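import io.delta.tables.DeltaTable
import spark.implicits._ // needed for .toDF on a Seq

// Existing (historical) data, written out as the Delta table
val historicalDF = Seq(
  (1, 0, "Roger", Seq(Map("score" -> 5, "year" -> 2012)))
).toDF("id", "ts", "user", "scores")
historicalDF.write
  .format("delta")
  .mode("overwrite")
  .save(table_path)
val hist_dt: DeltaTable = DeltaTable.forPath(spark, table_path)

// Incoming (incremental) record for the same id
val incrementalDF = Seq(
  (1, 1, "Roger Rabbit", Seq(Map("score" -> 7, "year" -> 2013)))
).toDF("id", "ts", "user", "scores")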

What I want to end up with after the merge is something like this:

+---+---+------------+--------------------------------------------------------+
|id |ts |user        |scores                                                  |
+---+---+------------+--------------------------------------------------------+
|1  |1  |Roger Rabbit|[{score -> 7, year -> 2013}, {score -> 5, year -> 2012}]|
+---+---+------------+--------------------------------------------------------+

The merge I tried to run is:

hist_dt
  .as("ex")
  .merge(incrementalDF.as("in"),
    "ex.id = in.id")
  .whenMatched
  .updateExpr(
    Map(
      "ts" -> "in.ts",
      "user" -> "in.user",
      "scores" -> "in.scores" ++ "ex.scores"
    )
  )
  .whenNotMatched
  .insertAll()
  .execute()

But the columns "in.scores" and "ex.scores" are interpreted as Strings, so I get the following error:

error: value ++ is not a member of (String, String)
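
A note on why the message names (String, String) rather than String: in Scala, `->` and `++` have the same precedence and associate to the left, so the compiler groups the entry as ("scores" -> "in.scores") ++ "ex.scores" and looks for `++` on the tuple. The sketch below uses only plain Scala (the object name `ArrowPrecedence` is made up for illustration). It also shows that adding parentheses only joins the two strings, which compiles but still does not produce a meaningful SQL expression.

```scala
object ArrowPrecedence {
  // Without parentheses the entry would be parsed as
  //   ("scores" -> "in.scores") ++ "ex.scores"
  // which fails: a Tuple2 has no `++` method.

  // With parentheses, `++` is plain string concatenation.
  val sqlText: String = "in.scores" ++ "ex.scores"

  // The resulting map entry is a pair of two strings.
  val entry: (String, String) = "scores" -> sqlText

  def main(args: Array[String]): Unit = {
    println(entry) // (scores,in.scoresex.scores)
  }
}
```

So the array concatenation has to happen inside the expression that Spark evaluates, not in Scala code that operates on the column names.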

Is there a way to add more complex logic in updateExpr?

Using update() instead of updateExpr() lets me pass the required columns to a UDF, so I can put the more complex logic there:

import org.apache.spark.sql.functions.{col, udf}

// Concatenate the incoming and existing score lists, tolerating a null on either side.
def join_seq_map(incremental: Seq[Map[String, Integer]], existing: Seq[Map[String, Integer]]): Seq[Map[String, Integer]] = {
  (incremental, existing) match {
    case (null, null) => null
    case (null, e)    => e
    case (i, null)    => i
    case (i, e)       => (i ++ e).distinct // drop exact duplicate maps
  }
}
// A val, so the UDF is created once rather than on every reference
val join_seq_map_udf = udf(join_seq_map _)

hist_dt
  .as("ex")
  .merge(
    incrementalDF.as("in"),
    "ex.id = in.id")
  .whenMatched("ex.ts < in.ts") // only apply records newer than the stored one
  .update(Map(
    "ts" -> col("in.ts"),
    "user" -> col("in.user"),
    "scores" -> join_seq_map_udf(col("in.scores"), col("ex.scores"))
  ))
  .whenNotMatched
  .insertAll()
  .execute()
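
As a possible alternative that stays with updateExpr(): each value in that map is a SQL expression string, so the concatenation can be written inside the string itself. The sketch below assumes Spark 2.4 or later, where the built-in concat function accepts arrays, and it assumes `hist_dt` and `incrementalDF` are built as in the question. The wrapper name `mergeWithConcat` is made up for illustration. Unlike the UDF above, it does not remove duplicates, and I have not checked how it behaves when one side is null.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

// Hypothetical helper: same merge as in the question, but the array
// concatenation is expressed in SQL inside a single string.
def mergeWithConcat(hist_dt: DeltaTable, incrementalDF: DataFrame): Unit =
  hist_dt
    .as("ex")
    .merge(incrementalDF.as("in"), "ex.id = in.id")
    .whenMatched
    .updateExpr(Map(
      "ts" -> "in.ts",
      "user" -> "in.user",
      // Parsed by Spark as an expression: incoming scores first, then existing ones
      "scores" -> "concat(in.scores, ex.scores)"
    ))
    .whenNotMatched
    .insertAll()
    .execute()
```

This avoids a UDF for the simple case. The update() plus UDF approach remains the more flexible option when the logic (null handling, de-duplication) is awkward to write as a SQL expression.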
