I have already done this using multiple DataFrames, but I would like the whole thing in a single chain of code.
"""
product  year  price
a        2017     10
b        2019     26
a        2018     35
b        2020     26
a        2020     20
c        2016     12
**output**
product  previous_price  current_price
a        35              20
b        26              26
c        0               12
"""
# My current code:
from pyspark.sql import Window
from pyspark.sql.functions import col, rank

part = Window.partitionBy("product").orderBy(col("year").desc())
# rank 2 = second-latest year per product -> previous price
df1 = (df.withColumn("rank", rank().over(part)).select("product", "year", "price", "rank")
         .where("rank == 2").withColumnRenamed("price", "previous_price"))
# rank 1 = latest year per product -> current price
df2 = (df.withColumn("rank", rank().over(part)).select("product", "year", "price", "rank")
         .where("rank == 1").withColumnRenamed("price", "current_price"))
df1.join(df2, on=["product"], how="outer").drop("year", "rank").sort("product").fillna(value=0)
How can I compute previous_price and current_price without using a join?
You can use the lead function to access the value of a column in the next row of the window. Because the window is ordered by year descending, the "next" row within each product is the previous year, and the third argument (0) is the default returned when no such row exists (e.g. product c).
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

spark = SparkSession.builder.getOrCreate()

data = [("a", 2017, 10,),
        ("b", 2019, 26,),
        ("a", 2018, 35,),
        ("b", 2020, 26,),
        ("a", 2020, 20,),
        ("c", 2016, 12,), ]

df = spark.createDataFrame(data, ("product", "year", "price",))

part = Window.partitionBy("product").orderBy(F.col("year").desc())

(df.withColumn("previous_price", F.lead("price", 1, 0).over(part))  # next row = previous year; default 0
   .withColumn("rank", F.rank().over(part))
   .where("rank = 1")  # keep only the latest year per product
   .select("product", "previous_price", F.col("price").alias("current_price"))
).show()
"""
+-------+--------------+-------------+
|product|previous_price|current_price|
+-------+--------------+-------------+
| a| 35| 20|
| b| 26| 26|
| c| 0| 12|
+-------+--------------+-------------+
"""