在 Spark 中写入唯一值,同时保留旧值



我有一个 Spark 作业,它被安排执行。

当我将结果数据帧写入数据目标(S3,HDFS,DB...)时,我希望Spark写入的内容不会针对特定列重复。

例:

假设MY_ID是唯一列。

第一次执行:

--------------
|MY_ID|MY_VAL|
--------------
|  1  |   5  |
|  2  |   9  |
|  3  |   6  |
--------------

第二次执行:

--------------
|MY_ID|MY_VAL|
--------------
|  2  |   9  |
|  3  |   2  |
|  4  |   4  |
--------------

我希望在 2 次执行后在数据目标中找到的内容是这样的:

--------------
|MY_ID|MY_VAL|
--------------
|  1  |   5  |
|  2  |   9  |
|  3  |   6  |
|  4  |   4  |
--------------

其中,预期输出是第一次执行的结果,并附加第二次执行的结果。如果MY_ID的值已经存在,则保留旧的值,丢弃新执行的结果(在这种情况下,第二个执行要为MY_ID3 写入MY_VAL9。由于此记录在第一次执行中已存在,因此将丢弃新记录)。

所以distinct()函数不足以保证这种情况。即使在转储的输出中,也应保持列MY_ID的唯一性。

是否有任何解决方案可以以合理的计算成本保证此属性?(这基本上与关系数据库中UNIQUE的想法相同。

你可以在第一次和第二次迭代中执行fullOuterJoin

val joined = firstIteration.join(secondIteration, Seq("MY_ID"), "fullouter")
scala> joined.show
+-----+------+------+
|MY_ID|MY_VAL|MY_VAL|
+-----+------+------+
|    1|     5|  null|
|    3|     6|     2|
|    4|  null|     4|
|    2|     9|     9|
+-----+------+------+

从结果表中,如果 firstIteration 的MY_VAL具有值,则可以按原样使用它。否则,如果其null(指示密钥仅在第二次迭代中出现)。使用第二迭代的MY_VAL中的值。

scala> joined.withColumn("result", when(firstIteration.col("MY_VAL").isNull, secondIteration.col("MY_VAL"))
.otherwise(firstIteration.col("MY_VAL")))
.drop("MY_VAL")
.show
+-----+------+
|MY_ID|result|
+-----+------+
|    1|     5|
|    3|     6|
|    4|     4|
|    2|     9|
+-----+------+

不确定您使用的是 Scala 还是 Python,但请查看允许您指定一列或多列的dropDuplicates函数: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset

相关内容

  • 没有找到相关文章

最新更新