数据帧获取相应列的第一个和最后一个值



是否可以在子组中获取相应列的第一个值。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.{Window, WindowSpec}
object tmp {
  def main(args: Array[String]): Unit = {
    val spark =  SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._
    val input = Seq(
      (1235,  1, 1101, 0),
      (1235,  2, 1102, 0),
      (1235,  3, 1103, 1),
      (1235,  4, 1104, 1),
      (1235,  5, 1105, 0),
      (1235,  6, 1106, 0),
      (1235,  7, 1107, 1),
      (1235,  8, 1108, 1),
      (1235,  9, 1109, 1),
      (1235, 10, 1110, 0),
      (1235, 11, 1111, 0)
    ).toDF("SERVICE_ID", "COUNTER", "EVENT_ID", "FLAG")
    lazy val window: WindowSpec = Window.partitionBy("SERVICE_ID").orderBy("COUNTER")
    val firsts = input.withColumn("first_value", first("EVENT_ID", ignoreNulls = true).over(window.rangeBetween(Long.MinValue, Long.MaxValue)))
    firsts.orderBy("SERVICE_ID", "COUNTER").show()
  }
}

我想要的输出。

基于 FLAG = 1 的列EVENT_ID的第一个(或上一个)值和基于 FLAG = 1 的列EVENT_ID的最后一个(或下一个)值分区依据SERVICE_ID按计数器排序

+----------+-------+--------+----+-----------+-----------+
|SERVICE_ID|COUNTER|EVENT_ID|FLAG|first_value|last_value|
+----------+-------+--------+----+-----------+-----------+
|      1235|      1|    1101|   0|          0|       1103|
|      1235|      2|    1102|   0|          0|       1103|
|      1235|      3|    1103|   1|          0|       1106|
|      1235|      4|    1104|   0|       1103|       1106|
|      1235|      5|    1105|   0|       1103|       1106|
|      1235|      6|    1106|   1|          0|       1108|
|      1235|      7|    1107|   0|       1106|       1108|
|      1235|      8|    1108|   1|          0|       1109|
|      1235|      9|    1109|   1|          0|       1110|
|      1235|     10|    1110|   1|          0|          0|
|      1235|     11|    1111|   0|       1110|          0|
|      1235|     12|    1112|   0|       1110|          0|
+----------+-------+--------+----+-----------+-----------+

首先,需要将数据帧组成组。每次"TIME"列等于 1 时,都会开始一个新组。为此,请首先向数据帧添加列"ID":

lazy val window: WindowSpec = Window.partitionBy("SERVICE_ID").orderBy("COUNTER")
val df_flag = input.filter($"FLAG" === 1)
  .withColumn("ID", row_number().over(window))
val df_other = input.filter($"FLAG" =!= 1)
  .withColumn("ID", lit(0))
// Create a group for each flag event
val df = df_flag.union(df_other)
  .withColumn("ID", max("ID").over(window.rowsBetween(Long.MinValue, 0)))
  .cache()

df.show()给出:

+----------+-------+--------+----+---+
|SERVICE_ID|COUNTER|EVENT_ID|FLAG| ID|
+----------+-------+--------+----+---+
|      1235|      1|    1111|   1|  1|
|      1235|      2|    1112|   0|  1|
|      1235|      3|    1114|   0|  1|
|      1235|      4|    2221|   1|  2|
|      1235|      5|    2225|   0|  2|
|      1235|      6|    2226|   0|  2|
|      1235|      7|    2227|   1|  3|
+----------+-------+--------+----+---+

现在我们有一列分隔事件,我们需要为每个事件添加正确的"EVENT_ID"(重命名为"first_value")。除了"first_value"之外,计算并添加第二列"last_value",这是下一个标记事件的 id。

val df_event = df.filter($"FLAG" === 1)
  .select("EVENT_ID", "ID", "SERVICE_ID", "COUNTER")
  .withColumnRenamed("EVENT_ID", "first_value")
  .withColumn("last_value", lead($"first_value",1,0).over(window))
  .drop("COUNTER")
val df_final = df.join(df_event, Seq("ID", "SERVICE_ID"))
  .drop("ID")
  .withColumn("first_value", when($"FLAG" === 1, lit(0)).otherwise($"first_value"))

df_final.show()给了我们:

+----------+-------+--------+----+-----------+----------+
|SERVICE_ID|COUNTER|EVENT_ID|FLAG|first_value|last_value|
+----------+-------+--------+----+-----------+----------+
|      1235|      1|    1111|   1|          0|      2221|
|      1235|      2|    1112|   0|       1111|      2221|
|      1235|      3|    1114|   0|       1111|      2221|
|      1235|      4|    2221|   1|          0|      2227|
|      1235|      5|    2225|   0|       2221|      2227|
|      1235|      6|    2226|   0|       2221|      2227|
|      1235|      7|    2227|   1|          0|         0|
+----------+-------+--------+----+-----------+----------+

分两步解决:

  1. 获取带有"FLAG"== 1 的事件,并且此事件的有效范围;
  2. 连接 1. 输入,按范围。

为了可见性,包括一些列重命名,可以缩短:

val window = Window.partitionBy("SERVICE_ID").orderBy("COUNTER").rowsBetween(Window.currentRow, 1)
val eventRangeDF = input.where($"FLAG" === 1)
  .withColumn("RANGE_END", max($"COUNTER").over(window))
  .withColumnRenamed("COUNTER", "RANGE_START")
  .select("SERVICE_ID", "EVENT_ID", "RANGE_START", "RANGE_END")
eventRangeDF.show(false)
val result = input.where($"FLAG" === 0).as("i").join(eventRangeDF.as("e"),
  expr("e.SERVICE_ID=i.SERVICE_ID And i.COUNTER>e.RANGE_START and i.COUNTER<e.RANGE_END"))
  .select($"i.SERVICE_ID", $"i.COUNTER", $"i.EVENT_ID", $"i.FLAG", $"e.EVENT_ID".alias("first_value"))
  // include FLAG=1
  .union(input.where($"FLAG" === 1).select($"SERVICE_ID", $"COUNTER", $"EVENT_ID", $"FLAG", lit(0).alias("first_value")))
result.sort("COUNTER").show(false)

输出:

+----------+--------+-----------+---------+
|SERVICE_ID|EVENT_ID|RANGE_START|RANGE_END|
+----------+--------+-----------+---------+
|1235      |1111    |1          |4        |
|1235      |2221    |4          |7        |
|1235      |2227    |7          |7        |
+----------+--------+-----------+---------+
+----------+-------+--------+----+-----------+
|SERVICE_ID|COUNTER|EVENT_ID|FLAG|first_value|
+----------+-------+--------+----+-----------+
|1235      |1      |1111    |1   |0          |
|1235      |2      |1112    |0   |1111       |
|1235      |3      |1114    |0   |1111       |
|1235      |4      |2221    |1   |0          |
|1235      |5      |2225    |0   |2221       |
|1235      |6      |2226    |0   |2221       |
|1235      |7      |2227    |1   |0          |
+----------+-------+--------+----+-----------+

相关内容

  • 没有找到相关文章

最新更新