具有火花条件的滞后功能



我正在使用火花,我想计算假之前的真实列这是我的表:

条件是

如果d_difference> 900,则认为它是假的,否则为真

id          date1               date2            d_difference    status    
534     18/03/15 11:50:30  18/03/15 11:50:45         15          true
534     18/03/15 11:50:50  18/03/15 11:50:59         09          true
534     18/03/15 12:00:30  18/03/15 12:35:45         1815        false
534     18/03/15 12:00:50  18/03/15 12:36:45         2165        false
534     18/03/15 12:37:30  18/03/15 12:37:45         15          true
534     18/03/15 12:38:00  18/03/15 12:38:10         10          true
534     18/03/15 12:42:30  18/03/15 12:50:45         495         true
534     18/03/15 12:50:50  18/03/15 01:15:45         1505        false
534     18/03/15 12:50:30  18/03/15 12:50:55         20          true

val v2 = v1.withColumn("status",when($"d_difference" > 900,false).otherwise(true))
v2.withcolumn("lag_data",when ($"staus" === false ,lag("status",1).over(w)).otherwise(null)).show()

我正在使用火花滞后函数来计算以前的真实条件,该条件在 false 之前出现,但它失败了......

下面的解决方案只是我能想到的想法。我没有 IDE 来测试蚂蚁运行它。只是想顺便帮忙就可以想到。

假设您的列状态具有真值和假值。我们必须在第一个假之前计算真数。我能想到的RDD的一种方法是。

获取 rdd 包含状态。让 day rddOfStatus.

rddOfStats.zipWithIndex().filter(condition).first
  1. zipWithIndex 将索引每个状态,例如"真",0"真",1"假",2
  2. 过滤它并获取第一个元素 (TPL(,即

"假",2

  1. 从这个元组中,我们可以提取值并获取计数。

或者也可以用Dataframe或Spark SQl做同样的事情。

  1. 添加一个带有行号的列,为每一行分配数字,

  2. 然后过滤所有真,只保留假

  3. 然后获得最低row_number

我可以使用 2 个时间窗口函数来计算所有假数

val w = Window.partitionBy("id").orderBy("date1","date2")
 val w1 = Window.partitionBy("id","status").orderBy("date1","date2")
 val r1 = ($"status" !== lag($"status", 1).over(w) && $"status").cast("bigint")
 v2.withColumn("new_session",r1)
 val t1 = v2.withColumn("session",sum(r1).over(w1)).show()

我得到假值的总数,它先于真.

如果您希望在False之前累积True总和,那么您可以参考 下面的代码:

    scala> import org.apache.spark.sql.expressions.Window
    scala> val w = Window.partitionBy("id").orderBy("id")
    scala> val w1 = Window.partitionBy("id").orderBy("rn")
 //Input data frame
    scala> df.show()
    +---+-----------------+-----------------+------------+------+
    | id|            date1|            date2|d_difference|status|
    +---+-----------------+-----------------+------------+------+
    |534|18/03/15 11:50:30|18/03/15 11:50:45|          15|  true|
    |534|18/03/15 11:50:50|18/03/15 11:50:59|          09| false|
    |534|18/03/15 12:00:30|18/03/15 12:35:45|        1815| false|
    |534|18/03/15 12:00:50|18/03/15 12:36:45|        2165| false|
    |534|18/03/15 12:37:30|18/03/15 12:37:45|          15|  true|
    |534|18/03/15 12:38:00|18/03/15 12:38:10|          10| false|
    |534|18/03/15 12:42:30|18/03/15 12:50:45|         495|  true|
    |534|18/03/15 12:50:50|18/03/15 01:15:45|        1505| false|
    |534|18/03/15 12:50:30|18/03/15 12:50:55|          20|  true|
    +---+-----------------+-----------------+------------+------+
    scala> val df1 = df.withColumn("rn", row_number over(w))
    scala> val df2 = df1.filter(col("status") === "false").withColumn("prv_rn",  lag("rn" ,1,0) over (w))
    scala> val df3 = df2.withColumn("sum", (col("rn") - col("prv_rn") - 1)).withColumn("true_count", sum(col("sum")) over(w1)).select("id","date1","date2","status","true_count")
  //Join final output
    scala> df.join(df3, Seq("id","date1","date2","status"),"left").show()
    +---+-----------------+-----------------+------+------------+----------+
    | id|            date1|            date2|status|d_difference|true_count|
    +---+-----------------+-----------------+------+------------+----------+
    |534|18/03/15 11:50:30|18/03/15 11:50:45|  true|          15|      null|
    |534|18/03/15 11:50:50|18/03/15 11:50:59| false|          09|         1|
    |534|18/03/15 12:00:30|18/03/15 12:35:45| false|        1815|         1|
    |534|18/03/15 12:00:50|18/03/15 12:36:45| false|        2165|         1|
    |534|18/03/15 12:37:30|18/03/15 12:37:45|  true|          15|      null|
    |534|18/03/15 12:38:00|18/03/15 12:38:10| false|          10|         2|
    |534|18/03/15 12:42:30|18/03/15 12:50:45|  true|         495|      null|
    |534|18/03/15 12:50:50|18/03/15 01:15:45| false|        1505|         4|
    |534|18/03/15 12:50:30|18/03/15 12:50:55|  true|          20|      null|
    +---+-----------------+-----------------+------+------------+----------+

最新更新