Given a dataset of S, the goal is to generate a dataset of E, where S and E are defined as follows:
// event where start (s) is inclusive, end (e) is exclusive
case class E(id: Int, state: String, s: Int, e: Option[Int])
//snapshot with state at t for an id
case class S(id: Int, state: String, t: Int)
//Example test case
val ss: Dataset[S] = Seq(S(100, "a", 1), S(100, "b", 2), S(100, "b", 3), S(100, "a", 4), S(100, "a", 5), S(100, "a", 6), S(100, "c", 9))
.toDS
val es: Dataset[E] = ss
.toEs
es.collect() must contain theSameElementsAs
Seq(E(100, "a", 1, Some(2)), E(100, "b", 2, Some(4)), E(100, "a", 4, Some(9)), E(100, "c", 9, None))
A state can have multiple snapshots (at different times), but the output should accumulate the effective start and end times. Additionally, the last active state should have no end time (None) in the output.
The toEs above is defined as follows:
implicit class SOps(ss: Dataset[S]) {
  def toEs(implicit spark: SparkSession): Dataset[E] = ???
}
The diagram below depicts the desired transformation.
Here is a solution using flatMapGroups, which will spill to disk if a group is too large to fit in memory:
def toEs(implicit spark: SparkSession): Dataset[E] = {
  import spark.implicits._
  ss
    .sort(ss("id"), ss("t"))
    .groupByKey(s => s.id)
    .flatMapGroups { (_, ss) =>
      new Iterator[E] {
        // first snapshot of the next run of equal states, if already consumed
        var nextStart: Option[S] = None
        override def hasNext: Boolean = ss.hasNext || nextStart.isDefined
        override def next(): E = {
          val start = nextStart.getOrElse(ss.next())
          nextStart = None
          // skip over snapshots that repeat the current state,
          // checking hasNext so the group boundary doesn't throw
          var last = start
          while (ss.hasNext && last.state == start.state)
            last = ss.next()
          if (last.state == start.state)
            // the group ended inside this run: last active state, open-ended
            E(start.id, start.state, start.t, None)
          else {
            // `last` begins the next run; its time closes the current event
            nextStart = Some(last)
            E(start.id, start.state, start.t, Some(last.t))
          }
        }
      }
    }
}
It looks very imperative though, so I'm not super happy with it :(
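A more declarative alternative is to think of it as lag/lead logic: keep only the snapshots that start a new run of equal states (the state differs from the previous one), then close each event with the start time of the next run. The sketch below is not from the question; it mirrors that idea on plain Scala collections, reusing the S and E case classes:

```scala
// Plain-Scala mirror of the case classes from the question
case class S(id: Int, state: String, t: Int)
case class E(id: Int, state: String, s: Int, e: Option[Int])

def toEs(ss: Seq[S]): Seq[E] =
  ss.groupBy(_.id).values.toSeq.flatMap { group =>
    val sorted = group.sortBy(_.t)
    // run starts: the first snapshot, plus every state change (lag-style)
    val starts = sorted.headOption.toSeq ++
      sorted.sliding(2).collect { case Seq(a, b) if a.state != b.state => b }
    // each event ends where the next one starts (lead-style); the last stays open
    val ends = starts.drop(1).map(n => Option(n.t)) :+ None
    starts.zip(ends).map { case (st, end) => E(st.id, st.state, st.t, end) }
  }
```

The same lag/lead idea should port to Spark window functions (partition by id, order by t): filter to rows where the lagged state differs, then take the lead of t as the end column, avoiding the hand-rolled iterator entirely.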