在连接两个数据帧之后,根据主键从一个数据帧中拾取所有列



我有两个数据帧,我需要根据pyspark中df2中的新更新来更新df1中的记录。

DF1:

df1=spark.createDataFrame([(1,2),(2,3),(3,4)],["id","val1"])
+---+----+
| id|val1|
+---+----+
|  1|   2|
|  2|   3|
|  3|   4|
+---+----+

DF2:

df2=spark.createDataFrame([(1,4),(2,5)],["id","val1"])
+---+----+
| id|val1|
+---+----+
|  1|   4|
|  2|   5|
+---+----+

然后,我尝试连接这两个数据帧。

join_con=(df1["id"] == df2["id"])
jdf=df1.join(df2,join_con,"left")
+---+----+----+----+
| id|val1|  id|val1|
+---+----+----+----+
|  1|   2|   1|   4|
|  3|   4|null|null|
|  2|   3|   2|   5|
+---+----+----+----+

现在,如果df2["id"]不为null,我想从df2中选取所有列,否则选取df1的所有列。

类似于:

jdf.filter(df2.id is null).select(df1["*"])
union
jdf.filter(df2.id is not null).select(df2["*"])

因此得到的DF可以是:

+---+----+
| id|val1|
+---+----+
|  1|   4|
|  2|   5|
|  3|   4|
+---+----+

有人能帮忙吗?

您的选择表达式可以是df2中的列与df1之间的coalesce

from pyspark.sql import functions as F
df1=spark.createDataFrame([(1,2),(2,3),(3,4), (4, 1),],["id","val1"])
df2=spark.createDataFrame([(1,4),(2,5), (4, None),],["id","val1"])
selection_expr = [F.when(df2["id"].isNotNull(), df2[c]).otherwise(df1[c]).alias(c) for c in df2.columns]
jdf.select(selection_expr).show()
"""
+---+----+
| id|val1|
+---+----+
|  1|   4|
|  2|   5|
|  3|   4|
|  4|null|
+---+----+
"""

尝试使用coalesce函数,因为该函数会获得第一个非null值。

expr=zip(df2.columns,df1.columns) 
e1=[coalesce(df2[f[0]],df1[f[1]]).alias(f[0]) for f in expr]
jdf.select(*e1).show()
#+---+----+
#| id|val1|
#+---+----+
#|  1|   4|
#|  2|   5|
#|  3|   4|
#+---+----+

最新更新