How to assign an order to data partitions by time in PySpark



Suppose I have a dataframe like the following:

simpleData = (("U1", "cd1dd155-ccd8-4b8c-bea7-571359e35fed", 1655605947), 
("U1", "7f20182f-8c82-4c70-8213-f7889cfdd5eb", 1655777060),  
("U1", "7f20182f-8c82-4c70-8213-f7889cfdd5eb", 1655777062),
("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209951),   
("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209952),  
("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209999),    
)

columns= ["UID", "Sess", "Time"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)
+---+------------------------------------+----------+
|UID|Sess                                |Time      |
+---+------------------------------------+----------+
|U1 |cd1dd155-ccd8-4b8c-bea7-571359e35fed|1655605947|
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777060|
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777062|
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209951|
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209952|
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209999|
+---+------------------------------------+----------+

I assign a rank to the rows over a window as follows:

df2 = df.withColumn("sess_2", F.dense_rank().over(Window.orderBy('UID', 'Sess')))
df2.show(truncate=False)

and I get the following output:

+---+------------------------------------+----------+------+
|UID|Sess                                |Time      |sess_2|
+---+------------------------------------+----------+------+
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777060|1     |
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777062|1     |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209951|2     |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209952|2     |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209999|2     |
|U1 |cd1dd155-ccd8-4b8c-bea7-571359e35fed|1655605947|3     |
+---+------------------------------------+----------+------+

whereas I want it to be:

+---+------------------------------------+----------+------+
|UID|Sess                                |Time      |sess_2|
+---+------------------------------------+----------+------+
|U1 |cd1dd155-ccd8-4b8c-bea7-571359e35fed|1655605947|1     |
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777060|2     |
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777062|2     |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209951|3     |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209952|3     |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209999|3     |
+---+------------------------------------+----------+------+

How do I assign the correct sess_2 to each (UID, Sess) partition, ordered by Time?

The window in the attempt above is ordered by the Sess UUID string, so dense_rank ranks sessions lexicographically rather than by when they occurred. To fix this, first find the session start time for each (UID, Sess) pair and order the Window by that value.

from pyspark.sql import functions as F
from pyspark.sql import Window

simpleData = (
    ("U1", "cd1dd155-ccd8-4b8c-bea7-571359e35fed", 1655605947),
    ("U1", "7f20182f-8c82-4c70-8213-f7889cfdd5eb", 1655777060),
    ("U1", "7f20182f-8c82-4c70-8213-f7889cfdd5eb", 1655777062),
    ("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209951),
    ("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209952),
    ("U1", "c4d5a218-d61d-4e9a-b1ea-646f676c4cb7", 1656209999),
)

columns = ["UID", "Sess", "Time"]
df = spark.createDataFrame(data=simpleData, schema=columns)

# Per (UID, Sess), find when the session started; append Sess to the end to break
# ties in case multiple sessions start at the same time.
sess_ws = Window.partitionBy("UID", "Sess")
df_with_session_start = df.withColumn(
    "session_start", F.concat(F.min("Time").over(sess_ws), F.col("Sess"))
)

# Rank sessions per UID by their start time.
df2 = df_with_session_start.withColumn(
    "sess_2", F.dense_rank().over(Window.partitionBy("UID").orderBy("session_start"))
).drop("session_start")
df2.show(truncate=False)
"""
+---+------------------------------------+----------+------+
|UID|Sess                                |Time      |sess_2|
+---+------------------------------------+----------+------+
|U1 |cd1dd155-ccd8-4b8c-bea7-571359e35fed|1655605947|1     |
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777060|2     |
|U1 |7f20182f-8c82-4c70-8213-f7889cfdd5eb|1655777062|2     |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209951|3     |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209952|3     |
|U1 |c4d5a218-d61d-4e9a-b1ea-646f676c4cb7|1656209999|3     |
+---+------------------------------------+----------+------+
"""
