我有一个像这样的数据框架:
name field1 field2 field3
a 4 10 8
b 5 0 11
c 10 7 4
d 0 1 5
我需要为每个字段找到前3个名称。
预期输出:
top3-field1 top3-field2 top3-field3
c a b
b c a
a d d
因此,我尝试排序字段(n)列值,限制前3个结果并使用withColumn
方法生成新列,如下所示:
df1 = df.orderBy(f.col("field1").desc(), "name")
.limit(3)
.withColumn("top3-field1", df["name"])
.select("top3-field1", "field1")
使用这种方法,我必须为每个字段(n)创建不同的数据框架,然后将它们连接起来以获得如上所述的结果。我觉得这个问题一定有更好的解决办法。希望有人能给我一些建议
你可以先堆叠df,然后得到降序排序,然后过滤掉小于或等于3的排序,最后枢轴名称:
请注意,我在代码中使用这个函数是为了使输入本身的堆叠更容易一些:
from pyspark.sql import functions as F, Window as W #imports
w = W.partitionBy("col").orderBy(F.desc("values"))
out = (df.selectExpr("name",stack_multiple_col(df,df.columns[1:]))
.withColumn("Rnk",F.dense_rank().over(w))
.where("Rnk<=3").groupBy("Rnk").pivot("col").agg(F.first("name")))
out.show()
+---+------+------+------+
|Rnk|field1|field2|field3|
+---+------+------+------+
| 1| c| a| b|
| 2| b| c| a|
| 3| a| d| d|
+---+------+------+------+
如果你不愿意使用这个函数,你可以这样写:
w = W.partitionBy("col").orderBy(F.desc("values"))
out = (df.selectExpr("name",
'stack(3,"field1",field1,"field2",field2,"field3",field3) as (col,values)')
.withColumn("Rnk",F.dense_rank().over(w))
.where("Rnk<=3").groupBy("Rnk").pivot("col").agg(F.first("name")))
完整代码:
def stack_multiple_col(df,cols=df.columns,output_columns=["col","values"]):
"""stacks multiple columns in a dataframe,
takes all columns by default unless passed a list of values"""
return (f"""stack({len(cols)},{','.join(map(','.join,
(zip([f'"{i}"' for i in cols],cols))))}) as ({','.join(output_columns)})""")
w = W.partitionBy("col").orderBy(F.desc("values"))
out = (df.selectExpr("name",stack_multiple_col(df,df.columns[1:]))
.withColumn("Rnk",F.dense_rank().over(w))
.where("Rnk<=3").groupBy("Rnk").pivot("col").agg(F.first("name")))
out.show()