PySpark - group by ID and date and sum a time column in minutes



I'm processing my data in Spark. The problem is similar to one I could solve in SQL with: SUM(DATEDIFF(MINUTE, '0:00:00', targetcolumn))

However, I'd like to know whether there is a way to do this in PySpark, especially since there is only a single time column?

My dataframe looks like this:

df= df_temp.show()

+-----------+---+---------------+
|record_date|Tag|           time|
+-----------+---+---------------+
| 2012-05-05|  A|13:14:07.000000|
| 2012-05-05|  A|13:54:08.000000|
|        ...|...|            ...|
| 2013-01-01|  B|14:40:26.000000|
| 2013-01-01|  B|14:48:27.000000|
|        ...|...|            ...|
| 2014-04-03|  C|17:17:30.000000|
| 2014-04-03|  C|17:47:31.000000|
+-----------+---+---------------+

Is it possible to group by record_date and Tag and then sum the time column in minutes? So it would become something like this:

+-----------+---+---------------+
|record_date|Tag|           time|
+-----------+---+---------------+
| 2012-05-05|  A|00:41:01.000000|
| 2013-01-01|  B|00:08:01.000000|
| 2014-04-03|  C|00:30:01.000000|
+-----------+---+---------------+

The time column could be in any format, e.g. 40 minutes or 0.4 hours.

Thanks

If only the two most recent rows per group need to be compared, the Window "lead" function can be used (in Scala):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
import spark.implicits._

val df = Seq(
  ("2012-05-05", "A", "13:14:07.000000"),
  ("2012-05-05", "A", "13:54:08.000000"),
  ("2013-01-01", "B", "14:40:26.000000"),
  ("2013-01-01", "B", "14:48:27.000000"),
  ("2014-04-03", "C", "17:17:30.000000"),
  ("2014-04-03", "C", "17:47:31.000000")
).toDF("record_date", "Tag", "time")

val recordTagWindow = Window.partitionBy("record_date", "Tag").orderBy(desc("time"))
df
  // keep only "HH:mm:ss" and convert it to a seconds value
  .withColumn("time", substring($"time", 1, 8))
  .withColumn("unixTimestamp", unix_timestamp($"time", "HH:mm:ss"))
  // difference between the current row and the next (earlier) row in the group
  .withColumn("timeDiffSeconds", $"unixTimestamp" - lead($"unixTimestamp", 1, 0).over(recordTagWindow))
  .withColumn("timeDiffFormatted", date_format($"timeDiffSeconds".cast(TimestampType), "HH:mm:ss"))
  .withColumn("rownum", row_number().over(recordTagWindow))
  .where($"rownum" === 1)
  .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
  .show(false)

Output (it looks like the first row of your example is incorrect):

+-----------+---+-----------------+
|record_date|Tag|timeDiffFormatted|
+-----------+---+-----------------+
|2012-05-05 |A  |00:40:01         |
|2013-01-01 |B  |00:08:01         |
|2014-04-03 |C  |00:30:01         |
+-----------+---+-----------------+
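
Since the question asks about PySpark, here is a rough Python equivalent of the snippet above. This is only a sketch: it assumes a DataFrame named df_temp with the same record_date/Tag/time columns, and it reuses the same seconds-to-timestamp formatting trick as the Scala code (which assumes a UTC session time zone):

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import TimestampType

# Same idea as the Scala version: compare the two most recent rows per (record_date, Tag).
w = Window.partitionBy("record_date", "Tag").orderBy(F.desc("time"))

result = (
    df_temp
    .withColumn("time", F.substring("time", 1, 8))                      # drop microseconds
    .withColumn("unixTimestamp", F.unix_timestamp("time", "HH:mm:ss"))  # seconds value
    .withColumn("timeDiffSeconds",
                F.col("unixTimestamp") - F.lead("unixTimestamp", 1, 0).over(w))
    # format the seconds difference as HH:mm:ss (assumes spark.sql.session.timeZone is UTC)
    .withColumn("timeDiffFormatted",
                F.date_format(F.col("timeDiffSeconds").cast(TimestampType()), "HH:mm:ss"))
    .withColumn("rownum", F.row_number().over(w))
    .where(F.col("rownum") == 1)
    .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
)
result.show(truncate=False)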

For more than two rows per group, the functions "first" and "last" can be used, with the window modified to include all values (rowsBetween):

val recordTagWindow = Window.partitionBy("record_date", "Tag").orderBy(desc("time"))
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df
  .withColumn("time", substring($"time", 1, 8))
  .withColumn("unixTimestamp", unix_timestamp($"time", "HH:mm:ss"))
  // latest time minus earliest time within each (record_date, Tag) group
  .withColumn("timeDiffSeconds", first($"unixTimestamp").over(recordTagWindow) - last($"unixTimestamp").over(recordTagWindow))
  .withColumn("timeDiffFormatted", date_format($"timeDiffSeconds".cast(TimestampType), "HH:mm:ss"))
  .withColumn("rownum", row_number().over(Window.partitionBy("record_date", "Tag").orderBy(desc("time"))))
  .where($"rownum" === 1)
  .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
  .show(false)
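
In PySpark this second case (latest time minus earliest time per group) can also be written as a plain groupBy aggregation, which makes it easy to return the result in minutes as mentioned in the question. Again just a sketch, assuming the same df_temp DataFrame:

from pyspark.sql import functions as F

# Span of each (record_date, Tag) group: latest time minus earliest time, in minutes.
result = (
    df_temp
    .withColumn("secs", F.unix_timestamp(F.substring("time", 1, 8), "HH:mm:ss"))
    .groupBy("record_date", "Tag")
    .agg(((F.max("secs") - F.min("secs")) / 60).alias("diff_minutes"))
)
result.show(truncate=False)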
