如何在Pyspark中实现这种动态分配



我有一个数据框store_df:-

| Store | ID         | Div |
|-------|------------|-----|
| 637   | 4000000970 | Pac |
| 637   | 4000000435 | Pac |
| 637   | 4000055542 | Pac |
| 637   | 4000042206 | Pac |
| 637   | 4000014114 | Pac |
from pyspark.sql import SparkSession,Row
from pyspark.sql.functions import mean, min, max,count,row_number,lit,udf,col
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
# Create (or reuse) the active Spark session for this script.
spark = SparkSession.builder.getOrCreate()

# Items store 637 already carries: one row per (Store, ID, Div).
store_rows = [
    (637, 4000000970, 'Pac'),
    (637, 4000000435, 'Pac'),
    (637, 4000055542, 'Pac'),
    (637, 4000042206, 'Pac'),
    (637, 4000014114, 'Pac'),
]
store_df = spark.createDataFrame(store_rows, ['Store', 'ID', 'Div'])
# Ranked candidate list: every assignable ID with its division, global
# rank, and category letter.
candidate_rows = [
    ('Pac', 4000000970, 1, 'A'),
    ('Pac', 4000000432, 2, 'A'),
    ('Pac', 4000000405, 3, 'A'),
    ('Pac', 4000042431, 4, 'A'),
    ('Pac', 2200028596, 5, 'B'),
    ('Pac', 4000000032, 6, 'A'),
    ('Pac', 2200028594, 7, 'B'),
    ('Pac', 4000014114, 8, 'B'),
    ('Pac', 2230001789, 9, 'D'),
    ('Pac', 2200001789, 10, 'C'),
    ('Pac', 2200001787, 11, 'D'),
]
final_list = spark.createDataFrame(candidate_rows, ['Div', 'ID', 'Rank', 'Category'])
# Per-store allocation limits: at most MAX_<cat> new IDs per category,
# N IDs overall.
max_df = spark.createDataFrame(
    [(637, 3, 0, 2, 0, 5)],
    ['Store', 'MAX_A', 'MAX_B', 'MAX_C', 'MAX_D', 'N'],
)
# Drop every candidate the store already stocks (anti-join on Div + ID).
Updated_final_list = final_list.join(store_df, on=["Div", "ID"], how="left_anti")
Updated_final_list.show()
# Count how many remaining candidates fall in each Category.
# pivot("Category") creates one column per category letter; after
# fillna(0) and a max-aggregation we are left with a single row of
# per-category counts.
Category_count = (Updated_final_list.groupBy("Category")
                  .pivot("Category")
                  .count())
Category_count = Category_count.fillna(value=0)
Category_count = Category_count.agg(
    {name: 'max' for name in Category_count.columns[1:]})
# agg() names its output columns "max(X)".  Strip the wrapper to recover
# the bare category name.  Taking everything between '(' and the closing
# ')' (instead of only the first character, as before) keeps this correct
# for multi-character category names as well.
for name in Category_count.columns:
    bare_name = name.split('(', 1)[1].rstrip(')')
    Category_count = Category_count.withColumnRenamed(name, bare_name)
# Both frames hold exactly one row; adding the same constant key M_ID=1
# to each lets an inner join glue them into a single row that carries
# both the MAX_* limits and the per-category counts.
max_df = max_df.withColumn("M_ID", lit(1))
Category_count = Category_count.withColumn("M_ID", lit(1))
max_df1 = max_df.join(Category_count,['M_ID'],'inner')
# Number of IDs to create per category: the available count, capped at
# the store's configured maximum for that category.
@udf(returnType=IntegerType())
def max_id_cal(A, B):
    """Return B capped at A — i.e. min(A, B)."""
    return min(A, B)
# For every MAX_<cat> limit column, add a column named <cat> holding the
# capped number of IDs to allocate for that category.
max_col = [name for name in max_df.columns if name.startswith('MAX')]
for max_name in max_col:
    category = max_name.split('_')[1]
    max_df1 = max_df1.withColumn(category, max_id_cal(max_df1[max_name], category))
# Re-rank the remaining candidates inside each category, preserving the
# original Rank order.
category_window = Window.partitionBy("Category").orderBy(col("Rank").asc())
Updated_final_list1 = Updated_final_list.withColumn(
    "Rank1", row_number().over(category_window))
# Keep at most the allowed number of candidates per category (by the new
# per-category rank), union the pieces, and rename Rank1 back to Rank.
max_col = [name for name in max_df.columns if name.startswith('MAX')]
categories = [name.split('_')[1] for name in max_col]
# One collect() fetches every category limit in a single driver
# round-trip, instead of one collect() per category as before.
limits_row = max_df1.select(*categories).collect()[0]
df = None
for category in categories:
    subset = Updated_final_list1.filter(
        (col("Category") == category) & (col("Rank1") <= limits_row[category]))
    # Accumulate with a None-start union instead of an index flag.
    df = subset if df is None else df.union(subset)

df = df.drop("Rank")
df = df.withColumnRenamed('Rank1', 'Rank')
df.show()

最新更新