SQL/PySpark - add a new column based on a dynamic timestamp and another column



I have this data:

id, name, timestamp
1, David, 2022/01/01 10:00
2, David, 2022/01/01 10:30
3, Diego, 2022/01/01 10:59
4, David, 2022/01/01 10:59
5, David, 2022/01/01 11:01
6, Diego, 2022/01/01 12:00
7, David, 2022/01/01 12:00
8, David, 2022/01/01 12:05
9, Diego, 2022/01/01 12:30

Basically, David and Diego are playing a game. From time to time they press a button, and those presses are the timestamps above.

After their first button press, the game can continue for one hour. After that the count resets, and if they press the button again, a new game starts counting from that press.

So I want to flag a row with 0 (start) when it is the first button press of an hour-long window, and with 1 (playing) when the press falls inside that hour.

So in my case, I would expect this as the result:

id, name, timestamp, status
1, David, 2022/01/01 10:00, 0  <--- David starts playing
2, David, 2022/01/01 10:30, 1  <--- David keeps playing the game that he started at the id 1
3, Diego, 2022/01/01 10:59, 0  <--- Diego starts playing
4, David, 2022/01/01 10:59, 1  <--- David keeps playing the game that he started at the id 1
5, David, 2022/01/01 11:01, 0  <--- David starts playing again
6, Diego, 2022/01/01 12:00, 0  <--- Diego starts playing again
7, David, 2022/01/01 12:00, 1  <--- David keeps playing the game that he started at the id 5
8, David, 2022/01/01 12:05, 0  <--- David starts playing again
9, Diego, 2022/01/01 12:30, 1  <--- Diego keeps playing the game that he started at the id 6
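
To make the rule concrete, this is how I would compute the status column row by row in plain Python (only pseudocode for the logic I am describing, not the Spark solution I am looking for; the function name and the choice of <= 1 hour at the boundary are mine):

from datetime import timedelta

def flag_statuses(events):
    # events: list of (id, name, timestamp) tuples, already sorted by timestamp
    session_start = {}  # name -> timestamp of the first press of the current game
    flagged = []
    for ev_id, name, ts in events:
        start = session_start.get(name)
        if start is not None and ts - start <= timedelta(hours=1):
            flagged.append((ev_id, name, ts, 1))   # still inside the hour that began at `start`
        else:
            session_start[name] = ts               # first press, or the hour expired: a new game starts
            flagged.append((ev_id, name, ts, 0))
    return flagged

Running this over the nine rows above reproduces the status column shown in the table.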

I need to do this transformation in PySpark so that I can flag what I consider "start playing" / "keep playing".

Maybe if you could help me with a SQL query, I can adapt it to PySpark later.

It does not need to be done in a single query/step.

I hope you can help me.

This is not a complete solution, but just so you have an idea of what I have tried, something like this:

from datetime import datetime

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

schema = StructType([
    StructField('id', StringType(), True),
    StructField('name', StringType(), True),
    StructField('timestamp', TimestampType(), True),
])

df = spark.createDataFrame(
    [
        ("1", "David", datetime.strptime("2022/01/01 10:00", '%Y/%m/%d %H:%M')),
        ("2", "David", datetime.strptime("2022/01/01 10:30", '%Y/%m/%d %H:%M')),
        ("3", "Diego", datetime.strptime("2022/01/01 10:59", '%Y/%m/%d %H:%M')),
        ("4", "David", datetime.strptime("2022/01/01 10:59", '%Y/%m/%d %H:%M')),
        ("5", "David", datetime.strptime("2022/01/01 11:01", '%Y/%m/%d %H:%M')),
        ("6", "Diego", datetime.strptime("2022/01/01 12:00", '%Y/%m/%d %H:%M')),
        ("7", "David", datetime.strptime("2022/01/01 12:00", '%Y/%m/%d %H:%M')),
        ("8", "David", datetime.strptime("2022/01/01 12:05", '%Y/%m/%d %H:%M')),
        ("9", "Diego", datetime.strptime("2022/01/01 12:30", '%Y/%m/%d %H:%M')),
    ],
    schema=schema)

df.createOrReplaceTempView("people")

# My attempt: rank presses within each calendar hour and alternate 0/1.
# Not a complete solution -- it does not follow the "one hour from the first press" rule.
df3 = spark.sql("""
    SELECT *,
           dense_rank() OVER (PARTITION BY hour(timestamp) ORDER BY name, timestamp) % 2 AS t4,
           CASE
               WHEN dense_rank() OVER (PARTITION BY hour(timestamp) ORDER BY name, timestamp) % 2 > 0
               THEN dense_rank() OVER (PARTITION BY hour(timestamp) ORDER BY name, timestamp) % 2 - 1
               ELSE dense_rank() OVER (PARTITION BY hour(timestamp) ORDER BY name, timestamp) % 2 + 1
           END AS t3
    FROM people
    ORDER BY timestamp, name
""")
df3.show()
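
In case it helps to show the direction I am imagining: the per-player loop from earlier could probably be applied group by group with applyInPandas (Spark 3.x). This is only a sketch assuming the df defined above; I do not know whether it is the right or an efficient approach, which is why I am asking:

import pandas as pd

def tag_sessions(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds all rows of one player; process them in time order
    pdf = pdf.sort_values("timestamp")
    statuses = []
    session_start = None
    for ts in pdf["timestamp"]:
        if session_start is not None and ts - session_start <= pd.Timedelta(hours=1):
            statuses.append(1)            # still inside the hour that started at session_start
        else:
            session_start = ts            # new game starts with this press
            statuses.append(0)
    pdf["status"] = statuses
    return pdf

result = (df.groupBy("name")
            .applyInPandas(tag_sessions,
                           schema="id string, name string, timestamp timestamp, status int"))
result.orderBy("timestamp", "name").show()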
