I have a DataFrame as follows:
from pyspark.sql import functions as f
from pyspark.sql.window import Window
df = spark.createDataFrame([
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:30:57.000", "Username": "user1", "Region": "US"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:31:57.014", "Username": "user2", "Region": "US"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:32:57.914", "Username": "user1", "Region": "MX"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:35:57.914", "Username": "user2", "Region": "CA"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:33:57.914", "Username": "user1", "Region": "UK"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:34:57.914", "Username": "user1", "Region": "GR"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:36:57.914", "Username": "user2", "Region": "IR"}])
w = Window.partitionBy().orderBy("groupId","Username").orderBy("Username","ts")
df2 = df.withColumn("prev_region", f.lag(df.Region).over(w))
Region  Username  ts
US      user1     2021-01-27 08:30:57.000
MX      user1     2021-01-27 08:32:57.914
UK      user1     2021-01-27 08:33:57.914
GR      user1     2021-01-27 08:34:57.914
US      user2     2021-01-27 08:31:57.014
CA      user2     2021-01-27 08:35:57.914
IR      user2     2021-01-27 08:36:57.914
You're almost there. Based on your DataFrame, defining the window function as shown below will work:
# Python API
>>> w = Window.partitionBy("Username").orderBy("groupId", "Username", "ts")
>>> df2 = df.withColumn("prev_region", f.lag("Region").over(w))
>>> df2.show(truncate=100)
+----------+------+--------+-------+-----------------------+-----------+
| Day|Region|Username|groupId| ts|prev_region|
+----------+------+--------+-------+-----------------------+-----------+
|2021-01-27| US| user1| A|2021-01-27 08:30:57.000| null|
|2021-01-27| MX| user1| A|2021-01-27 08:32:57.914| US|
|2021-01-27| UK| user1| A|2021-01-27 08:33:57.914| MX|
|2021-01-27| GR| user1| A|2021-01-27 08:34:57.914| UK|
|2021-01-27| US| user2| A|2021-01-27 08:31:57.014| null|
|2021-01-27| CA| user2| A|2021-01-27 08:35:57.914| US|
|2021-01-27| IR| user2| A|2021-01-27 08:36:57.914| CA|
+----------+------+--------+-------+-----------------------+-----------+
# SQL API
df.createOrReplaceTempView("df")
result = spark.sql("""
    SELECT
        Day, Region, Username, groupId, ts,
        LAG(Region) OVER (PARTITION BY Username ORDER BY groupId, Username, ts) AS prev_region
    FROM df
""")
result.show(truncate=100)
+----------+------+--------+-------+-----------------------+-----------+
|       Day|Region|Username|groupId|                     ts|prev_region|
+----------+------+--------+-------+-----------------------+-----------+
|2021-01-27|    US|   user1|      A|2021-01-27 08:30:57.000|       null|
|2021-01-27|    MX|   user1|      A|2021-01-27 08:32:57.914|         US|
|2021-01-27|    UK|   user1|      A|2021-01-27 08:33:57.914|         MX|
|2021-01-27|    GR|   user1|      A|2021-01-27 08:34:57.914|         UK|
|2021-01-27|    US|   user2|      A|2021-01-27 08:31:57.014|       null|
|2021-01-27|    CA|   user2|      A|2021-01-27 08:35:57.914|         US|
|2021-01-27|    IR|   user2|      A|2021-01-27 08:36:57.914|         CA|
+----------+------+--------+-------+-----------------------+-----------+
If there are multiple groups (multiple groupId values), declare the window function as follows:
>>> w = Window.partitionBy("groupId", "Username").orderBy("groupId", "ts", "Username")
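For example, here is a minimal sketch of that window in action. The group "B" rows are hypothetical, added only to illustrate how the partitioning behaves across groups:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Hypothetical rows for a second group "B" (illustration only)
extra = spark.createDataFrame([
    {"groupId": "B", "Day": "2021-01-28", "ts": "2021-01-28 09:00:00.000", "Username": "user1", "Region": "DE"},
    {"groupId": "B", "Day": "2021-01-28", "ts": "2021-01-28 09:05:00.000", "Username": "user1", "Region": "FR"}])
df_multi = df.unionByName(extra)

w = Window.partitionBy("groupId", "Username").orderBy("groupId", "ts", "Username")

# lag() restarts within each (groupId, Username) partition, so user1's
# first row in group "B" gets prev_region = null instead of a group "A" region
df_multi.withColumn("prev_region", f.lag("Region").over(w)).show(truncate=100)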
You only need to add the Username column to the partitionBy call. There is also no need to chain two orderBy calls; the second call simply replaces the ordering set by the first. Change the line to:
w = Window.partitionBy('Username').orderBy("ts")
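As a quick check, a sketch applying this simplified window to the DataFrame above; it yields the same prev_region values as the earlier window, since each partition already holds a single user's rows:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

w = Window.partitionBy("Username").orderBy("ts")

# Each partition contains one user's rows ordered by timestamp, so lag()
# returns the region of that user's immediately preceding event
df.withColumn("prev_region", f.lag("Region").over(w)).show(truncate=100)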