Spark - aggregating user activity to represent time periods



I'm working with a Spark DataFrame that looks like this:

user | 1 | 2 | 3 | 4 | ... | 53
-------------------------------
1 | 1 | 0 | 0 | 1 | ... |  1
2 | 0 | 1 | 1 | 1 | ... |  0
3 | 1 | 1 | 0 | 0 | ... |  1
.
.
.
n | 1 | 0 | 1 | 1 | ... |  0

It has a column for the user ID, followed by one column per week of the year holding a boolean value that indicates whether the user was active during that week.

My goal is to reduce this to a table like the following:

user | active_start | active_end | duration
-------------------------------------------
1 |            1 |          1 |        1
1 |            4 |          4 |        1
1 |           53 |         53 |        1
2 |            2 |          4 |        3
3 |            1 |          2 |        2
3 |           53 |         53 |        1
.
.
.
n |            1 |          1 |        1
n |            3 |          4 |        2

which contains the periods of consecutive activity.

I'm at a bit of a loss as to how to manipulate/aggregate the table so that a new row is created whenever a gap is detected.

I've tried using gaps-and-islands detection code to generate these groups, but couldn't get a version that avoids detecting, and generating rows for, smaller sub-islands inside the larger islands.

Any help would be appreciated, thanks!
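For reference, the core of the gaps-and-islands idea can be sketched in plain Scala, independently of Spark. This is only a sketch under the assumption that each user's weekly flags arrive as a 0/1 sequence; the helper name `activeRuns` is illustrative, not from any library. Runs of consecutive active weeks are formed by splitting the list of active week numbers wherever two neighbours are not adjacent:

```scala
// Hypothetical helper: turn one user's weekly 0/1 flags into
// (active_start, active_end, duration) runs of consecutive active weeks.
def activeRuns(weeks: Seq[Int]): Seq[(Int, Int, Int)] = {
  // 1-based week numbers where the user was active
  val active = weeks.zipWithIndex.collect { case (1, i) => i + 1 }
  // start a new run whenever the current week is not adjacent to the
  // previous active week; each run is built in reverse (head = latest week)
  active.foldLeft(List.empty[List[Int]]) {
    case (run :: rest, w) if w == run.head + 1 => (w :: run) :: rest
    case (acc, w)                              => List(w) :: acc
  }.reverse.map(run => (run.last, run.head, run.head - run.last + 1))
}
```

For example, `activeRuns(Seq(1, 0, 0, 1, 1))` yields `Seq((1, 1, 1), (4, 5, 2))`: a one-week run at week 1 and a two-week run covering weeks 4-5.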

Here is another suggestion, also using flatMap, but with a foldLeft inside to compute the intervals:

case class Interval(user: Int, active_start: Int, active_end: Int, duration: Int)

def computeIntervals(userId: Int, weeks: Seq[Int]): TraversableOnce[Interval] = {
  // First, we get the indexes where the value is 1
  val indexes: Seq[Int] = weeks.zipWithIndex.collect {
    case (value, index) if value == 1 => index
  }
  // Guard against users with no active weeks (indexes.head would otherwise throw)
  if (indexes.isEmpty) Iterator.empty
  else {
    // Then, we find the "breaks" in the sequence (i.e. when the difference between indexes is > 1)
    val breaks: Seq[Int] = indexes.foldLeft((List[Int](), -1)) { (pair, currentValue) =>
      val (breaksBuffer: List[Int], lastValue: Int) = pair
      if ((currentValue - lastValue) > 1 && lastValue >= 0)
        (breaksBuffer :+ lastValue :+ currentValue, currentValue)
      else
        (breaksBuffer, currentValue)
    }._1
    // Then, we add the first and last indexes and re-organize in pairs
    val breakPairs = (indexes.head +: breaks :+ indexes.last).map(_ + 1).grouped(2)
    // Finally, we convert each pair to an interval and return
    breakPairs.map {
      case List(lower, upper) => Interval(userId, lower, upper, upper - lower + 1)
    }
  }
}

Running it:

import spark.implicits._

val df = Seq(
  (1, 1, 0, 0, 1, 1),
  (2, 0, 1, 1, 1, 0),
  (3, 0, 0, 1, 0, 1),
  (4, 1, 1, 0, 0, 1)
).toDF

df.flatMap { row: Row =>
  val userId = row.toSeq.head.asInstanceOf[Int]
  val weeksAsSeq = row.toSeq.drop(1).map(_.asInstanceOf[Int])
  computeIntervals(userId, weeksAsSeq)
}.show
+----+------------+----------+--------+
|user|active_start|active_end|duration|
+----+------------+----------+--------+
|   1|           1|         1|       1|
|   1|           4|         5|       2|
|   2|           2|         4|       3|
|   3|           3|         3|       1|
|   3|           5|         5|       1|
|   4|           1|         2|       2|
|   4|           5|         5|       1|
+----+------------+----------+--------+

Simply flatMap over your df to compute the metrics for each row.

Then give the new DF its column names.

import scala.collection.mutable.ArrayBuffer

val newDf = yourDf
  .flatMap(row => {
    val userId = row.getInt(0)
    val arrayBuffer = ArrayBuffer[(Int, Int, Int, Int)]()
    var start = -1
    for (i <- 1 to 53) {
      val active = row.getInt(i)
      if (active == 1 && start == -1) {
        start = i
      }
      else if (active == 0 && start != -1) {
        // the run ended at the previous week
        val end = i - 1
        arrayBuffer.append((userId, start, end, end - start + 1))
        start = -1
      }
    }
    // flush a run that is still open at week 53
    if (start != -1) {
      arrayBuffer.append((userId, start, 53, 53 - start + 1))
    }
    arrayBuffer
  })
  .toDF("user", "active_start", "active_end", "duration")
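The same scan is easy to check outside Spark. Below is a minimal stand-alone sketch of the per-row logic (the function name `scanWeeks` is illustrative, and it takes a plain `Seq` of 0/1 flags instead of a `Row`); note the `duration = end - start + 1` computation and the explicit flush of a run that is still open at the final week:

```scala
// Hypothetical stand-alone version of the per-row scan,
// operating on a plain Seq of 0/1 weekly flags (weeks are 1-based).
def scanWeeks(userId: Int, weeks: Seq[Int]): Seq[(Int, Int, Int, Int)] = {
  val runs = scala.collection.mutable.ArrayBuffer[(Int, Int, Int, Int)]()
  var start = -1
  for ((active, idx) <- weeks.zipWithIndex) {
    val week = idx + 1
    if (active == 1 && start == -1) start = week
    else if (active == 0 && start != -1) {
      val end = week - 1                       // the run ended last week
      runs.append((userId, start, end, end - start + 1))
      start = -1
    }
  }
  // a run still open at the last week must be flushed explicitly
  if (start != -1) runs.append((userId, start, weeks.length, weeks.length - start + 1))
  runs.toSeq
}
```

For example, `scanWeeks(1, Seq(1, 0, 0, 1, 1))` yields `Seq((1, 1, 1, 1), (1, 4, 5, 2))`; without the final flush, the run covering the last weeks would be silently dropped.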
