Given a dataset of state snapshots over time, how can it be transformed into a dataset with the effective start and end time of each state?


Given a dataset of S, the goal is to produce a dataset of E, where S and E are defined as follows:

// event where start (s) is inclusive, end (e) is exclusive
case class E(id: Int, state: String, s: Int, e: Option[Int])
//snapshot with state at t for an id
case class S(id: Int, state: String, time: Int)
//Example test case
val ss: Dataset[S] = Seq(S(100, "a", 1), S(100, "b", 2), S(100, "b", 3), S(100, "a", 4), S(100, "a", 5), S(100, "a", 6), S(100, "c", 9))
      .toDS
val es: Dataset[E] = ss
      .toEs
es.collect() must contain theSameElementsAs
      Seq(E(100, "a", 1, Some(2)), E(100, "b", 2, Some(4)), E(100, "a", 4, Some(9)), E(100, "c", 9, None))

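A state can have several consecutive snapshots (at different times), but the output should merge them into a single event with an effective start and end time. In addition, the last active state should have no end time in the output (its Option is None).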

The toEs method used above is declared as follows:

implicit class SOps(ss: Dataset[S]) {
    def toEs(implicit spark: SparkSession): Dataset[E] = ???
}

The figure below illustrates the desired transformation.

Here is a solution using flatMapGroups, which will spill to disk if a group is too large to fit in memory:

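def toEs(implicit spark: SparkSession): Dataset[E] = {
  import spark.implicits._
  ss
    // Assumption: the (id, time) ordering is still in effect inside each group
    // after groupByKey; this is relied on here, not a documented guarantee.
    .sort(ss("id"), ss("time"))
    .groupByKey(s => s.id)
    .flatMapGroups { (_, snapshots) =>
      new Iterator[E] {
        // First snapshot of the next run, read ahead while scanning the current run
        private var nextStart: Option[S] = None
        override def hasNext: Boolean = snapshots.hasNext || nextStart.isDefined
        override def next(): E = {
          val start = nextStart.getOrElse(snapshots.next())
          nextStart = None
          // Skip snapshots that repeat the current state, stopping at the first change
          while (nextStart.isEmpty && snapshots.hasNext) {
            val s = snapshots.next()
            if (s.state != start.state) nextStart = Some(s)
          }
          // If no change was found, the state is still active and has no end time
          E(start.id, start.state, start.time, nextStart.map(_.time))
        }
      }
    }
}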

It looks very imperative, so I am not thrilled with it :(
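For comparison, the same run-collapsing logic can be written without mutable state on a plain in-memory sequence. This is only a sketch, not a replacement for the Spark code above: collapseRuns is a hypothetical helper name, it handles the snapshots of a single id, and it assumes they are already sorted by time.

```scala
// Same shapes as the case classes in the question
case class S(id: Int, state: String, time: Int)
case class E(id: Int, state: String, s: Int, e: Option[Int])

// Hypothetical helper: collapses the snapshots of ONE id, already sorted by time
def collapseRuns(sorted: Seq[S]): List[E] = {
  // Keep only the first snapshot of each run of identical states
  val starts = sorted.foldLeft(List.empty[S]) {
    case (acc @ (prev :: _), s) if prev.state == s.state => acc
    case (acc, s) => s :: acc
  }.reverse
  // Each run ends where the next one starts; the last run stays open
  val ends = starts.drop(1).map(s => Option(s.time)) :+ None
  starts.zip(ends).map { case (s, e) => E(s.id, s.state, s.time, e) }
}
```

Inside flatMapGroups this could be called on the materialized group (for example on the group iterator converted to a Seq), but that holds the whole group in memory, which gives up the streaming behaviour of the iterator-based version.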
