Grouping 5-minute time periods using Window



The CSV file is:

#+----+-----------+-------------------+
#|col1|       col2|          timestamp|
#+----+-----------+-------------------+
#|   0|Town Street|01-02-2017 06:01:00|
#|   0|Town Street|01-02-2017 06:03:00|
#|   0|Town Street|01-02-2017 06:05:00|
#|   0|Town Street|01-02-2017 06:06:00|
#|   0|Town Street|02-02-2017 10:01:00|
#|   0|Town Street|02-02-2017 10:05:00|
#+----+-----------+-------------------+

For each date, compare the times and check whether any readings fall within 5 minutes of each other; if they do, keep those rows.

Expected output:

#+----+-----------+-------------------+
#|col1|       col2|          timestamp|
#+----+-----------+-------------------+
#|   0|Town Street|01-02-2017 06:01:00|
#|   0|Town Street|01-02-2017 06:03:00|
#|   0|Town Street|01-02-2017 06:05:00|
#|   0|Town Street|01-02-2017 06:06:00|
#|   0|Town Street|02-02-2017 10:01:00|
#|   0|Town Street|02-02-2017 10:05:00|
#+----+-----------+-------------------+

Current code:

from pyspark.sql import SQLContext
import pyspark.sql.functions as F

def my_main(sc, my_dataset_dir):
    sqlContext = SQLContext(sc)
    # read the CSV, drop the first two lines, and name the columns
    df = (sqlContext.read.csv(my_dataset_dir, sep=';')
          .rdd.zipWithIndex().filter(lambda x: x[1] > 1)
          .map(lambda x: x[0]).toDF(['status', 'title', 'datetime']))

This code only gives an empty result for the 5-minute window.
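One likely reason for the empty result (an assumption, since the rest of the job isn't shown) is that the datetime column is still a plain string after the read, so a time-range window has nothing numeric to compare. A minimal sketch of the missing step, assuming the MM-dd-yyyy HH:mm:ss format used in the answer below:

import pyspark.sql.functions as F

# assumption: the datetime column must be a real TimestampType before any
# time-range window can compare rows; the format follows the sample data
df = df.withColumn('datetime', F.to_timestamp('datetime', 'MM-dd-yyyy HH:mm:ss'))
df.printSchema()   # datetime should now show as "timestamp", not "string"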

Not sure if this is exactly what you want, but it should push you in the right direction. You can convert the timestamp to TimestampType and derive a DateType column from it. To create the window, partitionBy the date and use rangeBetween on the timestamp in seconds (300).

#df.show()
#sample dataframe
#+----+-----------+-------------------+
#|col1|       col2|          timestamp|
#+----+-----------+-------------------+
#|   0|Town Street|01-02-2017 06:01:00|
#|   0|Town Street|01-02-2017 06:03:00|
#|   0|Town Street|01-02-2017 06:05:00|
#|   0|Town Street|01-02-2017 06:06:00|
#|   0|Town Street|02-02-2017 10:01:00|
#|   0|Town Street|02-02-2017 10:05:00|
#+----+-----------+-------------------+
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# within each date, look 300 seconds (5 minutes) forward from the current row
w = (Window().partitionBy("date")
     .orderBy(F.col("timestamp").cast("long"))
     .rangeBetween(Window.currentRow, 60 * 5))

(df.withColumn("timestamp", F.to_timestamp("timestamp", 'MM-dd-yyyy HH:mm:ss'))
   .withColumn("date", F.to_date("timestamp"))
   .withColumn('collect', F.size(F.collect_list("timestamp").over(w)))
   .filter("collect>1")
   .select(F.date_format("date", "yyyy-MM-dd").alias("date"),
           F.array(F.date_format("timestamp", "HH:mm:ss"), F.col("collect")).alias("time"))
   .orderBy("date").show())
#+----------+-------------+
#|      date|         time|
#+----------+-------------+
#|2017-01-02|[06:01:00, 4]|
#|2017-01-02|[06:05:00, 2]|
#|2017-01-02|[06:03:00, 3]|
#|2017-02-02|[10:01:00, 2]|
#+----------+-------------+
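
If the goal is to get back the original rows that have another reading within 5 minutes (the expected output in the question) rather than per-time counts, one possible variation is sketched below; it is untested against the real data, uses the column names of the sample dataframe, and makes the range symmetric so both rows of a close pair pass the filter:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# look 5 minutes back and forward so both members of a close pair get a count > 1
w = (Window.partitionBy("date")
     .orderBy(F.col("ts").cast("long"))
     .rangeBetween(-60 * 5, 60 * 5))

(df.withColumn("ts", F.to_timestamp("timestamp", 'MM-dd-yyyy HH:mm:ss'))
   .withColumn("date", F.to_date("ts"))
   .withColumn("cnt", F.size(F.collect_list("ts").over(w)))
   .filter("cnt > 1")
   .select("col1", "col2", "timestamp")
   .show())

With the sample data every reading has a neighbour within 5 minutes, so all six rows survive the filter, which matches the expected output above.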
