I am trying to compute a date difference and a count difference (count_diff) on event data in PySpark.
The data looks like this:
deviceid techid name count load_date
m1 1 a 30 23-01-2016
m2 1 b 40 23-01-2016
m1 1 a 45 29-01-2016
m1 2 a 50 30-01-2016
I want it to look like this:
deviceid techid name count load_date datediff countdiff
m1 1 a 30 23-01-2016 NA NA
m2 1 b 40 23-01-2016 NA NA
m1 1 a 45 29-01-2016 6 15
m1 2 a 50 30-01-2016 NA NA
How can I create columns with these values in PySpark, computing the date difference between consecutive events that share the same keys?
This can be solved with window functions.
(1) Create a DataFrame with the sample test data
df = spark.createDataFrame(
    [('m1', 1, 'a', 30, '23-01-2016'),
     ('m2', 1, 'b', 40, '23-01-2016'),
     ('m1', 1, 'a', 45, '29-01-2016'),
     ('m1', 2, 'a', 50, '30-01-2016')],
    ['deviceid', 'techid', 'name', 'count', 'load_date'])
df1 = df.selectExpr("deviceid", "techid", "name", "count",
                    "to_timestamp(load_date, 'dd-MM-yyyy') AS load_date")
(2) Define the window and use the lag function to build the previous-count and previous-load-date columns
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

windowSpec = Window.partitionBy('deviceid', 'techid').orderBy('load_date')
prev_count = lag('count').over(windowSpec)
prev_load_date = lag('load_date').over(windowSpec)
df2 = (df1.withColumn("prev_count", prev_count)
          .withColumn("prev_load_date", prev_load_date))
(3) Subtract the previous columns from the original ones to compute the differences.
(df2.selectExpr("deviceid",
                "techid",
                "name",
                "count",
                "load_date",
                "datediff(load_date, prev_load_date) AS datediff",
                "(count - prev_count) AS countdiff")
    .show())
#+--------+------+----+-----+-------------------+--------+---------+
#|deviceid|techid|name|count| load_date|datediff|countdiff|
#+--------+------+----+-----+-------------------+--------+---------+
#| m1| 1| a| 30|2016-01-23 00:00:00| null| null|
#| m1| 1| a| 45|2016-01-29 00:00:00| 6| 15|
#| m1| 2| a| 50|2016-01-30 00:00:00| null| null|
#| m2| 1| b| 40|2016-01-23 00:00:00| null| null|
#+--------+------+----+-----+-------------------+--------+---------+