如何使用PySpark处理get低于持续时间



假设我们在PySpark中定义了一个DataFrame(df(。以及,如何使用PySpark获取同一天内第一次骑行和最后一次骑行之间的持续时间。并将结果保存到日期框架中,包括first_biking_timedeatails、last_biking_timedeatails和durations_newteween_first_last等。注意:在第一个和最后一个自行车动作之间可能还有其他动作。而且,如果一天内只有一次自行车运动,那么我们就不应该得到持续时间(因为我们无法进行计算,例如2018年3月3日(

以下是2018年1月3日的示例结果:

duration_03_01=13:12(上次骑行时间(-5:12(首次骑行时间(=8小时

下面的示例df:

>//tr>游泳自行车//tr>//tr>//tr>游泳自行车//tr>//tr>
timedeatils 操作
3/1/185:12 骑自行车
18年3月1日6:12游泳
18年3月1日8:12运行
18年3月1日10:12
3/1/18 11:12骑自行车
18年3月1日12:12运行
3/1/18 13:12骑自行车
18年3月2日4:12骑自行车
18年3月2日6:12运行
18年3月2日7:12
18年3月2日8:12
18年3月3日4:16
18年3月4日5:13运行
18年3月4日6:13骑自行车
18年3月4日7:13游泳
18年3月4日9:13
18年3月4日10:13
18年3月4日11:13骑自行车

您的df:

df  = spark.createDataFrame(
[
('3/1/2018 5:12','Biking')
,('3/1/2018 6:12','Running')
,('3/1/2018 7:12','Swimming')
,('3/1/2018 8:12','Running')
,('3/1/2018 9:12','Swimming')
,('3/1/2018 10:12','Biking')
,('3/1/2018 11:12','Biking')
,('3/1/2018 12:12','Running')
,('3/1/2018 13:12','Biking')
,('3/2/2018 4:12','Biking')
,('3/2/2018 5:12','Swimming')
,('3/2/2018 6:12','Running')
,('3/2/2018 7:12','Biking')
,('3/2/2018 8:12','Running')
,('3/3/2018 4:16','Biking')
,('3/4/2018 5:13','Biking')
,('3/4/2018 6:13','Running')
,('3/4/2018 7:13','Running')
,('3/4/2018 8:13','Swimming')
,('3/4/2018 9:13','Running')
,('3/4/2018 10:13','Running')
,('3/4/2018 11:13','Biking')
], ['TimeDetails','Actions']
)

使用窗口函数。您也可以将此解决方案应用于其他操作:

from pyspark.sql import functions as F
from pyspark.sql import Window
df = df.
withColumn('TimeDetails', F.to_timestamp('TimeDetails', 'M/d/y H:m'))
.withColumn('date', F.to_date('TimeDetails'))
w = Window.partitionBy('Actions', 'date').orderBy("date")
generic = df
.withColumn('first_record', F.first(F.col('TimeDetails'), ignorenulls=True).over(w))
.withColumn('last_record', F.last(F.col('TimeDetails'), ignorenulls=True).over(w))
.withColumn('Durations_in_Hours',(F.unix_timestamp("last_record") - F.unix_timestamp('first_record'))/3600)
.orderBy('TimeDetails')
biking = generic
.filter(F.col('Actions') == 'Biking')
.select(F.col('first_record').alias('First_Biking_time'),
F.col('Actions').alias('action_1'),
F.col('last_record').alias('Last_Biking_time'),
F.col('Actions').alias('action_2'),
F.col('Durations_in_Hours'))
.dropDuplicates()
.filter(F.col('Durations_in_Hours') != 0)
.orderBy('First_Biking_time')

biking.show()

输出:

+-------------------+--------+-------------------+--------+------------------+
|  First_Biking_time|action_1|   Last_Biking_time|action_2|Durations_in_Hours|
+-------------------+--------+-------------------+--------+------------------+
|2018-03-01 05:12:00|  Biking|2018-03-01 13:12:00|  Biking|               8.0|
|2018-03-02 04:12:00|  Biking|2018-03-02 07:12:00|  Biking|               3.0|
|2018-03-04 05:13:00|  Biking|2018-03-04 11:13:00|  Biking|               6.0|
+-------------------+--------+-------------------+--------+------------------+

最新更新