我有两个这样的数据框架:
df2.show()
+----+-------+------+
|NAME|BALANCE|SALARY|
+----+-------+------+
|PPan| 11| 500|
|Liza| 20| 900|
+----+-------+------+
df3.show()
+----+-------+------+
|NAME|BALANCE|SALARY|
+----+-------+------+
|PPan| 10| 700|
| Cal| 70| 888|
+----+-------+------+
df2
表示数据库中已有的记录,df3
表示需要插入/更新到db中的新记录/更新的记录(任何列)。例如:NAME=PPan
的新平衡是10
根据df3
。因此,对于NAME=PPan
,整个行必须在df2
中替换,对于NAME=Cal
,必须添加一个新行,对于name=Liza
,将像这样保持不变:
+----+-------+------+
|NAME|BALANCE|SALARY|
+----+-------+------+
|PPan| 10| 700|
|Liza| 20| 900|
| Cal| 70| 888|
+----+-------+------+
我如何实现这个用例?
首先,您需要使用full
方法连接两个数据框以保持不匹配的行(新)并更新匹配的记录,我更喜欢使用select
与coalesce
函数:
joined_df = df2.alias('rec').join(df3.alias('upd'), on='NAME', how='full')
# +----+-------+------+-------+------+
# |NAME|BALANCE|SALARY|BALANCE|SALARY|
# +----+-------+------+-------+------+
# |Cal |null |null |70 |888 |
# |Liza|20 |900 |null |null |
# |PPan|11 |500 |10 |700 |
# +----+-------+------+-------+------+
output_df = joined_df.selectExpr(
'NAME',
'COALESCE(upd.BALANCE, rec.BALANCE) BALANCE',
'COALESCE(upd.SALARY, rec.SALARY) SALARY'
)
output_df.sort('BALANCE').show(truncate=False)
+----+-------+------+
|NAME|BALANCE|SALARY|
+----+-------+------+
|PPan|10 |700 |
|Liza|20 |900 |
|Cal |70 |888 |
+----+-------+------+