我有一个 Spark 作业,它被安排执行。
当我将结果数据帧写入数据目标(S3,HDFS,DB...)时,我希望Spark写入的内容不会针对特定列重复。
例:
假设MY_ID
是唯一列。
第一次执行:
--------------
|MY_ID|MY_VAL|
--------------
| 1 | 5 |
| 2 | 9 |
| 3 | 6 |
--------------
第二次执行:
--------------
|MY_ID|MY_VAL|
--------------
| 2 | 9 |
| 3 | 2 |
| 4 | 4 |
--------------
我希望在 2 次执行后在数据目标中找到的内容是这样的:
--------------
|MY_ID|MY_VAL|
--------------
| 1 | 5 |
| 2 | 9 |
| 3 | 6 |
| 4 | 4 |
--------------
其中,预期输出是第一次执行的结果,并附加第二次执行的结果。如果MY_ID
的值已经存在,则保留旧的值,丢弃新执行的结果(在这种情况下,第二个执行要为MY_ID
3 写入MY_VAL
9。由于此记录在第一次执行中已存在,因此将丢弃新记录)。
所以distinct()
函数不足以保证这种情况。即使在转储的输出中,也应保持列MY_ID
的唯一性。
是否有任何解决方案可以以合理的计算成本保证此属性?(这基本上与关系数据库中UNIQUE
的想法相同。
你可以在第一次和第二次迭代中执行fullOuterJoin
。
val joined = firstIteration.join(secondIteration, Seq("MY_ID"), "fullouter")
scala> joined.show
+-----+------+------+
|MY_ID|MY_VAL|MY_VAL|
+-----+------+------+
| 1| 5| null|
| 3| 6| 2|
| 4| null| 4|
| 2| 9| 9|
+-----+------+------+
从结果表中,如果 firstIteration 的MY_VAL
具有值,则可以按原样使用它。否则,如果其null
(指示密钥仅在第二次迭代中出现)。使用第二迭代的MY_VAL
中的值。
scala> joined.withColumn("result", when(firstIteration.col("MY_VAL").isNull, secondIteration.col("MY_VAL"))
.otherwise(firstIteration.col("MY_VAL")))
.drop("MY_VAL")
.show
+-----+------+
|MY_ID|result|
+-----+------+
| 1| 5|
| 3| 6|
| 4| 4|
| 2| 9|
+-----+------+
不确定您使用的是 Scala 还是 Python,但请查看允许您指定一列或多列的dropDuplicates
函数: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset