I have an input dataset in the following format:
+---+--------+----------+
| id| refId| timestamp|
+---+--------+----------+
| 1| null|1548944642|
| 1|29950529|1548937685|
| 2|27510720|1548944885|
| 2|27510720|1548943617|
+---+--------+----------+
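I need to add a new column `session` using the following transformation logic:
- If `refId` is null, the session value is true.
- If the (`id`, `refId`) pair is unique, the session value is true.
- If the (`id`, `refId`) pair is not unique and the timestamp is greater than that of the previous row, the session value is true. In addition, the difference between the timestamps should be > 60.
The expected output is: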
+---+--------+-------+----------+
| id| refId|session| timestamp|
+---+--------+-------+----------+
| 1| null| true|1548944642|
| 1|29950529| true|1548937685|
| 2|27510720| false|1548943617|
| 2|27510720| true|1548944885|
+---+--------+-------+----------+
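The three rules can be checked outside Spark with a small plain-Java sketch. It assumes the rows fit in memory, reads "unique" as "the only row with that (`id`, `refId`) pair", and takes the 60 to be in the same unit as `timestamp`. The names `SessionRules` and `sessions` are hypothetical; this is not Spark code, only a reference for what the `session` column should contain.

```java
import java.util.*;

public class SessionRules {
    // One input row; refId is nullable, mirroring the DataFrame columns
    static final class Row {
        final int id; final Long refId; final long timestamp;
        Row(int id, Long refId, long timestamp) { this.id = id; this.refId = refId; this.timestamp = timestamp; }
    }

    /** Returns the session flag for each row, in input order. */
    static boolean[] sessions(List<Row> rows, long minGap) {
        // Group row indices by (id, refId); rows with a null refId are handled by rule 1
        Map<String, List<Integer>> groups = new HashMap<>();
        for (int i = 0; i < rows.size(); i++) {
            Row r = rows.get(i);
            if (r.refId != null) {
                groups.computeIfAbsent(r.id + ":" + r.refId, k -> new ArrayList<>()).add(i);
            }
        }
        boolean[] session = new boolean[rows.size()];
        for (int i = 0; i < rows.size(); i++) {
            if (rows.get(i).refId == null) session[i] = true; // rule 1: refId is null
        }
        for (List<Integer> idx : groups.values()) {
            if (idx.size() == 1) { session[idx.get(0)] = true; continue; } // rule 2: unique pair
            // rule 3: order the group by timestamp and compare each row with its predecessor
            idx.sort(Comparator.comparingLong(i -> rows.get(i).timestamp));
            for (int k = 1; k < idx.size(); k++) {
                long gap = rows.get(idx.get(k)).timestamp - rows.get(idx.get(k - 1)).timestamp;
                session[idx.get(k)] = gap > minGap;
            }
            // the earliest row of a duplicated pair has no predecessor, so it stays false
        }
        return session;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
            new Row(1, null, 1548944642L),
            new Row(1, 29950529L, 1548937685L),
            new Row(2, 27510720L, 1548944885L),
            new Row(2, 27510720L, 1548943617L));
        System.out.println(Arrays.toString(sessions(rows, 60)));
    }
}
```

For the sample rows in input order it prints `[true, true, true, false]`: the later `(2, 27510720)` row is 1268 after the earlier one, so it is true, while the earlier one has no predecessor and stays false.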
I can implement conditions 1 and 3 separately, but not the second one.
1. `data.withColumn("session", functions.when(data.col("refId").isNull(), true))`
3.
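// Partition by both columns (two arguments, not a single "id, refId" string) and order each group by time
WindowSpec w = Window.partitionBy("id", "refId").orderBy(timestampDS.col("timestamp"));
// Difference from the previous row's timestamp in the same (id, refId) group; 0 for the first row
Column diff = functions.coalesce(
        timestampDS.col("timestamp").cast("long").minus(functions.lag("timestamp", 1).over(w).cast("long")),
        functions.lit(0));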
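In Spark, `lag("timestamp", 1)` over a window partitioned by `(id, refId)` and ordered by `timestamp` yields the previous row's timestamp within the same group, and `coalesce(..., lit(0))` turns the missing value on the first row into 0. A plain-Java sketch of what that expression computes for one partition; `LagDiff.diffs` is a hypothetical helper, not a Spark API:

```java
import java.util.*;

public class LagDiff {
    /**
     * Takes the timestamps of one (id, refId) partition and returns, in ascending
     * timestamp order, timestamp - previous timestamp, with 0 for the first row.
     * This mirrors coalesce(ts - lag(ts, 1), 0) over that partition.
     */
    static long[] diffs(long[] partition) {
        long[] sorted = partition.clone();
        Arrays.sort(sorted); // orderBy("timestamp")
        long[] out = new long[sorted.length];
        for (int i = 1; i < sorted.length; i++) {
            out[i] = sorted[i] - sorted[i - 1]; // lag("timestamp", 1) is sorted[i - 1]
        }
        return out; // out[0] stays 0, like coalesce(null, 0)
    }

    public static void main(String[] args) {
        // the two rows of the (2, 27510720) partition from the question
        System.out.println(Arrays.toString(diffs(new long[]{1548944885L, 1548943617L})));
    }
}
```

It prints `[0, 1268]`; comparing that second value against 60 is what condition 3 needs.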
My question is how to satisfy the second condition and implement all three transformations together.
I would suggest using Spark SQL to reduce the complexity; it is easy to implement:
df.createOrReplaceTempView("test")
spark.sql("select id,refId,timestamp,case when refId is null and id is not null then 'true' when id is not null and refId is not null and rank=1 then 'true' else 'false' end as session from (select id,refId,timestamp, rank() OVER (PARTITION BY id,refId ORDER BY timestamp DESC) as rank from test) c").show()
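The query marks a row `true` when `refId` is null, or when its `rank()` in descending timestamp order within its `(id, refId)` partition is 1, i.e. it is the latest row of that pair (a unique pair always has rank 1). A plain-Java sketch of that CASE expression, with hypothetical names and no Spark dependency:

```java
import java.util.*;

public class RankDescSession {
    // One input row; refId is nullable, mirroring the DataFrame columns
    static final class Row {
        final int id; final Long refId; final long timestamp;
        Row(int id, Long refId, long timestamp) { this.id = id; this.refId = refId; this.timestamp = timestamp; }
    }

    /** Mirrors the query's CASE expression, returning "true"/"false" per row in input order. */
    static String[] sessions(List<Row> rows) {
        String[] out = new String[rows.size()];
        for (int i = 0; i < rows.size(); i++) {
            Row r = rows.get(i);
            if (r.refId == null) { out[i] = "true"; continue; } // WHEN refId IS NULL ...
            // rank() OVER (PARTITION BY id, refId ORDER BY timestamp DESC) =
            // 1 + number of rows in the same partition that sort strictly before this one
            int rank = 1;
            for (Row o : rows) {
                if (o.id == r.id && r.refId.equals(o.refId) && o.timestamp > r.timestamp) rank++;
            }
            out[i] = rank == 1 ? "true" : "false"; // WHEN ... rank = 1 THEN 'true' ELSE 'false'
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
            new Row(1, null, 1548944642L),
            new Row(1, 29950529L, 1548937685L),
            new Row(2, 27510720L, 1548944885L),
            new Row(2, 27510720L, 1548943617L));
        System.out.println(Arrays.toString(sessions(rows)));
    }
}
```

It prints `[true, true, true, false]` for the sample rows. Note that the query only uses `timestamp` for ordering and never compares consecutive timestamps, so the more-than-60 requirement of condition 3 is not checked.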
The output looks like this:
+---+--------+----------+-------+
| id| refId| timestamp|session|
+---+--------+----------+-------+
| 1| null|1548944642| true|
| 1|29950529|1548937685| true|
| 2|27510720|1548944885| true|
| 2|27510720|1548943617| false|
+---+--------+----------+-------+
You can use a window function that partitions by id and refId and orders by timestamp, then add a rank column. Finally, add the session column with the when/otherwise SQL functions.
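import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lag, lit, rank, when}

// Rows sharing an (id, refId) pair, oldest first
val win = Window.partitionBy("id", "refId").orderBy("timestamp")
// Same grouping without ordering, so count() covers the whole group
val grp = Window.partitionBy("id", "refId")

val result = df
  .withColumn("previous", lag("timestamp", 1).over(win))   // null for the oldest row of a group
  .withColumn("rank", rank().over(win))                    // 1 for the oldest row of a group
  .withColumn("groupSize", count(lit(1)).over(grp))        // 1 means the (id, refId) pair is unique
  .withColumn("diff", col("timestamp") - col("previous"))  // null when there is no previous row
  .withColumn("session",
    when(col("refId").isNull || col("groupSize") === 1, true) // conditions 1 and 2
      .when(col("rank") > 1 && col("diff") > 60, true)          // condition 3
      .otherwise(false)
  )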