我正在使用火花,我想计算假之前的真实列这是我的表:
条件是
如果d_difference> 900,则认为它是假的,否则为真
id date1 date2 d_difference status
534 18/03/15 11:50:30 18/03/15 11:50:45 15 true
534 18/03/15 11:50:50 18/03/15 11:50:59 09 true
534 18/03/15 12:00:30 18/03/15 12:35:45 1815 false
534 18/03/15 12:00:50 18/03/15 12:36:45 2165 false
534 18/03/15 12:37:30 18/03/15 12:37:45 15 true
534 18/03/15 12:38:00 18/03/15 12:38:10 10 true
534 18/03/15 12:42:30 18/03/15 12:50:45 495 true
534 18/03/15 12:50:50 18/03/15 01:15:45 1505 false
534 18/03/15 12:50:30 18/03/15 12:50:55 20 true
val v2 = v1.withColumn("status",when($"d_difference" > 900,false).otherwise(true))
v2.withcolumn("lag_data",when ($"staus" === false ,lag("status",1).over(w)).otherwise(null)).show()
我正在使用火花滞后函数来计算以前的真实条件,该条件在 false 之前出现,但它失败了......
下面的解决方案只是我能想到的想法。我没有 IDE 来测试蚂蚁运行它。只是想顺便帮忙就可以想到。
假设您的列状态具有真值和假值。我们必须在第一个假之前计算真数。我能想到的RDD的一种方法是。
获取 rdd 包含状态。让 day rddOfStatus.
rddOfStats.zipWithIndex().filter(condition).first
- zipWithIndex 将索引每个状态,例如"真",0"真",1"假",2
- 过滤它并获取第一个元素 (TPL(,即
"假",2
- 从这个元组中,我们可以提取值并获取计数。
或者也可以用Dataframe或Spark SQl做同样的事情。
添加一个带有行号的列,为每一行分配数字,
然后过滤所有真,只保留假
然后获得最低row_number
我可以使用 2 个时间窗口函数来计算所有假数
val w = Window.partitionBy("id").orderBy("date1","date2")
val w1 = Window.partitionBy("id","status").orderBy("date1","date2")
val r1 = ($"status" !== lag($"status", 1).over(w) && $"status").cast("bigint")
v2.withColumn("new_session",r1)
val t1 = v2.withColumn("session",sum(r1).over(w1)).show()
我得到假值的总数,它先于真.
如果您希望在
False
之前累积True
总和,那么您可以参考 下面的代码:
scala> import org.apache.spark.sql.expressions.Window
scala> val w = Window.partitionBy("id").orderBy("id")
scala> val w1 = Window.partitionBy("id").orderBy("rn")
//Input data frame
scala> df.show()
+---+-----------------+-----------------+------------+------+
| id| date1| date2|d_difference|status|
+---+-----------------+-----------------+------------+------+
|534|18/03/15 11:50:30|18/03/15 11:50:45| 15| true|
|534|18/03/15 11:50:50|18/03/15 11:50:59| 09| false|
|534|18/03/15 12:00:30|18/03/15 12:35:45| 1815| false|
|534|18/03/15 12:00:50|18/03/15 12:36:45| 2165| false|
|534|18/03/15 12:37:30|18/03/15 12:37:45| 15| true|
|534|18/03/15 12:38:00|18/03/15 12:38:10| 10| false|
|534|18/03/15 12:42:30|18/03/15 12:50:45| 495| true|
|534|18/03/15 12:50:50|18/03/15 01:15:45| 1505| false|
|534|18/03/15 12:50:30|18/03/15 12:50:55| 20| true|
+---+-----------------+-----------------+------------+------+
scala> val df1 = df.withColumn("rn", row_number over(w))
scala> val df2 = df1.filter(col("status") === "false").withColumn("prv_rn", lag("rn" ,1,0) over (w))
scala> val df3 = df2.withColumn("sum", (col("rn") - col("prv_rn") - 1)).withColumn("true_count", sum(col("sum")) over(w1)).select("id","date1","date2","status","true_count")
//Join final output
scala> df.join(df3, Seq("id","date1","date2","status"),"left").show()
+---+-----------------+-----------------+------+------------+----------+
| id| date1| date2|status|d_difference|true_count|
+---+-----------------+-----------------+------+------------+----------+
|534|18/03/15 11:50:30|18/03/15 11:50:45| true| 15| null|
|534|18/03/15 11:50:50|18/03/15 11:50:59| false| 09| 1|
|534|18/03/15 12:00:30|18/03/15 12:35:45| false| 1815| 1|
|534|18/03/15 12:00:50|18/03/15 12:36:45| false| 2165| 1|
|534|18/03/15 12:37:30|18/03/15 12:37:45| true| 15| null|
|534|18/03/15 12:38:00|18/03/15 12:38:10| false| 10| 2|
|534|18/03/15 12:42:30|18/03/15 12:50:45| true| 495| null|
|534|18/03/15 12:50:50|18/03/15 01:15:45| false| 1505| 4|
|534|18/03/15 12:50:30|18/03/15 12:50:55| true| 20| null|
+---+-----------------+-----------------+------+------------+----------+