PySpark和时间序列数据:如何巧妙地避免重叠日期?



我有以下示例 Spark 数据帧

import pandas as pd
import pyspark
import pyspark.sql.functions as fn
from pyspark.sql.window import Window
raw_df = pd.DataFrame([
(1115, dt.datetime(2019,8,5,18,20), dt.datetime(2019,8,5,18,40)),
(484, dt.datetime(2019,8,5,18,30), dt.datetime(2019,8,9,18,40)),
(484, dt.datetime(2019,8,4,18,30), dt.datetime(2019,8,6,18,40)),
(484, dt.datetime(2019,8,2,18,30), dt.datetime(2019,8,3,18,40)),
(484, dt.datetime(2019,8,7,18,50), dt.datetime(2019,8,9,18,50)),
(1115, dt.datetime(2019,8,6,18,20), dt.datetime(2019,8,6,18,40)),
], columns=['server_id', 'start_time', 'end_time'])
df = spark.createDataFrame(raw_df)

这导致

+---------+-------------------+-------------------+
|server_id|         start_time|           end_time|
+---------+-------------------+-------------------+
|     1115|2019-08-05 18:20:00|2019-08-05 18:40:00|
|      484|2019-08-05 18:30:00|2019-08-09 18:40:00|
|      484|2019-08-04 18:30:00|2019-08-06 18:40:00|
|      484|2019-08-02 18:30:00|2019-08-03 18:40:00|
|      484|2019-08-07 18:50:00|2019-08-09 18:50:00|
|     1115|2019-08-06 18:20:00|2019-08-06 18:40:00|
+---------+-------------------+-------------------+

这表示每个服务器的使用日期范围。我想将其转换为不重叠日期的时间序列。

我想在不使用 UDF 的情况下实现这一点。

这就是我现在正在做的事情,这是错误的

w = Window().orderBy(fn.lit('A'))
# Separate start/end date of usage into rows
df = (df.withColumn('start_end_time', fn.array('start_time', 'end_time'))
.withColumn('event_dt', fn.explode('start_end_time'))
.withColumn('row_num', fn.row_number().over(w)))
# Indicate start/end date of the usage (start date will always be on odd rows)
df = (df.withColumn('is_start', fn.when(fn.col('row_num')%2 == 0, 0).otherwise(1))
.select('server_id', 'event_dt', 'is_start'))

这给了

+---------+-------------------+--------+
|server_id|           event_dt|is_start|
+---------+-------------------+--------+
|     1115|2019-08-05 18:20:00|       1|
|     1115|2019-08-05 18:40:00|       0|
|      484|2019-08-05 18:30:00|       1|
|      484|2019-08-09 18:40:00|       0|
|      484|2019-08-04 18:30:00|       1|
|      484|2019-08-06 18:40:00|       0|
|      484|2019-08-02 18:30:00|       1|
|      484|2019-08-03 18:40:00|       0|
|      484|2019-08-07 18:50:00|       1|
|      484|2019-08-09 18:50:00|       0|
|     1115|2019-08-06 18:20:00|       1|
|     1115|2019-08-06 18:40:00|       0|
+---------+-------------------+--------+

我想实现的最终结果如下

+---------+-------------------+--------+
|server_id|           event_dt|is_start|
+---------+-------------------+--------+
|     1115|2019-08-05 18:20:00|       1|
|     1115|2019-08-05 18:40:00|       0|
|     1115|2019-08-06 18:20:00|       1|
|     1115|2019-08-06 18:40:00|       0|
|      484|2019-08-02 18:30:00|       1|
|      484|2019-08-03 18:40:00|       0|
|      484|2019-08-04 18:30:00|       1|
|      484|2019-08-09 18:50:00|       0|
+---------+-------------------+--------+

因此,对于server_id484,我有实际的开始和结束日期,中间没有所有的噪音。

您对如何在不使用 UDF 的情况下实现这一目标有任何建议吗?

谢谢

IIUC,这是可以通过使用 Window lag((,sum((函数为符合某些特定条件的有序连续行添加子组标签来解决的问题之一。类似于我们在 Pandas 中使用shift((+cumsum((所做的。

  1. 设置窗口规范w1

    w1 = Window.partitionBy('server_id').orderBy('start_time')
    

    并计算以下内容:

    • max('end_time'(:窗口w1当前行之前的最大end_time
    • lag('end_time'(:前一个end_time
    • sum('prev_end_time

    以上三项可以对应 Pandas cummax((、shift((cumsum((。

  2. 通过使用max(end_time).over(w1)更新df.end_time并设置子组标签g,然后执行groupby(server_id, g)来计算min(start_time)max(end_time)来计算df1

    df1 = df.withColumn('end_time', fn.max('end_time').over(w1)) 
    .withColumn('g', fn.sum(fn.when(fn.lag('end_time').over(w1) < fn.col('start_time'),1).otherwise(0)).over(w1)) 
    .groupby('server_id', 'g') 
    .agg(fn.min('start_time').alias('start_time'), fn.max('end_time').alias('end_time'))
    df1.show()
    +---------+---+-------------------+-------------------+
    |server_id|  g|         start_time|           end_time|
    +---------+---+-------------------+-------------------+
    |     1115|  0|2019-08-05 18:20:00|2019-08-05 18:40:00|
    |     1115|  1|2019-08-06 18:20:00|2019-08-06 18:40:00|
    |      484|  0|2019-08-02 18:30:00|2019-08-03 18:40:00|
    |      484|  1|2019-08-04 18:30:00|2019-08-09 18:50:00|
    +---------+---+-------------------+-------------------+
    
  3. 有了df1后,我们可以使用两个选择来拆分数据,然后合并结果集:

    df_new = df1.selectExpr('server_id', 'start_time as event_dt', '1 as is_start').union(
    df1.selectExpr('server_id', 'end_time as event_dt', '0 as is_start')
    )        
    df_new.orderBy('server_id', 'event_dt').show()                                                                            
    +---------+-------------------+--------+
    |server_id|           event_dt|is_start|
    +---------+-------------------+--------+
    |      484|2019-08-02 18:30:00|       1|
    |      484|2019-08-03 18:40:00|       0|
    |      484|2019-08-04 18:30:00|       1|
    |      484|2019-08-09 18:50:00|       0|
    |     1115|2019-08-05 18:20:00|       1|
    |     1115|2019-08-05 18:40:00|       0|
    |     1115|2019-08-06 18:20:00|       1|
    |     1115|2019-08-06 18:40:00|       0|
    +---------+-------------------+--------+
    

最新更新