我有一个在快照中积累批处理数据的系统。
批处理中的每条记录都包含一个unique_id和一个版本以及多个其他列。
以前,每当在新批次中,unique_id附带的版本大于快照中存在的版本,syetm 用于替换整个记录并重写为新记录。这通常是基于版本合并两个数据帧。
例如:
Snapshot: <Uid> <Version> <col1> <col2>
-----------------
A1 | 1 | ab | cd
A2 | 1 | ef | gh
New Batch: <Uid> <Version> <col1>
------------------
A3 | 1 | gh
A1 | 2 | hh
看到这里 col2 在新批次中不存在
After Merge It will become,
<Uid> <Version> <col1> <col2>
------------------
A3 | 1 | gh | Null
A1 | 2 | hh | Null
A2 | 1 | ef | gh
这里的问题是,即使col2
的数据没有出现在Uid
A2;合并后,该列被替换为空值。因此,列的旧值将丢失。
现在,我只想替换数据所在的列
即预期输出
<Uid> <Version> <col1> <col2>
------------------
A3 | 1 | gh | Null
A1 | 2 | hh | cd
A2 | 1 | ef | gh
请参阅
A1
唯一 id col2 值完好无损。
虽然如果批次有 A1 的记录为
New Batch: <Uid> <Version> <col1> <col2>
------------------
A1 | 2 | hh | uu
输出将是 ------------------
A1 | 2 | hh | uu
A2 | 1 | ef | gh
这里替换了 A2 的整个记录。
根据当前的系统,我正在使用 Spark 并将数据存储为镶木地板。我可以调整Merge
流程以合并此更改
但是,我想知道这是否是存储这些用例数据的最佳过程。
我正在评估Hbase
和 HiveORC
以及我可以对合并过程进行的可能更改。
任何建议将不胜感激。
据我了解,您需要在快照和 journal(delta( 之间使用完全外部连接,然后使用coalesce
,例如:
def applyDeduplicatedJournal(snapshot: DataFrame, journal: DataFrame, joinColumnNames: Seq[String]): DataFrame = {
val joinExpr = joinColumnNames
.map(column => snapshot(column) === journal(column))
.reduceLeft(_ && _)
val isThereNoJournalRecord = joinColumnNames
.map(jCol => journal(jCol).isNull)
.reduceLeft(_ && _)
val selectClause = snapshot.columns
.map(col => when(isThereNoJournalRecord, snapshot(col)).otherwise(coalesce(journal(col), snapshot(col))) as col)
snapshot
.join(journal, joinExpr, "full_outer")
.select(selectClause: _*)
}
在这种情况下,您将快照与日志合并,并在日志具有空值时回退到快照值。
希望对您有所帮助!