我有如下格式的数据框架。每个产品都有不同的id,以及关联的产品名称和类型。
ID Prod Name Type Total Qty
1 ABC A 200
1 DEF B 350
1 GEH B 120
1 JIK C 100
1 LMO A 40
2 ABC A 10
2 DEF A 20
2 GEH C 30
2 JIK C 40
2 LMO A 50
因此,我试图在单独的列中获得该产品名称和ID的总A, B和C的百分比。作为第一步,我尝试使用窗口函数,但它给了我计数"整个列
df.withColumn("count_cat", F.count("Type").over(Window.partitionBy("Type")))
但是我需要这样的东西
ID total Products Total Qty % of A % of B % of C
1 5 810 0.29 0.58 0.12
方法一:按聚合分组
根据您的预期输出,基于GROUP BY Id
的聚合就足够了。
假设初始数据集存储在数据帧input_df
- 通过创建临时视图 确保您的数据框架是可访问的
input_df.createOrReplaceTempView("input_df")
- 在spark会话上运行下面的sql
output_df = sparkSession.sql("""
SELECT
ID,
COUNT(Prod_Name) as `total products`,
SUM(Total_Qty) as `Total Qty`,
SUM(
CASE WHEN Type='A' THEN Total_Qty END
) / SUM(Total_Qty) as `% of A`,
SUM(
CASE WHEN Type='B' THEN Total_Qty END
) / SUM(Total_Qty) as `% of B`,
SUM(
CASE WHEN Type='C' THEN Total_Qty END
) / SUM(Total_Qty) as `% of C`
FROM
input_df
GROUP BY
ID
""").na.fill(0)
使用pyspark APIfrom pyspark.sql import functions as F
output_df = (
input_df.groupBy("ID")
.agg(
F.count("Prod_Name").alias("total products"),
F.sum("Total_Qty").alias("Total Qty"),
(F.sum(
F.when(
F.col("Type")=="A",F.col("Total_Qty")
).otherwise(0)
) / F.sum("Total_Qty")).alias("% of A"),
(F.sum(
F.when(
F.col("Type")=="B",F.col("Total_Qty")
).otherwise(0)
)/ F.sum("Total_Qty")).alias("% of B"),
(F.sum(
F.when(
F.col("Type")=="C",F.col("Total_Qty")
).otherwise(0)
)/ F.sum("Total_Qty")).alias("% of C")
)
)
方法2:使用Windows
如果您想将这些作为5个附加列添加到您的数据集,您可以使用以下窗口OVER (PARTITION BY ID)
或Window.partitionBy("ID")
的类似聚合,如下所示
- 通过创建临时视图 确保您的数据框架是可访问的
input_df.createOrReplaceTempView("input_df")
- 在spark会话上运行下面的sql
output_df = sparkSession.sql("""
SELECT
*,
COUNT(Prod_Name) OVER (PARTITION BY ID) as `total products`,
SUM(Total_Qty) OVER (PARTITION BY ID) as `Total Qty`,
SUM(
CASE WHEN Type='A' THEN Total_Qty END
) OVER (PARTITION BY ID) / SUM(Total_Qty) OVER (PARTITION BY ID) as `% of A`,
SUM(
CASE WHEN Type='B' THEN Total_Qty END
) OVER (PARTITION BY ID)/ SUM(Total_Qty) OVER (PARTITION BY ID) as `% of B`,
SUM(
CASE WHEN Type='C' THEN Total_Qty END
) OVER (PARTITION BY ID) / SUM(Total_Qty) OVER (PARTITION BY ID) as `% of C`
FROM
input_df
GROUP BY
ID
""").na.fill(0)
使用pyspark APIfrom pyspark.sql import functions as F
from pyspark.sql import Window
agg_window = Window.partitionBy("Id")
output_df = (
input_df.withColumn(
"total products",
F.count("Prod_Name").over(agg_window)
)
.withColumn(
"Total Qty",
F.sum("Total_Qty").over(agg_window)
)
.withColumn(
"% of A",
F.sum(
F.when(
F.col("Type")=="A",F.col("Total_Qty")
).otherwise(0)
).over(agg_window) / F.sum("Total_Qty").over(agg_window)
)
.withColumn(
"% of B",
F.sum(
F.when(
F.col("Type")=="B",F.col("Total_Qty")
).otherwise(0)
).over(agg_window) / F.sum("Total_Qty").over(agg_window)
)
.withColumn(
"% of C",
F.sum(
F.when(
F.col("Type")=="C",F.col("Total_Qty")
).otherwise(0)
).over(agg_window) / F.sum("Total_Qty").over(agg_window)
)
)
让我知道这是否适合你。
一种方法(不重复A B C等)是使用枢轴。这个想法是先分组,然后旋转类型:
from pyspark.sql import functions as F
from pyspark.sql import Window as W
(df
.groupBy('ID', 'Type')
.agg(F.sum('Total Qty').alias('qty'))
.withColumn('pct', F.col('qty') / F.sum('qty').over(W.partitionBy('ID')))
.groupBy('ID')
.pivot('Type')
.agg(F.first('pct'))
.show()
)
# Output
# +---+------------------+------------------+-------------------+
# | ID| A| B| C|
# +---+------------------+------------------+-------------------+
# | 1|0.2962962962962963|0.5802469135802469|0.12345679012345678|
# | 2|0.5333333333333333| null| 0.4666666666666667|
# +---+------------------+------------------+-------------------+