PySpark window functions: count categorical values and compute percentages



I have a DataFrame in the following format. Each product has an ID, along with an associated product name and type.

ID  Prod Name   Type    Total Qty
1   ABC         A       200
1   DEF         B       350
1   GEH         B       120
1   JIK         C       100
1   LMO         A       40
2   ABC         A       10
2   DEF         A       20
2   GEH         C       30
2   JIK         C       40
2   LMO         A       50
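
For reference, this sample data can be reproduced with a minimal sketch like the one below (the SparkSession variable spark and the exact column names are assumptions taken from the table above):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

spark = SparkSession.builder.getOrCreate()

# sample rows matching the table above
data = [
    (1, "ABC", "A", 200), (1, "DEF", "B", 350), (1, "GEH", "B", 120),
    (1, "JIK", "C", 100), (1, "LMO", "A", 40),
    (2, "ABC", "A", 10), (2, "DEF", "A", 20), (2, "GEH", "C", 30),
    (2, "JIK", "C", 40), (2, "LMO", "A", 50),
]
df = spark.createDataFrame(data, ["ID", "Prod Name", "Type", "Total Qty"])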

So I am trying to get, in separate columns, the percentage of each ID's total quantity that falls into types A, B and C. As a first step I tried a window function, but it gives me the count over the entire column:

df.withColumn("count_cat", F.count("Type").over(Window.partitionBy("Type")))

But what I need is something like this:

ID  total Products  Total Qty  % of A  % of B  % of C
1   5               810        0.29    0.58    0.12

Approach 1: GROUP BY aggregation

Based on your expected output, an aggregation with a GROUP BY on ID should be enough.

Assuming the initial dataset is stored in a DataFrame input_df, you can achieve this as follows.

Using Spark SQL
  1. Ensure your DataFrame is accessible by creating a temporary view:
     input_df.createOrReplaceTempView("input_df")
  2. Run the SQL below on your Spark session:
output_df = sparkSession.sql("""
SELECT
    ID,
    COUNT(Prod_Name) as `total products`,
    SUM(Total_Qty) as `Total Qty`,
    SUM(
        CASE WHEN Type='A' THEN Total_Qty END
    ) / SUM(Total_Qty) as `% of A`,
    SUM(
        CASE WHEN Type='B' THEN Total_Qty END
    ) / SUM(Total_Qty) as `% of B`,
    SUM(
        CASE WHEN Type='C' THEN Total_Qty END
    ) / SUM(Total_Qty) as `% of C`
FROM
    input_df
GROUP BY
    ID
""").na.fill(0)
Using the PySpark API
from pyspark.sql import functions as F

output_df = (
    input_df.groupBy("ID")
    .agg(
        F.count("Prod_Name").alias("total products"),
        F.sum("Total_Qty").alias("Total Qty"),
        (F.sum(
            F.when(F.col("Type") == "A", F.col("Total_Qty")).otherwise(0)
        ) / F.sum("Total_Qty")).alias("% of A"),
        (F.sum(
            F.when(F.col("Type") == "B", F.col("Total_Qty")).otherwise(0)
        ) / F.sum("Total_Qty")).alias("% of B"),
        (F.sum(
            F.when(F.col("Type") == "C", F.col("Total_Qty")).otherwise(0)
        ) / F.sum("Total_Qty")).alias("% of C"),
    )
)
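
From the sample data above, either variant should produce one row per ID. A quick check (a sketch; values rounded rather than exact console output):

output_df.show()
# ID 1: total products = 5, Total Qty = 810, % of A ≈ 0.30, % of B ≈ 0.58, % of C ≈ 0.12
# ID 2: total products = 5, Total Qty = 150, % of A ≈ 0.53, % of B = 0.0,  % of C ≈ 0.47

To match the two-decimal figures in the expected output, each percentage expression could additionally be wrapped in F.round(..., 2).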

Approach 2: Using window functions

If you would like these as 5 additional columns on your dataset, you can use similar aggregations over a window, i.e. OVER (PARTITION BY ID) in SQL or Window.partitionBy("ID") in the PySpark API, as shown below.

Using Spark SQL
  1. Ensure your DataFrame is accessible by creating a temporary view:
     input_df.createOrReplaceTempView("input_df")
  2. Run the SQL below on your Spark session:
output_df = sparkSession.sql("""
SELECT
    *,
    COUNT(Prod_Name) OVER (PARTITION BY ID) as `total products`,
    SUM(Total_Qty) OVER (PARTITION BY ID) as `Total Qty`,
    SUM(
        CASE WHEN Type='A' THEN Total_Qty END
    ) OVER (PARTITION BY ID) / SUM(Total_Qty) OVER (PARTITION BY ID) as `% of A`,
    SUM(
        CASE WHEN Type='B' THEN Total_Qty END
    ) OVER (PARTITION BY ID) / SUM(Total_Qty) OVER (PARTITION BY ID) as `% of B`,
    SUM(
        CASE WHEN Type='C' THEN Total_Qty END
    ) OVER (PARTITION BY ID) / SUM(Total_Qty) OVER (PARTITION BY ID) as `% of C`
FROM
    input_df
""").na.fill(0)
Using the PySpark API
from pyspark.sql import functions as F
from pyspark.sql import Window

agg_window = Window.partitionBy("ID")

output_df = (
    input_df.withColumn(
        "total products",
        F.count("Prod_Name").over(agg_window)
    )
    .withColumn(
        "Total Qty",
        F.sum("Total_Qty").over(agg_window)
    )
    .withColumn(
        "% of A",
        F.sum(
            F.when(F.col("Type") == "A", F.col("Total_Qty")).otherwise(0)
        ).over(agg_window) / F.sum("Total_Qty").over(agg_window)
    )
    .withColumn(
        "% of B",
        F.sum(
            F.when(F.col("Type") == "B", F.col("Total_Qty")).otherwise(0)
        ).over(agg_window) / F.sum("Total_Qty").over(agg_window)
    )
    .withColumn(
        "% of C",
        F.sum(
            F.when(F.col("Type") == "C", F.col("Total_Qty")).otherwise(0)
        ).over(agg_window) / F.sum("Total_Qty").over(agg_window)
    )
)
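
Unlike Approach 1, this keeps all ten product rows and repeats the per-ID aggregates on each of them. For example (a sketch based on the sample data):

output_df.select("ID", "Prod_Name", "total products", "% of B").show()
# every row with ID = 1 carries total products = 5 and % of B ≈ 0.58
# every row with ID = 2 carries total products = 5 and % of B = 0.0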

Let me know if this works for you.

One approach that avoids repeating the A/B/C logic is to use a pivot. The idea is to group first, then pivot on Type:

from pyspark.sql import functions as F
from pyspark.sql import Window as W
(df
    .groupBy('ID', 'Type')
    .agg(F.sum('Total Qty').alias('qty'))
    .withColumn('pct', F.col('qty') / F.sum('qty').over(W.partitionBy('ID')))
    .groupBy('ID')
    .pivot('Type')
    .agg(F.first('pct'))
    .show()
)
# Output
# +---+------------------+------------------+-------------------+
# | ID|                 A|                 B|                  C|
# +---+------------------+------------------+-------------------+
# |  1|0.2962962962962963|0.5802469135802469|0.12345679012345678|
# |  2|0.5333333333333333|              null| 0.4666666666666667|
# +---+------------------+------------------+-------------------+
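
If the total Products and Total Qty columns from the expected output are also needed, one option (a sketch using the same column names as the question) is to join the pivoted percentages back onto a plain GROUP BY aggregate:

totals = (df
    .groupBy('ID')
    .agg(F.count('Prod Name').alias('total Products'),
         F.sum('Total Qty').alias('Total Qty')))

pcts = (df
    .groupBy('ID', 'Type')
    .agg(F.sum('Total Qty').alias('qty'))
    .withColumn('pct', F.col('qty') / F.sum('qty').over(W.partitionBy('ID')))
    .groupBy('ID')
    .pivot('Type')
    .agg(F.first('pct')))

# missing types (e.g. no B for ID 2) become 0 instead of null
totals.join(pcts, 'ID').na.fill(0).show()

The join keeps one row per ID, which matches the shape of the expected output.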
