How to get the expected output without a join



I have done this with multiple DataFrames, but I would like it all in a single piece of code.

"""
product year    price
a     2017    10
b     2019    26
a     2018    35
b     2020    26
a     2020    20
c     2016    12
**output**
product privious_price  current_price
a               35        20
b               26        26
c                0        12
"""
# my code (two intermediate DataFrames plus a join):
from pyspark.sql import Window
from pyspark.sql.functions import col, rank

part = Window.partitionBy("product").orderBy(col("year").desc())
df1 = (df.withColumn("rank", rank().over(part))
       .select("product", "year", "price", "rank")
       .where("rank == 2")
       .withColumnRenamed("price", "privious_price"))
df2 = (df.withColumn("rank", rank().over(part))
       .select("product", "year", "price", "rank")
       .where("rank == 1")
       .withColumnRenamed("price", "current_price"))
df1.join(df2, on=["product"], how="outer").drop("year", "rank").sort("product").fillna(value=0)

How can I compute privious_price and current_price without using a join?

You can use the lead function to access a column's value from the next row within the window.

from pyspark.sql import functions as F
from pyspark.sql import Window

data = [("a", 2017, 10),
        ("b", 2019, 26),
        ("a", 2018, 35),
        ("b", 2020, 26),
        ("a", 2020, 20),
        ("c", 2016, 12)]
df = spark.createDataFrame(data, ("product", "year", "price"))

part = Window.partitionBy("product").orderBy(F.col("year").desc())
(df.withColumn("privious_price", F.lead("price", 1, 0).over(part))
   .withColumn("rank", F.rank().over(part))
   .where("rank = 1")
   .select("product", "privious_price", F.col("price").alias("current_price"))
).show()
"""
+-------+--------------+-------------+
|product|privious_price|current_price|
+-------+--------------+-------------+
|      a|            35|           20|
|      b|            26|           26|
|      c|             0|           12|
+-------+--------------+-------------+
"""
