I have a warehouse inventory dataset in which the current state is reported every two minutes.
I want to get the dates on which there was no stock, i.e. stock_amount = 0,
and, when the value stays at 0 for a while, a count of the consecutive rows where stock_amount is 0. In other words, I want the date on which stock_amount
becomes 0, followed by the number of rows it stays at 0.
For example, given the following data:
Row(date='24-06-2020 11:03:00', stock_amount = 1)
Row(date='24-06-2020 11:05:00', stock_amount = 0)
Row(date='24-06-2020 11:07:00', stock_amount = 2)
Row(date='24-06-2020 11:09:00', stock_amount = 3)
Row(date='24-06-2020 16:32:00', stock_amount = 0)
Row(date='24-06-2020 16:34:00', stock_amount = 0)
Row(date='24-06-2020 16:36:00', stock_amount = 0)
Row(date='24-06-2020 16:38:00', stock_amount = 0)
Row(date='24-06-2020 16:40:00', stock_amount = 2)
the result should be:
(date='24-06-2020 11:05:00', count=1)
(date='24-06-2020 16:32:00', count=4)
And for this data:
Row(date='26-07-2020 12:03:00', stock_amount = 3)
Row(date='26-07-2020 12:05:00', stock_amount = 0)
Row(date='26-07-2020 12:07:00', stock_amount = 4)
Row(date='26-07-2020 12:09:00', stock_amount = 4)
Row(date='26-07-2020 12:11:00', stock_amount = 0)
Row(date='26-07-2020 12:13:00', stock_amount = 2)
Row(date='26-07-2020 17:32:00', stock_amount = 0)
Row(date='26-07-2020 17:34:00', stock_amount = 0)
Row(date='26-07-2020 17:36:00', stock_amount = 0)
Row(date='26-07-2020 17:38:00', stock_amount = 0)
Row(date='26-07-2020 17:40:00', stock_amount = 1)
the result should be:
(date='26-07-2020 12:05:00', count=1)
(date='26-07-2020 12:11:00', count=1)
(date='26-07-2020 17:32:00', count=4)
This can be done with window functions and some grouping.
Note: multiple steps are used here for a more detailed illustration, but they can be combined into a single command.
#%%
import pyspark.sql.functions as F
from pyspark.sql import Window

data = sqlContext.createDataFrame([
    ('24-06-2020 11:03:00', 1),
    ('24-06-2020 11:05:00', 0),
    ('24-06-2020 11:07:00', 2),
    ('24-06-2020 11:09:00', 3),
    ('24-06-2020 16:32:00', 0),
    ('24-06-2020 16:34:00', 0),
    ('24-06-2020 16:36:00', 0),
    ('24-06-2020 16:38:00', 0),
    ('24-06-2020 16:40:00', 2)], schema=['date', 'stock'])

# Flag non-zero rows (1) vs zero rows (0), then take a running sum ordered by
# date: every zero row inherits the group id of the last non-zero row, so each
# run of consecutive zeros lands in its own count_grp group.
# (For multi-day data, parse 'date' into a timestamp first; dd-MM-yyyy strings
# do not sort chronologically across days.)
data_grp = data.withColumn('stk_grp', (F.col('stock') != 0).cast('int'))
w = Window.orderBy('date')
data_sum = data_grp.withColumn('count_grp', F.sum('stk_grp').over(w))

# Within each group, keep a running count of the zero rows.
data_stk = data_sum.withColumn('stk_zero', (F.col('stock') == 0).cast('int'))
w1 = Window.partitionBy('count_grp').orderBy('date')
data_res = data_stk.withColumn('fin_count', F.sum('stk_zero').over(w1))
#%%
# Drop groups that contain no zero rows, then take the first zero date and the
# run length (the maximum running count) per group.
data_filt = data_res.where('fin_count != 0')
data_res = data_filt.groupby('count_grp').agg(
    F.min('date').alias('date'),
    F.max('fin_count').alias('count'))
data_res.show()
+---------+-------------------+-----+
|count_grp|               date|count|
+---------+-------------------+-----+
|        1|24-06-2020 11:05:00|    1|
|        3|24-06-2020 16:32:00|    4|
+---------+-------------------+-----+
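If you want to sanity-check the expected result without a Spark session, the same run-length logic can be sketched in plain Python with `itertools.groupby` (this is just a local check on small samples, not part of the Spark job):

```python
from itertools import groupby

def zero_runs(rows):
    """Return (first_date, length) for each run of consecutive zero-stock rows.

    rows: list of (date, stock_amount) tuples, already sorted by date.
    """
    result = []
    for is_zero, grp in groupby(rows, key=lambda r: r[1] == 0):
        if is_zero:
            grp = list(grp)
            # first date of the run, and how many rows it lasted
            result.append((grp[0][0], len(grp)))
    return result

rows = [
    ('24-06-2020 11:03:00', 1),
    ('24-06-2020 11:05:00', 0),
    ('24-06-2020 11:07:00', 2),
    ('24-06-2020 11:09:00', 3),
    ('24-06-2020 16:32:00', 0),
    ('24-06-2020 16:34:00', 0),
    ('24-06-2020 16:36:00', 0),
    ('24-06-2020 16:38:00', 0),
    ('24-06-2020 16:40:00', 2),
]
print(zero_runs(rows))
# [('24-06-2020 11:05:00', 1), ('24-06-2020 16:32:00', 4)]
```

This matches the `show()` output above: one run of length 1 starting at 11:05:00 and one run of length 4 starting at 16:32:00.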