> I have a dataframe like the one below:
+-------+-------+----------+
|dept_id|user_id|entry_date|
+-------+-------+----------+
| 3| 1|2020-06-03|
| 3| 2|2020-06-03|
| 3| 3|2020-06-03|
| 3| 4|2020-06-03|
| 3| 1|2020-06-04|
| 3| 1|2020-06-05|
+-------+-------+----------+
Now I need to add a new column that indicates whether a row is the user's latest entry date: 1 for the latest record, 0 for older ones.
+-------+-------+----------+----------+
|dept_id|user_id|entry_date|latest_rec|
+-------+-------+----------+----------+
|      3|      1|2020-06-03|         0|
|      3|      2|2020-06-03|         1|
|      3|      3|2020-06-03|         1|
|      3|      4|2020-06-03|         1|
|      3|      1|2020-06-04|         0|
|      3|      1|2020-06-05|         1|
+-------+-------+----------+----------+
I tried to get there by ranking each user's rows:
val win = Window.partitionBy("dept_id", "user_id").orderBy(asc("entry_date"))
someDF.withColumn("rank_num", rank().over(win))
Now I'm stuck on how to derive the latest_rec column from rank_num. How should I proceed?
I would use row_number to find the latest date per user, then derive your indicator from it.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// order descending so the latest entry gets row_number 1
val windowSpec = Window.partitionBy("dept_id", "user_id").orderBy(desc("entry_date"))
val win = <your df>.withColumn("der_rank", row_number().over(windowSpec))
val finalDf = win.withColumn("latest_rec", when($"der_rank" === 1, 1).otherwise(0))
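For reference, the same indicator can also be computed with Spark SQL (a sketch; the temp-view name `entries` is an assumption, not from the original post):

```scala
someDF.createOrReplaceTempView("entries")
val result = spark.sql("""
  SELECT dept_id, user_id, entry_date,
         CASE WHEN row_number() OVER (PARTITION BY dept_id, user_id
                                      ORDER BY entry_date DESC) = 1
              THEN 1 ELSE 0 END AS latest_rec
  FROM entries
""")
```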
Instead of rank, take last over a window: partition by dept_id, user_id, order by entry_date, with a frame from the current row to unbounded following, and call it latest_entry_date. Then compare entry_date with latest_entry_date and set the latest_rec value accordingly.
scala> df.show
+-------+-------+----------+
|dept_id|user_id|entry_date|
+-------+-------+----------+
| 3| 1|2020-06-03|
| 3| 2|2020-06-03|
| 3| 3|2020-06-03|
| 3| 4|2020-06-03|
| 3| 1|2020-06-04|
| 3| 1|2020-06-05|
+-------+-------+----------+
scala> val win = Window.partitionBy("dept_id","user_id").orderBy("entry_date").rowsBetween(Window.currentRow, Window.unboundedFollowing)
win: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@b3f21c2
scala> df.withColumn("latest_entry_date", last($"entry_date", true).over(win)).show
+-------+-------+----------+-----------------+
|dept_id|user_id|entry_date|latest_entry_date|
+-------+-------+----------+-----------------+
| 3| 1|2020-06-03| 2020-06-05|
| 3| 1|2020-06-04| 2020-06-05|
| 3| 1|2020-06-05| 2020-06-05|
| 3| 3|2020-06-03| 2020-06-03|
| 3| 2|2020-06-03| 2020-06-03|
| 3| 4|2020-06-03| 2020-06-03|
+-------+-------+----------+-----------------+
scala> df.withColumn("latest_entry_date", last($"entry_date", true).over(win)).withColumn("latest_rec", when($"entry_date" === $"latest_entry_date", 1).otherwise(0)).show
+-------+-------+----------+-----------------+----------+
|dept_id|user_id|entry_date|latest_entry_date|latest_rec|
+-------+-------+----------+-----------------+----------+
| 3| 1|2020-06-03| 2020-06-05| 0|
| 3| 1|2020-06-04| 2020-06-05| 0|
| 3| 1|2020-06-05| 2020-06-05| 1|
| 3| 3|2020-06-03| 2020-06-03| 1|
| 3| 2|2020-06-03| 2020-06-03| 1|
| 3| 4|2020-06-03| 2020-06-03| 1|
+-------+-------+----------+-----------------+----------+
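If the helper column is not wanted in the final output, it can be dropped in the same chain (a sketch based on the session above):

```scala
df.withColumn("latest_entry_date", last($"entry_date", true).over(win))
  .withColumn("latest_rec", when($"entry_date" === $"latest_entry_date", 1).otherwise(0))
  .drop("latest_entry_date")
  .show()
```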
Another alternative:
Load the test data provided:
val data =
"""
|dept_id|user_id|entry_date
| 3| 1|2020-06-03
| 3| 2|2020-06-03
| 3| 3|2020-06-03
| 3| 4|2020-06-03
| 3| 1|2020-06-04
| 3| 1|2020-06-05
""".stripMargin
import spark.implicits._

val stringDS1 = data.split(System.lineSeparator())
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()
val df1 = spark.read
.option("sep", ",")
// .option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS1)
df1.show(false)
df1.printSchema()
/**
* +-------+-------+----------+
* |dept_id|user_id|entry_date|
* +-------+-------+----------+
* |3 |1 |2020-06-03|
* |3 |2 |2020-06-03|
* |3 |3 |2020-06-03|
* |3 |4 |2020-06-03|
* |3 |1 |2020-06-04|
* |3 |1 |2020-06-05|
* +-------+-------+----------+
*
* root
* |-- dept_id: string (nullable = true)
* |-- user_id: string (nullable = true)
* |-- entry_date: string (nullable = true)
*/
Use max(entry_date) over a window partitioned by "dept_id", "user_id":
val w = Window.partitionBy("dept_id", "user_id")
val latestRec = when(datediff(max(to_date($"entry_date")).over(w), to_date($"entry_date")) =!= lit(0), 0)
.otherwise(1)
df1.withColumn("latest_rec", latestRec)
.orderBy("dept_id", "user_id", "entry_date")
.show(false)
/**
* +-------+-------+----------+----------+
* |dept_id|user_id|entry_date|latest_rec|
* +-------+-------+----------+----------+
* |3 |1 |2020-06-03|0 |
* |3 |1 |2020-06-04|0 |
* |3 |1 |2020-06-05|1 |
* |3 |2 |2020-06-03|1 |
* |3 |3 |2020-06-03|1 |
* |3 |4 |2020-06-03|1 |
* +-------+-------+----------+----------+
*/
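Since the window has no ordering, max over it is simply the per-group maximum; an equivalent formulation (a sketch, assuming the same df1 and imports as above) compares the dates directly instead of going through datediff:

```scala
val w = Window.partitionBy("dept_id", "user_id")

df1.withColumn("latest_rec",
    when(to_date($"entry_date") === max(to_date($"entry_date")).over(w), 1).otherwise(0))
  .orderBy("dept_id", "user_id", "entry_date")
  .show(false)
```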