如何在pyspark中通过比较两个数据框来获得更新或新的记录



我有两个这样的数据框架:

df2.show()
+----+-------+------+
|NAME|BALANCE|SALARY|
+----+-------+------+
|PPan|     11|   500|
|Liza|     20|   900|
+----+-------+------+
df3.show()
+----+-------+------+
|NAME|BALANCE|SALARY|
+----+-------+------+
|PPan|     10|   700|
| Cal|     70|   888|
+----+-------+------+

df2表示数据库中已有的记录,df3表示需要插入/更新到db中的新记录/更新的记录(任何列)。例如:NAME=PPan的新平衡是10根据df3。因此,对于NAME=PPan,整个行必须在df2中替换,对于NAME=Cal,必须添加一个新行,对于name=Liza,将像这样保持不变:

+----+-------+------+
|NAME|BALANCE|SALARY|
+----+-------+------+
|PPan|     10|   700|
|Liza|     20|   900|
| Cal|     70|   888|
+----+-------+------+

我如何实现这个用例?

首先,您需要使用full方法连接两个数据框以保持不匹配的行(新)并更新匹配的记录,我更喜欢使用selectcoalesce函数:

joined_df = df2.alias('rec').join(df3.alias('upd'), on='NAME', how='full')
# +----+-------+------+-------+------+
# |NAME|BALANCE|SALARY|BALANCE|SALARY|
# +----+-------+------+-------+------+
# |Cal |null   |null  |70     |888   |
# |Liza|20     |900   |null   |null  |
# |PPan|11     |500   |10     |700   |
# +----+-------+------+-------+------+
output_df = joined_df.selectExpr(
'NAME', 
'COALESCE(upd.BALANCE, rec.BALANCE) BALANCE', 
'COALESCE(upd.SALARY, rec.SALARY) SALARY'
)
output_df.sort('BALANCE').show(truncate=False)
+----+-------+------+
|NAME|BALANCE|SALARY|
+----+-------+------+
|PPan|10     |700   |
|Liza|20     |900   |
|Cal |70     |888   |
+----+-------+------+

最新更新