Spark DataFrame: counting consecutive rows with the same value in a column



I have a warehouse inventory dataset in which the current stock level is reported every two minutes.

I want to find the dates on which there was no stock, i.e. stock_amount = 0, and, when the value stays at 0 for a while, the count of consecutive rows for which stock_amount remains 0. In other words, I want the date at which stock_amount drops to 0, together with the number of consecutive readings for which it stays at 0.

For example, given the following data:

Row(date='24-06-2020 11:03:00', stock_amount = 1)
Row(date='24-06-2020 11:05:00', stock_amount = 0)
Row(date='24-06-2020 11:07:00', stock_amount = 2)
Row(date='24-06-2020 11:09:00', stock_amount = 3)
Row(date='24-06-2020 16:32:00', stock_amount = 0)
Row(date='24-06-2020 16:34:00', stock_amount = 0)
Row(date='24-06-2020 16:36:00', stock_amount = 0)
Row(date='24-06-2020 16:38:00', stock_amount = 0)
Row(date='24-06-2020 16:40:00', stock_amount = 2)

the result should be:

(date='24-06-2020 11:05:00', count=1)
(date='24-06-2020 16:32:00', count=4)

And for this data,

Row(date='26-07-2020 12:03:00', stock_amount = 3)
Row(date='26-07-2020 12:05:00', stock_amount = 0)
Row(date='26-07-2020 12:07:00', stock_amount = 4)
Row(date='26-07-2020 12:09:00', stock_amount = 4)
Row(date='26-07-2020 12:11:00', stock_amount = 0)
Row(date='26-07-2020 12:13:00', stock_amount = 2)
Row(date='26-07-2020 17:32:00', stock_amount = 0)
Row(date='26-07-2020 17:34:00', stock_amount = 0)
Row(date='26-07-2020 17:36:00', stock_amount = 0)
Row(date='26-07-2020 17:38:00', stock_amount = 0)
Row(date='26-07-2020 17:40:00', stock_amount = 1)

the result should be:

(date='26-07-2020 12:05:00', count=1)
(date='26-07-2020 12:11:00', count=1)
(date='26-07-2020 17:32:00', count=4)

This can be done with window functions and some grouping.

Note: the steps below are split out to make the explanation clearer, but they can be combined into a single chained command (a sketch of that combined form follows the output below).

#%%
import pyspark.sql.functions as F
from pyspark.sql import Window

# Sample data; the string dates happen to sort correctly here because all
# rows fall on the same day; with real data, parse them into timestamps
# first (e.g. with F.to_timestamp)
data = sqlContext.createDataFrame([
    ('24-06-2020 11:03:00', 1),
    ('24-06-2020 11:05:00', 0),
    ('24-06-2020 11:07:00', 2),
    ('24-06-2020 11:09:00', 3),
    ('24-06-2020 16:32:00', 0),
    ('24-06-2020 16:34:00', 0),
    ('24-06-2020 16:36:00', 0),
    ('24-06-2020 16:38:00', 0),
    ('24-06-2020 16:40:00', 2)], schema=['date', 'stock'])

# Flag non-zero rows, then take a running sum of that flag: every consecutive
# run of zeros shares a group id with the non-zero row that precedes it
data_grp = data.withColumn("stk_grp", (F.col('stock') != 0).cast('int'))
w = Window.orderBy('date')
data_sum = data_grp.withColumn("count_grp", F.sum('stk_grp').over(w))

# Flag zero rows and count them cumulatively within each group
data_stk = data_sum.withColumn("stk_zero", (F.col('stock') == 0).cast('int'))
w1 = Window.partitionBy('count_grp').orderBy('date')
data_res = data_stk.withColumn("fin_count", F.sum('stk_zero').over(w1))
#%%
# Keep only the zero rows, then report the first zero date and the length of
# the run for each group
data_filt = data_res.where("fin_count != 0")
data_res = data_filt.groupby('count_grp').agg(F.min('date').alias('date'),
                                              F.max('fin_count').alias('count'))
data_res.show()
+---------+-------------------+--------------+
|count_grp|               date|         count|
+---------+-------------------+--------------+
|        1|24-06-2020 11:05:00|             1|
|        3|24-06-2020 16:32:00|             4|
+---------+-------------------+--------------+
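The intermediate columns above are only there to make the steps visible; the same logic can be chained into a single expression. Below is a sketch of that combined form (the names result and w are my own, and it reuses the data DataFrame created above):

#%%
import pyspark.sql.functions as F
from pyspark.sql import Window

w = Window.orderBy('date')
result = (data
    # group id: running count of non-zero rows, so each run of zeros
    # shares the id of the non-zero row right before it
    .withColumn('count_grp', F.sum((F.col('stock') != 0).cast('int')).over(w))
    # running count of zero rows within each group
    .withColumn('fin_count',
                F.sum((F.col('stock') == 0).cast('int'))
                 .over(Window.partitionBy('count_grp').orderBy('date')))
    # keep only the zero rows, then report the first zero date and the run length
    .where('fin_count != 0')
    .groupby('count_grp')
    .agg(F.min('date').alias('date'), F.max('fin_count').alias('count')))
result.show()

Applied to the second sample dataset, the same pipeline should produce the three rows listed in the question (12:05:00 and 12:11:00 with count 1, and 17:32:00 with count 4).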
