pyspark 加入两个数据帧,并在最近日期之前保持行



我有两个数据帧A和B。

一个

+---+------+-----+----------+
| id|player|score|      date|
+---+------+-----+----------+
|  1| alpha|    5|2018-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+

+---+------+-----+----------+
| id|player|score|      date|
+---+------+-----+----------+
|  1| alpha|  100|2019-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+

我必须创建一个新的数据帧,其中通过查看日期来更新分数

结果

+---+------+-----+----------+
|id |player|score|date      |
+---+------+-----+----------+
|  1| alpha|  100|2019-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+

可以联接两个数据帧,并使用pyspark.sql.functions.when()选取score列和date列的值。

from pyspark.sql.functions import col, when
df_A.alias("a").join(df_B.alias("b"), on=["id", "player"], how="inner")
.select(
"id", 
"player", 
when(
col("b.date") > col("a.date"), 
col("b.score")
).otherwise(col("a.score")).alias("score"),
when(
col("b.date") > col("a.date"), 
col("b.date")
).otherwise(col("a.date")).alias("date")
)
.show()
#+---+------+-----+----------+
#| id|player|score|      date|
#+---+------+-----+----------+
#|  1| alpha|  100|2019-02-13|
#|  2|  beta|    6|2018-02-13|
#+---+------+-----+----------+

阅读更多关于when的信息:Spark 相当于 IF 然后 ELSE

我假设每个player都被分配了一个id并且它不会改变。OP 希望生成的数据帧应包含来自最新datescore

# Creating both the DataFrames.
df_A = sqlContext.createDataFrame([(1,'alpha',5,'2018-02-13'),(2,'beta',6,'2018-02-13')],('id','player','score','date'))
df_A = df_A.withColumn('date',to_date(col('date'), 'yyyy-MM-dd'))
df_B = sqlContext.createDataFrame([(1,'alpha',100,'2019-02-13'),(2,'beta',6,'2018-02-13')],('id','player','score','date'))
df_B = df_B.withColumn('date',to_date(col('date'), 'yyyy-MM-dd'))

这个想法是创建这两个数据帧的联合(),然后获取distinct行。之后获取distinct行的原因如下 - 假设player没有更新,那么在B数据帧中,它的对应值将与数据帧A相同。因此,我们删除了这样的duplicates

# Importing the requisite packages.
from pyspark.sql.functions import col, max
from pyspark.sql import Window
df = df_A.union(df_B).distinct()
df.show()
+---+------+-----+----------+
| id|player|score|      date|
+---+------+-----+----------+
|  1| alpha|    5|2018-02-13|
|  1| alpha|  100|2019-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+

现在,作为最后一步,使用 Window() 函数遍历联合的数据帧df并找到latestDate并仅过滤掉datelatestDate相同的行。这样,与这些players对应的所有行都将在有更新的地方被删除(由数据帧B中的更新日期表示)。

w = Window.partitionBy('id','player')
df = df.withColumn('latestDate', max('date').over(w))
.where(col('date') == col('latestDate')).drop('latestDate')
df.show()
+---+------+-----+----------+
| id|player|score|      date|
+---+------+-----+----------+
|  1| alpha|  100|2019-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+

相关内容

  • 没有找到相关文章

最新更新