火花:带有数据框的复杂操作



i具有以下格式的输入数据集:

+---+--------+----------+
| id|   refId| timestamp|
+---+--------+----------+
|  1|    null|1548944642|
|  1|29950529|1548937685|
|  2|27510720|1548944885|
|  2|27510720|1548943617|
+---+--------+----------+

需要使用以下转换逻辑添加新列session

  1. 如果refId is null,会话值为真。
  2. 如果id and refId are unique,会话值为真。
  3. 如果 id and refId are not unique和`时间戳大于上一行,则会值为真。同样,时间戳之间的区别应> 60。
+---+--------+-------+----------+
| id|   refId|session| timestamp|
+---+--------+-------+----------+
|  1|    null|   true|1548944642|
|  1|29950529|   true|1548937685|
|  2|27510720|  false|1548943617|
|  2|27510720|   true|1548944885|
+---+--------+-------+----------+

我可以做1&3个条件分别但不是第二个条件。

  1. `data.withcolumn(" session",functions.when(data.col(" refid")。3。
WindowSpec w = Window.partitionBy("id, refid").orderBy(timestampDS.col("timestamp"));
functions.coalesce(timestampDS.col("timestamp").cast("long").$minus(functions.lag("timestamp", 1).over(w).cast("long")), functions.lit(0));

我的问题是如何满足第二条状态并共同实施所有3个转换。

我会说使用Spark SQL以减少复杂性,并轻松实现

df.createOrReplaceTempView("test")
spark.sql("select id,refId,timestamp,case when refId is null and id is not null then 'true' when id is not null and refId is not null and rank=1 then 'true' else 'false' end as session from  (select id,refId,timestamp, rank() OVER (PARTITION BY id,refId ORDER BY timestamp DESC) as rank from test) c").show()

输出看起来像这样:

+---+--------+----------+-------+
| id|   refId| timestamp|session|
+---+--------+----------+-------+
|  1|    null|1548944642|   true|
|  1|29950529|1548937685|   true|
|  2|27510720|1548944885|   true|
|  2|27510720|1548943617|  false|
+---+--------+----------+-------+ 

您可以将窗口函数用于groupby ID和rfid and rfid and timestamp订购,然后添加等级列。最后,您将Session列添加到While,否则SQL函数。

import org.apache.spark.sql.expressions.{Window}
import org.apache.spark.sql.functions.{when, col, rank, lit, lag}
val win = Window.partitionBy("id", "refId").orderBy("timestamp")
val result = df
      .withColumn("previous", lag("timestamp", 1) over win)
      .withColumn("rank", rank() over win)
      .withColumn("session",
        when(col("refId").isNull || col("rank") === lit(1), true)
          .otherwise(false)
      )
      .withColumn("diff", col("timestamp") - col("previous"))

相关内容

  • 没有找到相关文章

最新更新