I have this data:
id, name, timestamp
1, David, 2022/01/01 10:00
2, David, 2022/01/01 10:30
3, Diego, 2022/01/01 10:59
4, David, 2022/01/01 10:59
5, David, 2022/01/01 11:01
6, Diego, 2022/01/01 12:00
7, David, 2022/01/01 12:00
8, David, 2022/01/01 12:05
9, Diego, 2022/01/01 12:30
Basically, David and Diego are playing a game. Every now and then they press a button, at these timestamps.
After their first button press, the game can go on for one hour. After that, the count resets, and if they press the button again, that press counts as the start of a new game.
So I want to flag with 0 ("start") the first time they use the button within an hour, and with 1 ("playing") every press that falls inside that hour.
So in my case, this is the result I would expect:
id, name, timestamp, status
1, David, 2022/01/01 10:00, 0 <--- David starts playing
2, David, 2022/01/01 10:30, 1 <--- David keeps playing the game that he started at the id 1
3, Diego, 2022/01/01 10:59, 0 <--- Diego starts playing
4, David, 2022/01/01 10:59, 1 <--- David keeps playing the game that he started at the id 1
5, David, 2022/01/01 11:01, 0 <--- David starts playing again
6, Diego, 2022/01/01 12:00, 0 <--- Diego starts playing again
7, David, 2022/01/01 12:00, 1 <--- David keeps playing the game that he started at the id 5
8, David, 2022/01/01 12:05, 0 <--- David starts playing again
9, Diego, 2022/01/01 12:30, 1 <--- Diego keeps playing the game that he started at the id 6
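The rule above is inherently sequential per player (each flag depends on when the *current* session started, not just on the previous row), so as a sketch, the core logic can be written as a plain Python helper first. This is my illustration, not part of the original attempt:

```python
from datetime import datetime, timedelta

def mark_sessions(timestamps):
    """Given one player's button presses in ascending order, return a
    0/1 flag per press: 0 starts a new one-hour session, 1 falls inside
    the session opened by the most recent 0."""
    status, session_start = [], None
    for ts in timestamps:
        # A press an hour or more after the session's first press resets the clock.
        if session_start is None or ts - session_start >= timedelta(hours=1):
            status.append(0)
            session_start = ts
        else:
            status.append(1)
    return status
```

For David's presses (10:00, 10:30, 10:59, 11:01, 12:00, 12:05) this yields [0, 1, 1, 0, 1, 0], matching the expected column above.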
I need to do this transformation in PySpark so I can flag what I consider "start playing" and "keep playing".
Maybe if you can help me with a SQL query, I can adapt it to PySpark later.
It doesn't need to be done in a single query/step.
I hope you can help me.
This is not a complete solution, but just to show what I have tried, something like this:
from datetime import datetime

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

schema = StructType([
    StructField('id', StringType(), True),
    StructField('name', StringType(), True),
    StructField('timestamp', TimestampType(), True),
])

df = spark.createDataFrame(
    [
        ("1", "David", datetime.strptime("2022/01/01 10:00", '%Y/%m/%d %H:%M')),
        ("2", "David", datetime.strptime("2022/01/01 10:30", '%Y/%m/%d %H:%M')),
        ("3", "Diego", datetime.strptime("2022/01/01 10:59", '%Y/%m/%d %H:%M')),
        ("4", "David", datetime.strptime("2022/01/01 10:59", '%Y/%m/%d %H:%M')),
        ("5", "David", datetime.strptime("2022/01/01 11:01", '%Y/%m/%d %H:%M')),
        ("6", "Diego", datetime.strptime("2022/01/01 12:00", '%Y/%m/%d %H:%M')),
        ("7", "David", datetime.strptime("2022/01/01 12:00", '%Y/%m/%d %H:%M')),
        ("8", "David", datetime.strptime("2022/01/01 12:05", '%Y/%m/%d %H:%M')),
        ("9", "Diego", datetime.strptime("2022/01/01 12:30", '%Y/%m/%d %H:%M')),
    ],
    schema=schema)

df.createOrReplaceTempView("people")

df3 = spark.sql("""
    select *,
           dense_rank() over (partition by hour(timestamp) order by name, timestamp) % 2 as t4,
           case when dense_rank() over (partition by hour(timestamp) order by name, timestamp) % 2 > 0
                then dense_rank() over (partition by hour(timestamp) order by name, timestamp) % 2 - 1
                else dense_rank() over (partition by hour(timestamp) order by name, timestamp) % 2 + 1
           end as t3
    from people
    order by timestamp, name
""")
df3.show()
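A note on the attempt above: partitioning by hour(timestamp) can't track a session that straddles an hour boundary (like David's session starting at 10:59), and because each flag depends on the running session start, a single window function is awkward here. One hedged sketch is to apply the per-player loop with pandas; the same function could then be handed to Spark via df.groupBy("name").applyInPandas(add_status, schema="id string, name string, timestamp timestamp, status int"). Names like add_status are my own, for illustration:

```python
import pandas as pd

def add_status(pdf):
    """Flag one player's presses: 0 opens a one-hour session, 1 is inside it.

    This is an illustrative helper, not the asker's code: it walks the
    player's presses in timestamp order, resetting the session whenever
    a press is an hour or more after the current session's first press.
    """
    pdf = pdf.sort_values("timestamp")
    status, session_start = [], None
    for ts in pdf["timestamp"]:
        if session_start is None or ts - session_start >= pd.Timedelta(hours=1):
            status.append(0)
            session_start = ts
        else:
            status.append(1)
    return pdf.assign(status=status)

# Example data mirroring the table in the question (pandas only, for illustration).
data = pd.DataFrame({
    "id": ["1", "2", "3", "4", "5", "6", "7", "8", "9"],
    "name": ["David", "David", "Diego", "David", "David",
             "Diego", "David", "David", "Diego"],
    "timestamp": pd.to_datetime([
        "2022-01-01 10:00", "2022-01-01 10:30", "2022-01-01 10:59",
        "2022-01-01 10:59", "2022-01-01 11:01", "2022-01-01 12:00",
        "2022-01-01 12:00", "2022-01-01 12:05", "2022-01-01 12:30",
    ]),
})

# Apply per player, then restore the original row order.
result = pd.concat(add_status(g) for _, g in data.groupby("name")).sort_values("id")
```

Sorted back by id, the status column comes out [0, 1, 0, 1, 0, 0, 1, 0, 1], matching the expected output in the question.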