如何对PySpark Dataframe中的每一行执行复杂的处理



我有一个PySpark数据框架如下

Customer_ID   Address_ID    Order_ID    Order_Date
Cust_1       Addr_1            1       31-Dec-20
Cust_1       Addr_1            2       23-Jan-21
Cust_1       Addr_1            3       06-Feb-21
Cust_1       Addr_2            4       13-Feb-21
Cust_1       Addr_2            5       20-Feb-21
Cust_1       Addr_3            6       18-Mar-21
Cust_1       Addr_3            7       23-Mar-21
Cust_2       Addr_4            8       31-Dec-20
Cust_2       Addr_4            9       23-Jan-21
Cust_2       Addr_4            10      06-Feb-21
Cust_2       Addr_4            11      13-Feb-21
Cust_2       Addr_4            12      20-Feb-21
Cust_2       Addr_5            13      18-Mar-21
Cust_2       Addr_5            14      23-Mar-21

列分别为customer id,address id,order iddate the order was

Order_ID总是唯一的

For每个顺序(每一行),我需要计算一个(客户c1,地址a1)的订单份额

ord_share(c1,a1)表示order_share由下面的公式定义,

The total number of orders between (Order_Date) and (Order_Date - 90 days) by c1 from a1  
----------------------------------------------------------------------------------------
The total number of orders between (Order_Date) and (Order_Date - 90 days) for all addresses by c1  

注意-上式中的下划线表示除法

90天是窗口大小。
上表中的一个例子:(为了便于理解,我将order_share作为一个分数)

For ORDER_ID 7:
订单总数为7 - (by Cust_1)
ord_share(Cust_1,Addr_1) = 3/7,ord_share(Cust_1,Addr_2) = 2/7,ord_share(Cust_1,Addr_3) = 2/7

For ORDER_ID 6:
订单总数为6 - (by Cust_1)
ord_share(Cust_1,Addr_1) = 3/6,ord_share(Cust_1,Addr_2) = 2/6,ord_share(Cust_1,Addr_3) = 1/6

For ORDER_ID 5:
订单总数为5 - (by Cust_1)
ord_share(Cust_1,Addr_1) = 3,ord_share(Cust_1,Addr_2) = 2,ord_share(Cust_1,Addr_3) = 0

等等……我需要为存储这些行。我的输出格式应该像下面这样

(Is_original_address—该列表示Address_ID是否为下单时的原始地址)

Customer_ID   Address_ID    Order_ID    Order_Share  Is_original_address
Cust_1         Addr_1            7       3/7           0 
Cust_1         Addr_2            7       2/7           0
Cust_1         Addr_3            7       2/7           1
Cust_1         Addr_1            6       3/6           0
Cust_1         Addr_2            6       2/6           0
Cust_1         Addr_3            6       1/6           1
Cust_1         Addr_1            5       3/5           0
Cust_1         Addr_2            5       2/5           1
Cust_1         Addr_3            5       0/5           0 
.
.
.
For all rows

基本上就是每行根据客户地址的数量,在输入中扩展为输出中的多行

注意-初始数据框中的列没有排序或分组,我只是选择这样一个例子来帮助解释

我发现解决这个问题很难。我想了很多,我似乎不能想到任何方法来加入/分组数据做到这一点,因为每一行都是唯一的。我真的不知道如何得到输出数据帧。
据我所知,我必须克隆原始数据框架,并且对于每一行,我可能必须执行多个组by或连接。我真的不确定如何开始实施。

任何帮助都会很感激。谢谢!

如果还需要其他信息,请告诉我。

正如@Christophe所评论的那样,这使用了窗口函数,但仅用于计算分母

