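I am processing my data in Spark and have run into a problem that I could solve in SQL like this: SUM(DATEDIFF(MINUTE, '0:00:00', targetcolumn))
However, I would like to know whether PySpark can do the same, especially when there is only a single time column.
My DataFrame looks like this: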
df_temp.show()
+-----------+---+---------------+
|record_date|Tag|           time|
+-----------+---+---------------+
| 2012-05-05|  A|13:14:07.000000|
| 2012-05-05|  A|13:54:08.000000|
...................
| 2013-01-01|  B|14:40:26.000000|
| 2013-01-01|  B|14:48:27.000000|
..................
| 2014-04-03|  C|17:17:30.000000|
| 2014-04-03|  C|17:47:31.000000|
+-----------+---+---------------+
Is it possible to group by record_date and Tag and then sum up the time in minutes? The result would look like this:
+-----------+---+---------------+
|record_date|Tag|           time|
+-----------+---+---------------+
| 2012-05-05|  A|00:41:01.000000|
| 2013-01-01|  B|00:08:01.000000|
| 2014-04-03|  C|00:30:01.000000|
+-----------+---+---------------+
The resulting time column can be in any format, for example 40 minutes or 0.4 hours.
Thanks.
If only the two most recent rows in each group need to be compared, the Window "lead" function can be used. In Scala:
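import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

val df = Seq(
  ("2012-05-05", "A", "13:14:07.000000"),
  ("2012-05-05", "A", "13:54:08.000000"),
  ("2013-01-01", "B", "14:40:26.000000"),
  ("2013-01-01", "B", "14:48:27.000000"),
  ("2014-04-03", "C", "17:17:30.000000"),
  ("2014-04-03", "C", "17:47:31.000000")
).toDF("record_date", "Tag", "time")

// Latest time first within each (record_date, Tag) group
val recordTagWindow = Window.partitionBy("record_date", "Tag").orderBy(desc("time"))

df
  // Keep only the HH:mm:ss part and convert it to seconds
  .withColumn("time", substring($"time", 1, 8))
  .withColumn("unixTimestamp", unix_timestamp($"time", "HH:mm:ss"))
  // Difference between each row and the next (earlier) row in the group
  .withColumn("timeDiffSeconds", $"unixTimestamp" - lead($"unixTimestamp", 1, 0).over(recordTagWindow))
  // Rendering seconds as HH:mm:ss this way is only correct when the session time zone is UTC
  .withColumn("timeDiffFormatted", date_format($"timeDiffSeconds".cast(TimestampType), "HH:mm:ss"))
  // Keep only the latest row of each group
  .withColumn("rownum", row_number().over(recordTagWindow))
  .where($"rownum" === 1)
  .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
  .show(false)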
Output (it looks like the first row of your example is incorrect):
+-----------+---+-----------------+
|record_date|Tag|timeDiffFormatted|
+-----------+---+-----------------+
|2012-05-05 |A |00:40:01 |
|2013-01-01 |B |00:08:01 |
|2014-04-03 |C |00:30:01 |
+-----------+---+-----------------+
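The arithmetic behind that remark can be checked without Spark. The following is a minimal sketch in plain Python; the helper name `span_hms` is made up for illustration, and it assumes both times fall on the same day and use the `HH:MM:SS.ffffff` layout shown in the question.

```python
from datetime import datetime


def span_hms(start: str, end: str) -> str:
    """Return end - start as HH:MM:SS for two clock times on the same day."""
    fmt = "%H:%M:%S.%f"
    delta = datetime.strptime(end, fmt) - datetime.strptime(start, fmt)
    total = int(delta.total_seconds())
    hours, rem = divmod(total, 3600)
    minutes, seconds = divmod(rem, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"


# First group from the question: 13:14:07 to 13:54:08
print(span_hms("13:14:07.000000", "13:54:08.000000"))  # prints 00:40:01
```

So the first group spans 40 minutes and 1 second, not the 41 minutes and 1 second shown in the question.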
For more than two rows, the functions "first" and "last" can be used, with the window modified to include all values in the group (rowsBetween):
// Same ordering as before, but the frame now covers the whole group
val recordTagWindow = Window.partitionBy("record_date", "Tag").orderBy(desc("time"))
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df
  .withColumn("time", substring($"time", 1, 8))
  .withColumn("unixTimestamp", unix_timestamp($"time", "HH:mm:ss"))
  // With descending order, first = latest time and last = earliest time
  .withColumn("timeDiffSeconds", first($"unixTimestamp").over(recordTagWindow) - last($"unixTimestamp").over(recordTagWindow))
  // Rendering seconds as HH:mm:ss this way is only correct when the session time zone is UTC
  .withColumn("timeDiffFormatted", date_format($"timeDiffSeconds".cast(TimestampType), "HH:mm:ss"))
  // row_number needs a window without an explicit frame, so a separate one is used here
  .withColumn("rownum", row_number().over(Window.partitionBy("record_date", "Tag").orderBy(desc("time"))))
  .where($"rownum" === 1)
  .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
  .show(false)