我有两个数据帧A和B。
一个
+---+------+-----+----------+
| id|player|score| date|
+---+------+-----+----------+
| 1| alpha| 5|2018-02-13|
| 2| beta| 6|2018-02-13|
+---+------+-----+----------+
乙
+---+------+-----+----------+
| id|player|score| date|
+---+------+-----+----------+
| 1| alpha| 100|2019-02-13|
| 2| beta| 6|2018-02-13|
+---+------+-----+----------+
我必须创建一个新的数据帧,其中通过查看日期来更新分数
结果
+---+------+-----+----------+
|id |player|score|date |
+---+------+-----+----------+
| 1| alpha| 100|2019-02-13|
| 2| beta| 6|2018-02-13|
+---+------+-----+----------+
可以联接两个数据帧,并使用pyspark.sql.functions.when()
选取score
列和date
列的值。
from pyspark.sql.functions import col, when
df_A.alias("a").join(df_B.alias("b"), on=["id", "player"], how="inner")
.select(
"id",
"player",
when(
col("b.date") > col("a.date"),
col("b.score")
).otherwise(col("a.score")).alias("score"),
when(
col("b.date") > col("a.date"),
col("b.date")
).otherwise(col("a.date")).alias("date")
)
.show()
#+---+------+-----+----------+
#| id|player|score| date|
#+---+------+-----+----------+
#| 1| alpha| 100|2019-02-13|
#| 2| beta| 6|2018-02-13|
#+---+------+-----+----------+
阅读更多关于when
的信息:Spark 相当于 IF 然后 ELSE
我假设每个player
都被分配了一个id
并且它不会改变。OP 希望生成的数据帧应包含来自最新date
的score
。
# Creating both the DataFrames.
df_A = sqlContext.createDataFrame([(1,'alpha',5,'2018-02-13'),(2,'beta',6,'2018-02-13')],('id','player','score','date'))
df_A = df_A.withColumn('date',to_date(col('date'), 'yyyy-MM-dd'))
df_B = sqlContext.createDataFrame([(1,'alpha',100,'2019-02-13'),(2,'beta',6,'2018-02-13')],('id','player','score','date'))
df_B = df_B.withColumn('date',to_date(col('date'), 'yyyy-MM-dd'))
这个想法是创建这两个数据帧的联合(),然后获取distinct
行。之后获取distinct
行的原因如下 - 假设player
没有更新,那么在B
数据帧中,它的对应值将与数据帧A
相同。因此,我们删除了这样的duplicates
。
# Importing the requisite packages.
from pyspark.sql.functions import col, max
from pyspark.sql import Window
df = df_A.union(df_B).distinct()
df.show()
+---+------+-----+----------+
| id|player|score| date|
+---+------+-----+----------+
| 1| alpha| 5|2018-02-13|
| 1| alpha| 100|2019-02-13|
| 2| beta| 6|2018-02-13|
+---+------+-----+----------+
现在,作为最后一步,使用 Window() 函数遍历联合的数据帧df
并找到latestDate
并仅过滤掉date
与latestDate
相同的行。这样,与这些players
对应的所有行都将在有更新的地方被删除(由数据帧B
中的更新日期表示)。
w = Window.partitionBy('id','player')
df = df.withColumn('latestDate', max('date').over(w))
.where(col('date') == col('latestDate')).drop('latestDate')
df.show()
+---+------+-----+----------+
| id|player|score| date|
+---+------+-----+----------+
| 1| alpha| 100|2019-02-13|
| 2| beta| 6|2018-02-13|
+---+------+-----+----------+