在数据帧中部分更新记录的有效方法



我有一个在快照中积累批处理数据的系统。

批处理中的每条记录都包含一个unique_id和一个版本以及多个其他列。

以前,每当在新批次中,unique_id附带的版本大于快照中存在的版本,syetm 用于替换整个记录并重写为新记录。这通常是基于版本合并两个数据帧。

例如:

Snapshot: <Uid>   <Version> <col1> <col2>
-----------------
A1  | 1     |  ab | cd
A2  | 1     |  ef | gh
New Batch: <Uid>  <Version> <col1> 
------------------
A3  | 1     |  gh
A1  | 2     |  hh

看到这里 col2 在新批次中不存在

After Merge It will become,
<Uid>  <Version> <col1> <col2>
------------------
A3  | 1     |  gh  | Null
A1  | 2     |  hh  | Null
A2  | 1     |  ef  | gh

这里的问题是,即使col2的数据没有出现在UidA2;合并后,该列被替换为空值。因此,列的旧值将丢失。

现在,我只想替换数据所在的列

即预期输出

<Uid>  <Version> <col1> <col2>
------------------
A3  | 1     |  gh  | Null
A1  | 2     |  hh  | cd
A2  | 1     |  ef  | gh

请参阅A1唯一 id col2 值完好无损。

虽然如果批次有 A1 的记录为

New Batch: <Uid>  <Version> <col1> <col2>
------------------
A1  | 2     |  hh  | uu

输出将是 ------------------

A1  | 2     |  hh  | uu
A2  | 1     |  ef  | gh

这里替换了 A2 的整个记录。

根据当前的系统,我正在使用 Spark 并将数据存储为镶木地板。我可以调整Merge流程以合并此更改

但是,我想知道这是否是存储这些用例数据的最佳过程。

我正在评估Hbase和 HiveORC以及我可以对合并过程进行的可能更改。

任何建议将不胜感激。

据我了解,您需要在快照和 journal(delta( 之间使用完全外部连接,然后使用coalesce,例如:

def applyDeduplicatedJournal(snapshot: DataFrame, journal: DataFrame, joinColumnNames: Seq[String]): DataFrame = {
val joinExpr = joinColumnNames
.map(column => snapshot(column) === journal(column))
.reduceLeft(_ && _)
val isThereNoJournalRecord = joinColumnNames
.map(jCol => journal(jCol).isNull)
.reduceLeft(_ && _)
val selectClause = snapshot.columns
.map(col => when(isThereNoJournalRecord, snapshot(col)).otherwise(coalesce(journal(col), snapshot(col))) as col)
snapshot
.join(journal, joinExpr, "full_outer")
.select(selectClause: _*)
}

在这种情况下,您将快照与日志合并,并在日志具有空值时回退到快照值。

希望对您有所帮助!

相关内容

  • 没有找到相关文章

最新更新