Grouping data by time interval in pyspark



I am trying to group and aggregate data. Grouping by date and the other fields is straightforward. Now I am also trying to group it by a time interval based on [Server_Time]:

EventID AccessReason    Source  Server_Date Server_Time
847495004   Granted ORSB_GND_GYM_IN 10/1/2016   7:25:52 AM
847506432   Granted ORSB_GND_GYM_IN 10/1/2016   8:53:38 AM
847512725   Granted ORSB_GND_GYM_IN 10/1/2016   10:18:50 AM
847512768   Granted ORSB_GND_GYM_IN 10/1/2016   10:19:32 AM
847513357   Granted ORSB_GND_GYM_OUT 10/1/2016  10:25:36 AM
847513614   Granted ORSB_GND_GYM_IN 10/1/2016   10:28:08 AM
847515838   Granted ORSB_GND_GYM_OUT 10/1/2016  10:57:41 AM
847522522   Granted ORSB_GND_GYM_IN 10/1/2016   11:57:10 AM

For example, I need to aggregate the event count per hour. From the data we can see that for the 10-11 hour, the source "ORSB_GND_GYM_IN" occurs 3 times in total and "ORSB_GND_GYM_OUT" occurs 2 times. How can I do this in pyspark?
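(For readers who want to try the answers below, the sample rows above can be recreated as a small DataFrame; this is a minimal sketch for experimentation and not part of the original question.)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("interval-grouping").getOrCreate()

# Recreate the sample rows shown above
data = [
    ("847495004", "Granted", "ORSB_GND_GYM_IN",  "10/1/2016", "7:25:52 AM"),
    ("847506432", "Granted", "ORSB_GND_GYM_IN",  "10/1/2016", "8:53:38 AM"),
    ("847512725", "Granted", "ORSB_GND_GYM_IN",  "10/1/2016", "10:18:50 AM"),
    ("847512768", "Granted", "ORSB_GND_GYM_IN",  "10/1/2016", "10:19:32 AM"),
    ("847513357", "Granted", "ORSB_GND_GYM_OUT", "10/1/2016", "10:25:36 AM"),
    ("847513614", "Granted", "ORSB_GND_GYM_IN",  "10/1/2016", "10:28:08 AM"),
    ("847515838", "Granted", "ORSB_GND_GYM_OUT", "10/1/2016", "10:57:41 AM"),
    ("847522522", "Granted", "ORSB_GND_GYM_IN",  "10/1/2016", "11:57:10 AM"),
]
df = spark.createDataFrame(
    data, ["EventID", "AccessReason", "Source", "Server_Date", "Server_Time"]
)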

You can use a UDF to convert the time into an interval label and then group by it:

from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Map a time string such as "10:18:50 AM" to an hour bucket such as "10-11 AM"
def getInterval(time):
    start = int(time.split(":")[0])
    return str(start) + "-" + str(start + 1) + " " + time.split(" ")[1]

getIntervalUdf = udf(getInterval, StringType())

spark = SparkSession.builder.appName("appName").getOrCreate()
df = spark.read.csv("emp", sep=",", header=True)
df.show()

# Add the hour-bucket column, then count events per date/interval/source
df = df.withColumn("Interval", getIntervalUdf("Server_Time"))
df.show()
df = df.groupby("Server_Date", "Interval", "Source").count()
df.show()

Output

+-----------+--------------+------------------+-------------+-------------+
|  EventID  | AccessReason |      Source      | Server_Date | Server_Time |
+-----------+--------------+------------------+-------------+-------------+
| 847495004 | Granted      | ORSB_GND_GYM_IN  | 10/1/2016   | 7:25:52 AM  |
| 847506432 | Granted      | ORSB_GND_GYM_IN  | 10/1/2016   | 8:53:38 AM  |
| 847512725 | Granted      | ORSB_GND_GYM_IN  | 10/1/2016   | 10:18:50 AM |
| 847512768 | Granted      | ORSB_GND_GYM_IN  | 10/1/2016   | 10:19:32 AM |
| 847513357 | Granted      | ORSB_GND_GYM_OUT | 10/1/2016   | 10:25:36 AM |
| 847513614 | Granted      | ORSB_GND_GYM_IN  | 10/1/2016   | 10:28:08 AM |
| 847515838 | Granted      | ORSB_GND_GYM_OUT | 10/1/2016   | 10:57:41 AM |
| 847522522 | Granted      | ORSB_GND_GYM_IN  | 10/1/2016   | 11:57:10 AM |
+-----------+--------------+------------------+-------------+-------------+
+---------+------------+----------------+-----------+-----------+--------+
|  EventID|AccessReason|          Source|Server_Date|Server_Time|Interval|
+---------+------------+----------------+-----------+-----------+--------+
|847495004|     Granted| ORSB_GND_GYM_IN|  10/1/2016| 7:25:52 AM|  7-8 AM|
|847506432|     Granted| ORSB_GND_GYM_IN|  10/1/2016| 8:53:38 AM|  8-9 AM|
|847512725|     Granted| ORSB_GND_GYM_IN|  10/1/2016|10:18:50 AM|10-11 AM|
|847512768|     Granted| ORSB_GND_GYM_IN|  10/1/2016|10:19:32 AM|10-11 AM|
|847513357|     Granted|ORSB_GND_GYM_OUT|  10/1/2016|10:25:36 AM|10-11 AM|
|847513614|     Granted| ORSB_GND_GYM_IN|  10/1/2016|10:28:08 AM|10-11 AM|
|847515838|     Granted|ORSB_GND_GYM_OUT|  10/1/2016|10:57:41 AM|10-11 AM|
|847522522|     Granted| ORSB_GND_GYM_IN|  10/1/2016|11:57:10 AM|11-12 AM|
+---------+------------+----------------+-----------+-----------+--------+
+-----------+--------+----------------+-----+
|Server_Date|Interval|          Source|count|
+-----------+--------+----------------+-----+
|  10/1/2016|10-11 AM| ORSB_GND_GYM_IN|    3|
|  10/1/2016|  8-9 AM| ORSB_GND_GYM_IN|    1|
|  10/1/2016|10-11 AM|ORSB_GND_GYM_OUT|    2|
|  10/1/2016|11-12 AM| ORSB_GND_GYM_IN|    1|
|  10/1/2016|  7-8 AM| ORSB_GND_GYM_IN|    1|
+-----------+--------+----------------+-----+
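As a side note (not part of the original answer), the same hourly grouping can be done without a Python UDF by parsing Server_Time with Spark's built-in functions. A minimal sketch, assuming Spark 2.2+ where to_timestamp is available; the format string may need adjusting for your Spark version:

from pyspark.sql.functions import to_timestamp, hour

# Parse the 12-hour time string and extract the hour (0-23); the date part
# defaults to 1970-01-01, which does not matter since only the hour is used.
df = df.withColumn("Hour", hour(to_timestamp("Server_Time", "h:mm:ss a")))
df.groupby("Server_Date", "Hour", "Source").count().show()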

To generate counts for daily / hourly and 10-minute intervals:

from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Hour bucket, e.g. "10:18:50 AM" -> "10-11 AM"
def getHrInterval(time):
    start = int(time.split(":")[0])
    return str(start) + "-" + str(start + 1) + " " + time.split(" ")[1]

# 10-minute bucket, e.g. "10:18:50 AM" -> "10:10-10:20 AM"
def getMinInterval(time):
    hr_start = int(time.split(":")[0])
    min_start = (int(time.split(":")[1]) // 10) * 10  # round minutes down to the nearest 10
    return str(hr_start) + ":" + str(min_start) + "-" + str(hr_start) + ":" + str(min_start + 10) + " " + time.split(" ")[1]

path = '/media/sf_VM_Shared/part-00000'
spark = SparkSession.builder.appName("appName").getOrCreate()
df = (spark.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load(path))

getHrIntervalUdf = udf(getHrInterval, StringType())
getMinIntervalUdf = udf(getMinInterval, StringType())

df = df.withColumn("HourInterval", getHrIntervalUdf("Server_Time")) \
       .withColumn("MinInterval", getMinIntervalUdf("Server_Time"))
df = df.groupby("Server_Date", "HourInterval", "MinInterval", "Source").count()
df.show()
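As an additional option not covered above, pyspark.sql.functions.window can produce fixed 10-minute buckets directly once Server_Date and Server_Time are combined into a real timestamp. A hedged sketch, assuming raw_df is the DataFrame as loaded from the CSV (before any groupby) and that the date/time formats match the sample data (M/d/yyyy and h:mm:ss a):

from pyspark.sql.functions import concat_ws, to_timestamp, window

# Combine the separate date and time columns into one timestamp column
raw_df = raw_df.withColumn(
    "EventTs",
    to_timestamp(concat_ws(" ", "Server_Date", "Server_Time"), "M/d/yyyy h:mm:ss a")
)

# Count events per fixed 10-minute window and per source
raw_df.groupBy(window("EventTs", "10 minutes"), "Source").count() \
      .orderBy("window") \
      .show(truncate=False)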
