我有一个PySpark数据框架如下
Customer_ID Address_ID Order_ID Order_Date
Cust_1 Addr_1 1 31-Dec-20
Cust_1 Addr_1 2 23-Jan-21
Cust_1 Addr_1 3 06-Feb-21
Cust_1 Addr_2 4 13-Feb-21
Cust_1 Addr_2 5 20-Feb-21
Cust_1 Addr_3 6 18-Mar-21
Cust_1 Addr_3 7 23-Mar-21
Cust_2 Addr_4 8 31-Dec-20
Cust_2 Addr_4 9 23-Jan-21
Cust_2 Addr_4 10 06-Feb-21
Cust_2 Addr_4 11 13-Feb-21
Cust_2 Addr_4 12 20-Feb-21
Cust_2 Addr_5 13 18-Mar-21
Cust_2 Addr_5 14 23-Mar-21
列分别为customer id
,address id
,order id
和date the order was
Order_ID
总是唯一的
For每个顺序(每一行),我需要计算一个(客户c1,地址a1)的订单份额对
用ord_share(c1,a1)表示order_share由下面的公式定义,
The total number of orders between (Order_Date) and (Order_Date - 90 days) by c1 from a1
----------------------------------------------------------------------------------------
The total number of orders between (Order_Date) and (Order_Date - 90 days) for all addresses by c1
注意-上式中的下划线表示除法
90天是窗口大小。
上表中的一个例子:(为了便于理解,我将order_share
作为一个分数)
For ORDER_ID 7:
订单总数为7 - (by Cust_1)
ord_share(Cust_1,Addr_1) = 3/7,ord_share(Cust_1,Addr_2) = 2/7,ord_share(Cust_1,Addr_3) = 2/7
For ORDER_ID 6:
订单总数为6 - (by Cust_1)
ord_share(Cust_1,Addr_1) = 3/6,ord_share(Cust_1,Addr_2) = 2/6,ord_share(Cust_1,Addr_3) = 1/6
For ORDER_ID 5:
订单总数为5 - (by Cust_1)
ord_share(Cust_1,Addr_1) = 3,ord_share(Cust_1,Addr_2) = 2,ord_share(Cust_1,Addr_3) = 0
等等……我需要为存储这些行。我的输出格式应该像下面这样
(Is_original_address—该列表示Address_ID是否为下单时的原始地址)
Customer_ID Address_ID Order_ID Order_Share Is_original_address
Cust_1 Addr_1 7 3/7 0
Cust_1 Addr_2 7 2/7 0
Cust_1 Addr_3 7 2/7 1
Cust_1 Addr_1 6 3/6 0
Cust_1 Addr_2 6 2/6 0
Cust_1 Addr_3 6 1/6 1
Cust_1 Addr_1 5 3/5 0
Cust_1 Addr_2 5 2/5 1
Cust_1 Addr_3 5 0/5 0
.
.
.
For all rows
基本上就是每行根据客户地址的数量,在输入中扩展为输出中的多行
注意-初始数据框中的列没有排序或分组,我只是选择这样一个例子来帮助解释
我发现解决这个问题很难。我想了很多,我似乎不能想到任何方法来加入/分组数据做到这一点,因为每一行都是唯一的。我真的不知道如何得到输出数据帧。据我所知,我必须克隆原始数据框架,并且对于每一行,我可能必须执行多个组by或连接。我真的不确定如何开始实施。
任何帮助都会很感激。谢谢!
如果还需要其他信息,请告诉我。
正如@Christophe所评论的那样,这使用了窗口函数,但仅用于计算分母
data=[
('c1','a1', 1,'2020-12-31'),
('c1','a1', 2,'2021-01-23'),
('c1','a1', 3,'2021-02-06'),
('c1','a2', 4,'2021-02-13'),
('c1','a2', 5,'2021-02-20'),
('c1','a3', 6,'2021-03-18'),
('c1','a3', 7,'2021-03-23'),
('c2','a4', 8,'2020-12-31'),
('c2','a4', 9,'2021-01-23'),
('c2','a4',10,'2021-02-06'),
('c2','a4',11,'2021-02-13'),
('c2','a4',12,'2021-02-20'),
('c2','a5',13,'2021-03-18'),
('c2','a5',14,'2021-03-23'),
]
df = spark.createDataFrame(data=data, schema = ['c_id','a_id','order_id','order_date'])
df=df.select('c_id','a_id','order_id',F.to_date(F.col('order_date')).alias('date'))
df.createOrReplaceTempView('orders')
spark.sql("""
WITH address_combinations AS (
SELECT o1.order_id, o2.c_id, o2.a_id
, CASE WHEN o1.a_id=o2.a_id THEN 1 ELSE 0 END AS is_original_address
, COUNT(CASE WHEN DATEDIFF(o1.date, o2.date) BETWEEN 0 AND 90 THEN 1 END) AS num_orders
FROM orders o1
JOIN orders o2 ON o1.c_id=o2.c_id
GROUP BY o1.order_id, o2.c_id, o2.a_id, is_original_address
)
SELECT c_id, a_id, order_id
, CONCAT(num_orders, '/', SUM(num_orders) OVER (PARTITION BY order_id)) AS order_share
, is_original_address
FROM address_combinations
ORDER BY order_id, a_id
""").show(200)
输出:
+----+----+--------+-----------+-------------------+
|c_id|a_id|order_id|order_share|is_original_address|
+----+----+--------+-----------+-------------------+
| c1| a1| 1| 1/1| 1|
| c1| a2| 1| 0/1| 0|
| c1| a3| 1| 0/1| 0|
| c1| a1| 2| 2/2| 1|
| c1| a2| 2| 0/2| 0|
| c1| a3| 2| 0/2| 0|
| c1| a1| 3| 3/3| 1|
| c1| a2| 3| 0/3| 0|
| c1| a3| 3| 0/3| 0|
| c1| a1| 4| 3/4| 0|
| c1| a2| 4| 1/4| 1|
| c1| a3| 4| 0/4| 0|
| c1| a1| 5| 3/5| 0|
| c1| a2| 5| 2/5| 1|
| c1| a3| 5| 0/5| 0|
| c1| a1| 6| 3/6| 0|
| c1| a2| 6| 2/6| 0|
| c1| a3| 6| 1/6| 1|
| c1| a1| 7| 3/7| 0|
| c1| a2| 7| 2/7| 0|
| c1| a3| 7| 2/7| 1|
| c2| a4| 8| 1/1| 1|
| c2| a5| 8| 0/1| 0|
| c2| a4| 9| 2/2| 1|
| c2| a5| 9| 0/2| 0|
| c2| a4| 10| 3/3| 1|
| c2| a5| 10| 0/3| 0|
| c2| a4| 11| 4/4| 1|
| c2| a5| 11| 0/4| 0|
| c2| a4| 12| 5/5| 1|
| c2| a5| 12| 0/5| 0|
| c2| a4| 13| 5/6| 0|
| c2| a5| 13| 1/6| 1|
| c2| a4| 14| 5/7| 0|
| c2| a5| 14| 2/7| 1|
+----+----+--------+-----------+-------------------+
不确定这是否需要,但这里是使用Pythondf
api重新实现的完全相同的SQL:
from pyspark.sql import Window
(
df.alias('o1')
.join(df.alias('o2')
, on=F.col('o1.c_id')==F.col('o2.c_id')
, how='inner'
)
.select(F.col('o1.order_id'), F.col('o2.c_id'), F.col('o2.a_id')
, F.when(F.datediff(F.col('o1.date'),F.col('o2.date')).between(0, 90), 1).alias('is_order')
, F.when(F.col('o1.a_id')==F.col('o2.a_id'), 1).otherwise(0).alias('is_original_address')
)
.groupby('order_id','c_id','a_id','is_original_address')
.agg(F.count('is_order').alias('num_orders'))
.select('c_id','a_id','order_id'
, F.concat(F.col('num_orders')
, F.lit('/')
, F.sum('num_orders').over(Window.partitionBy('order_id'))
).alias('order_share')
, 'is_original_address'
)
.sort('order_id','a_id')
).show(200)
快速解释:address_combinations
首先将orders表与自身自连接以获得所有可能的组合。然而,可能会有重复,所以我们执行GROUP BY
和COUNT
在90d时间窗口内的订单数。
下一部分简单地给出了分母并将其格式化("x/y")
希望满足您的要求!