Lag function over grouped data



I have a DataFrame as follows:

from pyspark.sql import functions as f
from pyspark.sql.window import Window
df = spark.createDataFrame([
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:30:57.000", "Username": "user1", "Region": "US"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:31:57.014", "Username": "user2", "Region": "US"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:32:57.914", "Username": "user1", "Region": "MX"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:35:57.914", "Username": "user2", "Region": "CA"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:33:57.914", "Username": "user1", "Region": "UK"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:34:57.914", "Username": "user1", "Region": "GR"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:36:57.914", "Username": "user2", "Region": "IR"}])
w = Window.partitionBy().orderBy("groupId","Username").orderBy("Username","ts")
df2 = df.withColumn("prev_region", f.lag(df.Region).over(w))
Region  Username  ts
US      user1     2021-01-27 08:30:57.000
MX      user1     2021-01-27 08:32:57.914
UK      user1     2021-01-27 08:33:57.914
GR      user1     2021-01-27 08:34:57.914
US      user2     2021-01-27 08:31:57.014
CA      user2     2021-01-27 08:35:57.914
IR      user2     2021-01-27 08:36:57.914

You're almost there.

Based on your DataFrame, simply defining the window function as shown below will work:

# Python API
>>> w = Window.partitionBy("Username").orderBy("groupId", "Username", "ts")
>>> df2 = df.withColumn("prev_region", f.lag(df.Region).over(w))
>>> df2.show(truncate=100)
+----------+------+--------+-------+-----------------------+-----------+
|       Day|Region|Username|groupId|                     ts|prev_region|
+----------+------+--------+-------+-----------------------+-----------+
|2021-01-27|    US|   user1|      A|2021-01-27 08:30:57.000|       null|
|2021-01-27|    MX|   user1|      A|2021-01-27 08:32:57.914|         US|
|2021-01-27|    UK|   user1|      A|2021-01-27 08:33:57.914|         MX|
|2021-01-27|    GR|   user1|      A|2021-01-27 08:34:57.914|         UK|
|2021-01-27|    US|   user2|      A|2021-01-27 08:31:57.014|       null|
|2021-01-27|    CA|   user2|      A|2021-01-27 08:35:57.914|         US|
|2021-01-27|    IR|   user2|      A|2021-01-27 08:36:57.914|         CA|
+----------+------+--------+-------+-----------------------+-----------+
# SQL API
df.createOrReplaceTempView("df")
result = spark.sql("""
SELECT 
Day, Region, Username, groupId, ts, 
LAG(Region) OVER (PARTITION BY Username ORDER BY groupId, Username, ts) AS prev_region
FROM df 
""")
result.show(truncate=100)
+----------+------+--------+-------+-----------------------+-----------+
|       Day|Region|Username|groupId|                     ts|prev_region|
+----------+------+--------+-------+-----------------------+-----------+
|2021-01-27|    US|   user1|      A|2021-01-27 08:30:57.000|       null|
|2021-01-27|    MX|   user1|      A|2021-01-27 08:32:57.914|         US|
|2021-01-27|    UK|   user1|      A|2021-01-27 08:33:57.914|         MX|
|2021-01-27|    GR|   user1|      A|2021-01-27 08:34:57.914|         UK|
|2021-01-27|    US|   user2|      A|2021-01-27 08:31:57.014|       null|
|2021-01-27|    CA|   user2|      A|2021-01-27 08:35:57.914|         US|
|2021-01-27|    IR|   user2|      A|2021-01-27 08:36:57.914|         CA|
+----------+------+--------+-------+-----------------------+-----------+
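
As a quick sanity check (a sketch, assuming both snippets above were run in the same session), the SQL result should contain exactly the same rows as the DataFrame API result:

# Both frames now carry the columns Day, Region, Username, groupId, ts, prev_region,
# so the two results should match row for row (ignoring row order).
assert df2.exceptAll(result).count() == 0
assert result.exceptAll(df2).count() == 0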

If there are multiple groups (multiple groupId values), declare the window function as follows:

>>> w = Window.partitionBy("groupId", "Username").orderBy("groupId", "ts", "Username")

You just need to add the Username column to the partitionBy call. There is also no need for two orderBy calls. Change the line to:

w = Window.partitionBy('Username').orderBy("ts")
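
For completeness, a short sketch of the full call with this simpler window (the final orderBy is only there to make the output easier to read):

w = Window.partitionBy("Username").orderBy("ts")
df.withColumn("prev_region", f.lag("Region").over(w)) \
    .orderBy("Username", "ts") \
    .show(truncate=False)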
