Pyspark中基于时间的窗口函数



我的目标是计算另一列,保持与原始DataFrame相同的行数,在那里我可以显示每个用户过去30天的平均余额。

我想这可以使用Window函数来完成,按用户进行分区,并以某种方式限制当前日期和30天前的行,但我不知道如何在PySpark中实现它。

我有以下Spark DataFrame:

余额100600100200600<1600>
userId 日期
A 2020年6月9日
A 2020年7月3日 200
A 2020年8月5日
A 2020年8月30日 1000
A 2020年9月15日 500
B 2020年1月3日
B 2020年4月5日
B 2020年4月29日
B 2020年5月1日

您可以使用RANGE BETWEEN关键字:

sdf_prueba.createOrReplaceTempView("table1")
spark.sql(
"""SELECT *, mean(balance) OVER (
PARTITION BY userid 
ORDER BY CAST(date AS timestamp)  
RANGE BETWEEN INTERVAL 30 DAYS PRECEDING AND CURRENT ROW
) AS mean FROM table1""").show()

+------+----------+-------+-----+
|userId|      date|balance| mean|
+------+----------+-------+-----+
|     A|2020-06-09|    100|100.0|
|     A|2020-07-03|    200|150.0|
|     A|2020-08-05|    600|600.0|
|     A|2020-08-30|   1000|800.0|
|     A|2020-09-15|    500|750.0|
|     B|2020-01-03|    100|100.0|
|     B|2020-04-05|    200|200.0|
|     B|2020-04-29|    600|400.0|
|     B|2020-05-01|   1600|800.0|
+------+----------+-------+-----+

如果您想使用pysparkAPI,您需要将天转换为unix秒以便使用rangeBetween

one_month_in_seconds = 2629743 # ?
w = (
Window.partitionBy("userid")
.orderBy(unix_timestamp(col("date").cast("timestamp")))
.rangeBetween(-one_month_in_seconds, Window.currentRow)
)
sdf_prueba.select(col("*"), mean("balance").over(w).alias("mean")).show()
+------+----------+-------+-----+
|userId|      date|balance| mean|
+------+----------+-------+-----+
|     A|2020-06-09|    100|100.0|
|     A|2020-07-03|    200|150.0|
|     A|2020-08-05|    600|600.0|
|     A|2020-08-30|   1000|800.0|
|     A|2020-09-15|    500|750.0|
|     B|2020-01-03|    100|100.0|
|     B|2020-04-05|    200|200.0|
|     B|2020-04-29|    600|400.0|
|     B|2020-05-01|   1600|800.0|
+------+----------+-------+-----+

最新更新