提取Scala Spark DataFrame的时间间隔



我正在尝试根据Scala和Spark

中的时间序列提取组合数据间隔

我在数据框中具有以下数据:

Id | State | StartTime           | EndTime
---+-------+---------------------+--------------------
 1 |   R   | 2019-01-01T03:00:00 | 2019-01-01T11:30:00
 1 |   R   | 2019-01-01T11:30:00 | 2019-01-01T15:00:00
 1 |   R   | 2019-01-01T15:00:00 | 2019-01-01T22:00:00
 1 |   W   | 2019-01-01T22:00:00 | 2019-01-02T04:30:00
 1 |   W   | 2019-01-02T04:30:00 | 2019-01-02T13:45:00
 1 |   R   | 2019-01-02T13:45:00 | 2019-01-02T18:30:00
 1 |   R   | 2019-01-02T18:30:00 | 2019-01-02T22:45:00

我需要根据ID和状态将数据提取到时间间隔中。结果数据需要看起来像:

Id | State | StartTime           | EndTime
---+-------+---------------------+--------------------
 1 |   R   | 2019-01-01T03:00:00 | 2019-01-01T22:00:00
 1 |   W   | 2019-01-01T22:00:00 | 2019-01-02T13:45:00
 1 |   R   | 2019-01-02T13:45:00 | 2019-01-02T22:45:00

请注意,前三个记录已被分组在一起,因为设备在2019-01-01-01T03:00:00至2019-01-01-01T22:00:00上处于R状态,然后切换到W状态在2019-01-01-01T22:00:00至2019-01-02T13:45:00的接下来的两个记录中,然后回到了最后两个记录的R状态。

,所以事实证明,当一个人的结束时间是另一个(oracle)翻译成Spark的开始时间时,答案是组合行。

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col,row_number}
import spark.implicits._
val idSpec = Window.partitionBy('Id).orderBy('StartTime)
val idStateSpec = Window.partitionBy('Id,'State).orderBy('StartTime)
val df2 = df
  .select('Id,'State,'StartTime,'EndTime,
          row_number().over(idSpec).as("idRowNumber"),
          row_number().over(idStateSpec).as("idStateRowNumber"))
  .groupBy('Id,'State,'idRowNumber - 'idStateRowNumber)
  .agg(min('StartTime).as("StartTime"), max('EndTime).as("EndTime"))

相关内容

  • 没有找到相关文章

最新更新