Suppose I have a DataFrame like the following:
from pyspark.sql import functions as F
from pyspark.sql import Window

simpleData = (("U1", "cd1dd155-ccd8-4b8c-bea7-571359e35fed", 1655605947),
("U1", "7f20182f-8c82-4c70-8213-f7889cfdd5eb", 1655777060),
("U1", "7f20182f-8c82-4c70-8213-f7889cfdd5eb", 1655777062),
("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209951),
("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209952),
("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209999),
)
columns= ["UID", "Sess", "Time"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)
+---+------------------------------------+----------+
|UID|Sess |Time |
+---+------------------------------------+----------+
|U1 |cd1dd155-ccd8-4b8c-bea7-571359e35fed|1655605947|
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777060|
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777062|
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209951|
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209952|
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209999|
+---+------------------------------------+----------+
Then I assign a session number with dense_rank over a window ordered by UID and Sess:
df2 = df.withColumn("sess_2", F.dense_rank().over(Window.orderBy('UID', 'Sess')))
df2.show(truncate=False)
which gives the following output:
+---+------------------------------------+----------+------+
|UID|Sess |Time |sess_2|
+---+------------------------------------+----------+------+
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777060|1 |
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777062|1 |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209951|2 |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209952|2 |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209999|2 |
|U1 |cd1dd155-ccd8-4b8c-bea7-571359e35fed|1655605947|3 |
+---+------------------------------------+----------+------+
whereas I want it to be:
+---+------------------------------------+----------+------+
|UID|Sess |Time |sess_2|
+---+------------------------------------+----------+------+
|U1 |cd1dd155-ccd8-4b8c-bea7-571359e35fed|1655605947|1 |
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777060|2 |
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777062|2 |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209951|3 |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209952|3 |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209999|3 |
+---+------------------------------------+----------+------+
How can I assign the correct sess_2 to each (UID, Sess) group, numbered in order of Time?
To solve this, first find the start time of each session per UID and Sess, and order the window by that instead.
from pyspark.sql import functions as F
from pyspark.sql import Window
simpleData = (("U1", "cd1dd155-ccd8-4b8c-bea7-571359e35fed", 1655605947),
("U1", "7f20182f-8c82-4c70-8213-f7889cfdd5eb", 1655777060),
("U1", "7f20182f-8c82-4c70-8213-f7889cfdd5eb", 1655777062),
("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209951),
("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209952),
("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209999),
)
columns= ["UID", "Sess", "Time"]
df = spark.createDataFrame(data = simpleData, schema = columns)
# Per UID and Sess, find when the session started. Appending Sess breaks ties
# in case multiple sessions start at the same time. Note that concat implicitly
# casts min("Time") to string; string ordering matches numeric ordering here
# because all the epoch timestamps have the same number of digits.
sess_ws = Window.partitionBy("UID", "Sess")
df_with_session_start = df.withColumn(
    "session_start", F.concat(F.min("Time").over(sess_ws), F.col("Sess"))
)
df2 = df_with_session_start.withColumn("sess_2", F.dense_rank().over(Window.partitionBy("UID").orderBy("session_start")))
df2.show(truncate=False)
"""
+---+------------------------------------+----------+------+
|UID|Sess |Time |sess_2|
+---+------------------------------------+----------+------+
|U1 |cd1dd155-ccd8-4b8c-bea7-571359e35fed|1655605947|1 |
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777060|2 |
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777062|2 |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209951|3 |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209952|3 |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209999|3 |
+---+------------------------------------+----------+------+
"""
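To see why ordering the window by the session start yields the desired numbering, here is the same logic as a plain-Python sketch (no Spark required). The helper name `dense_rank_by_session_start` is hypothetical, introduced only for illustration; it mirrors min("Time") per (UID, Sess) followed by a dense rank per UID ordered by (session start, Sess):

```python
rows = [
    ("U1", "cd1dd155-ccd8-4b8c-bea7-571359e35fed", 1655605947),
    ("U1", "7f20182f-8c82-4c70-8213-f7889cfdd5eb", 1655777060),
    ("U1", "7f20182f-8c82-4c70-8213-f7889cfdd5eb", 1655777062),
    ("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209951),
    ("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209952),
    ("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209999),
]

def dense_rank_by_session_start(rows):
    # min("Time") per (UID, Sess): the session start
    start = {}
    for uid, sess, t in rows:
        key = (uid, sess)
        start[key] = min(start.get(key, t), t)
    # dense_rank per UID, ordered by (session start, Sess) to break ties
    ranks = {}
    for uid in {u for u, _, _ in rows}:
        sessions = sorted(
            {s for u, s, _ in rows if u == uid},
            key=lambda s: (start[(uid, s)], s),
        )
        for i, s in enumerate(sessions, start=1):
            ranks[(uid, s)] = i
    return [(uid, sess, t, ranks[(uid, sess)]) for uid, sess, t in rows]

for row in dense_rank_by_session_start(rows):
    print(row)
# sess_2 comes out as 1, 2, 2, 3, 3, 3 -- matching the desired output above
```

Note it sorts on the tuple (start time, Sess) directly, which sidesteps the string-concatenation step the Spark version uses.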