Set a new column value based on the latest record



> I have a dataframe similar to the one below:

+-------+-------+----------+
|dept_id|user_id|entry_date|
+-------+-------+----------+
|      3|      1|2020-06-03|
|      3|      2|2020-06-03|
|      3|      3|2020-06-03|
|      3|      4|2020-06-03|
|      3|      1|2020-06-04|
|      3|      1|2020-06-05|
+-------+-------+----------+

Now I need to add a new column indicating whether each row is the user's latest entry: 1 for the latest record, 0 for older ones.

+-------+-------+----------+----------+
|dept_id|user_id|entry_date|latest_rec|
+-------+-------+----------+----------+
|      3|      1|2020-06-03|         0|
|      3|      2|2020-06-03|         1|
|      3|      3|2020-06-03|         1|
|      3|      4|2020-06-03|         1|
|      3|      1|2020-06-04|         0|
|      3|      1|2020-06-05|         1|
+-------+-------+----------+----------+

I tried to get there by computing each user's rank:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val win = Window.partitionBy("dept_id", "user_id").orderBy(asc("entry_date"))
someDF.withColumn("rank_num", rank().over(win))

Now I'm stuck on how to derive the latest_rec column from rank_num. How should I proceed?

I would use row_number to find the latest date per user, then derive your indicator from that.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// order descending so that row_number = 1 marks the latest entry_date
val windowSpec = Window.partitionBy("dept_id", "user_id").orderBy(desc("entry_date"))
val win = <your df>.withColumn("der_rank", row_number().over(windowSpec))
val result = win.withColumn("latest_rec", when(col("der_rank") === 1, 1).otherwise(0))
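Note that the descending sort is what makes der_rank = 1 the latest row, and that row_number breaks ties arbitrarily: if a user had two rows sharing the same maximum entry_date, only one of them would get latest_rec = 1, whereas rank() would flag both.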

Instead of rank, take last over a window partitioned by dept_id, user_id and ordered by entry_date, with a frame from the current row to unbounded following, as latest_entry_date. Then compare entry_date against latest_entry_date and set latest_rec accordingly.

scala> df.show
+-------+-------+----------+
|dept_id|user_id|entry_date|
+-------+-------+----------+
|      3|      1|2020-06-03|
|      3|      2|2020-06-03|
|      3|      3|2020-06-03|
|      3|      4|2020-06-03|
|      3|      1|2020-06-04|
|      3|      1|2020-06-05|
+-------+-------+----------+

scala> val win = Window.partitionBy("dept_id","user_id").orderBy("entry_date").rowsBetween(Window.currentRow, Window.unboundedFollowing)
win: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@b3f21c2
scala> df.withColumn("latest_entry_date", last($"entry_date", true).over(win)).show
+-------+-------+----------+-----------------+
|dept_id|user_id|entry_date|latest_entry_date|
+-------+-------+----------+-----------------+
|      3|      1|2020-06-03|       2020-06-05|
|      3|      1|2020-06-04|       2020-06-05|
|      3|      1|2020-06-05|       2020-06-05|
|      3|      3|2020-06-03|       2020-06-03|
|      3|      2|2020-06-03|       2020-06-03|
|      3|      4|2020-06-03|       2020-06-03|
+-------+-------+----------+-----------------+

scala> df.withColumn("latest_entry_date", last($"entry_date", true).over(win)).withColumn("latest_rec", when($"entry_date" === $"latest_entry_date", 1).otherwise(0)).show
+-------+-------+----------+-----------------+----------+
|dept_id|user_id|entry_date|latest_entry_date|latest_rec|
+-------+-------+----------+-----------------+----------+
|      3|      1|2020-06-03|       2020-06-05|         0|
|      3|      1|2020-06-04|       2020-06-05|         0|
|      3|      1|2020-06-05|       2020-06-05|         1|
|      3|      3|2020-06-03|       2020-06-03|         1|
|      3|      2|2020-06-03|       2020-06-03|         1|
|      3|      4|2020-06-03|       2020-06-03|         1|
+-------+-------+----------+-----------------+----------+
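Because the window is ordered ascending and the frame runs from the current row to the end of the partition, last returns the partition's maximum entry_date on every row; the second argument (true) tells it to ignore nulls.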

An alternative approach:

Load the test data provided:

val data =
"""
|dept_id|user_id|entry_date
|      3|      1|2020-06-03
|      3|      2|2020-06-03
|      3|      3|2020-06-03
|      3|      4|2020-06-03
|      3|      1|2020-06-04
|      3|      1|2020-06-05
""".stripMargin
import spark.implicits._

val stringDS1 = data.split(System.lineSeparator())
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()
val df1 = spark.read
  .option("sep", ",")
  //.option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv(stringDS1)
df1.show(false)
df1.printSchema()
/**
* +-------+-------+----------+
* |dept_id|user_id|entry_date|
* +-------+-------+----------+
* |3      |1      |2020-06-03|
* |3      |2      |2020-06-03|
* |3      |3      |2020-06-03|
* |3      |4      |2020-06-03|
* |3      |1      |2020-06-04|
* |3      |1      |2020-06-05|
* +-------+-------+----------+
*
* root
* |-- dept_id: string (nullable = true)
* |-- user_id: string (nullable = true)
* |-- entry_date: string (nullable = true)
*/
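Note that all three columns come back as strings since inferSchema is commented out; that is why entry_date is cast with to_date before the date arithmetic below.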

Use max(entry_date) over a window partitioned by dept_id, user_id:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy("dept_id", "user_id")
val latestRec = when(datediff(max(to_date($"entry_date")).over(w), to_date($"entry_date")) =!= lit(0), 0)
  .otherwise(1)
df1.withColumn("latest_rec", latestRec)
  .orderBy("dept_id", "user_id", "entry_date")
  .show(false)
/**
* +-------+-------+----------+----------+
* |dept_id|user_id|entry_date|latest_rec|
* +-------+-------+----------+----------+
* |3      |1      |2020-06-03|0         |
* |3      |1      |2020-06-04|0         |
* |3      |1      |2020-06-05|1         |
* |3      |2      |2020-06-03|1         |
* |3      |3      |2020-06-03|1         |
* |3      |4      |2020-06-03|1         |
* +-------+-------+----------+----------+
*/
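Since ISO-formatted dates (yyyy-MM-dd) compare correctly even as strings, the datediff round trip is not strictly needed; a minimal sketch of the same idea with a direct equality check against the windowed max, reusing df1 and w from above:

// 1 when this row's entry_date equals the partition-wide maximum, else 0
val df2 = df1.withColumn(
  "latest_rec",
  when($"entry_date" === max($"entry_date").over(w), 1).otherwise(0))

df2.orderBy("dept_id", "user_id", "entry_date").show(false)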
