PySpark window to return a descriptive column



I have a dataframe with 2 columns: Process name and Process rank. I want to add 2 columns to the dataframe, using windowing to find the minimum- and maximum-ranked process and show them on every row.

See the example columns 'Max Rank Process (output I want using windowing)' and 'Min Rank Process (output I want using windowing 2)' for what I actually want as output. Windowing does not seem to support a plain column reference without some kind of aggregation. How can I do this, with or without a window?

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
schema = StructType([
    StructField("Process", StringType(), True),
    StructField("Process_rank", IntegerType(), True),
    StructField("Max Rank Process (output I want using windowing)", StringType(), True),
    StructField("Min Rank Process (output I want using windowing 2)", StringType(), True)
])
data = [("Inventory", 1, "Retire", "Inventory"),
        ("Data availability", 2, "Retire", "Inventory"),
        ("Code Conversion", 3, "Retire", "Inventory"),
        ("Retire", 4, "Retire", "Inventory")
]
df = spark.createDataFrame(data=data,schema=schema)
############ Windows
# window1: partition by Process name, order by rank descending (for max)
w_max_rnk = Window.partitionBy("Process").orderBy(F.col("Process_rank").desc())
# window2: partition by Process name, order by rank ascending (for min)
w_min_rnk = Window.partitionBy("Process").orderBy(F.col("Process_rank").asc())
# windowed cols to find max and min processes from dataframe
# (this fails: a bare column reference cannot be used with .over())
df = (df.withColumn("max_ranked_process", F.col("Process").over(w_max_rnk))
        .withColumn("min_ranked_process", F.col("Process").over(w_min_rnk)))

Performance won't be great and this is only suitable for smaller dataframes, because the window below has no partitioning (every row is pulled into a single partition), but it should give the correct result.

w = Window.orderBy('Process_rank').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df = (df.withColumn('Max Rank Process', F.last('Process').over(w))
.withColumn('Min Rank Process', F.first('Process').over(w)))