data=[
('c1','a1', 1,'2020-12-31'),
('c1','a1', 2,'2021-01-23'),
('c1','a1', 3,'2021-02-06'),
('c1','a2', 4,'2021-02-13'),
('c1','a2', 5,'2021-02-20'),
('c1','a3', 6,'2021-03-18'),
('c1','a3', 7,'2021-03-23'),
('c2','a4', 8,'2020-12-31'),
('c2','a4', 9,'2021-01-23'),
('c2','a4',10,'2021-02-06'),
('c2','a4',11,'2021-02-13'),
('c2','a4',12,'2021-02-20'),
('c2','a5',13,'2021-03-18'),
('c2','a5',14,'2021-03-23'),
]
df = spark.createDataFrame(data=data, schema = ['c_id','a_id','order_id','order_date'])
df=df.select('c_id','a_id','order_id',F.to_date(F.col('order_date')).alias('date'))
df.createOrReplaceTempView('orders')
spark.sql("""
WITH address_combinations AS (
SELECT o1.order_id, o2.c_id, o2.a_id
, CASE WHEN o1.a_id=o2.a_id THEN 1 ELSE 0 END AS is_original_address
, COUNT(CASE WHEN DATEDIFF(o1.date, o2.date) BETWEEN 0 AND 90 THEN 1 END) AS num_orders
FROM orders o1
JOIN orders o2 ON o1.c_id=o2.c_id
GROUP BY o1.order_id, o2.c_id, o2.a_id, is_original_address
)
SELECT c_id, a_id, order_id
, CONCAT(num_orders, '/', SUM(num_orders) OVER (PARTITION BY order_id)) AS order_share
, is_original_address
FROM address_combinations
ORDER BY order_id, a_id
""").show(200)

输出:

+----+----+--------+-----------+-------------------+
|c_id|a_id|order_id|order_share|is_original_address|
+----+----+--------+-----------+-------------------+
|  c1|  a1|       1|        1/1|                  1|
|  c1|  a2|       1|        0/1|                  0|
|  c1|  a3|       1|        0/1|                  0|
|  c1|  a1|       2|        2/2|                  1|
|  c1|  a2|       2|        0/2|                  0|
|  c1|  a3|       2|        0/2|                  0|
|  c1|  a1|       3|        3/3|                  1|
|  c1|  a2|       3|        0/3|                  0|
|  c1|  a3|       3|        0/3|                  0|
|  c1|  a1|       4|        3/4|                  0|
|  c1|  a2|       4|        1/4|                  1|
|  c1|  a3|       4|        0/4|                  0|
|  c1|  a1|       5|        3/5|                  0|
|  c1|  a2|       5|        2/5|                  1|
|  c1|  a3|       5|        0/5|                  0|
|  c1|  a1|       6|        3/6|                  0|
|  c1|  a2|       6|        2/6|                  0|
|  c1|  a3|       6|        1/6|                  1|
|  c1|  a1|       7|        3/7|                  0|
|  c1|  a2|       7|        2/7|                  0|
|  c1|  a3|       7|        2/7|                  1|
|  c2|  a4|       8|        1/1|                  1|
|  c2|  a5|       8|        0/1|                  0|
|  c2|  a4|       9|        2/2|                  1|
|  c2|  a5|       9|        0/2|                  0|
|  c2|  a4|      10|        3/3|                  1|
|  c2|  a5|      10|        0/3|                  0|
|  c2|  a4|      11|        4/4|                  1|
|  c2|  a5|      11|        0/4|                  0|
|  c2|  a4|      12|        5/5|                  1|
|  c2|  a5|      12|        0/5|                  0|
|  c2|  a4|      13|        5/6|                  0|
|  c2|  a5|      13|        1/6|                  1|
|  c2|  a4|      14|        5/7|                  0|
|  c2|  a5|      14|        2/7|                  1|
+----+----+--------+-----------+-------------------+

不确定这是否需要,但这里是使用Pythondfapi重新实现的完全相同的SQL:

from pyspark.sql import Window
(
df.alias('o1')
.join(df.alias('o2')
, on=F.col('o1.c_id')==F.col('o2.c_id')
, how='inner'
)
.select(F.col('o1.order_id'), F.col('o2.c_id'), F.col('o2.a_id')
, F.when(F.datediff(F.col('o1.date'),F.col('o2.date')).between(0, 90), 1).alias('is_order')
, F.when(F.col('o1.a_id')==F.col('o2.a_id'), 1).otherwise(0).alias('is_original_address')
)
.groupby('order_id','c_id','a_id','is_original_address')
.agg(F.count('is_order').alias('num_orders'))
.select('c_id','a_id','order_id'
, F.concat(F.col('num_orders')
, F.lit('/')
, F.sum('num_orders').over(Window.partitionBy('order_id'))
).alias('order_share')
, 'is_original_address'
)
.sort('order_id','a_id')
).show(200)

快速解释:address_combinations首先将orders表与自身自连接以获得所有可能的组合。然而,可能会有重复,所以我们执行GROUP BYCOUNT在90d时间窗口内的订单数。

下一部分简单地给出了分母并将其格式化("x/y")

希望满足您的要求!

最新更新