I am trying to group and aggregate my data. Grouping by date and the other fields is straightforward, but now I also want to group by a time interval derived from [Server_Time]:
EventID AccessReason Source Server_Date Server_Time
847495004 Granted ORSB_GND_GYM_IN 10/1/2016 7:25:52 AM
847506432 Granted ORSB_GND_GYM_IN 10/1/2016 8:53:38 AM
847512725 Granted ORSB_GND_GYM_IN 10/1/2016 10:18:50 AM
847512768 Granted ORSB_GND_GYM_IN 10/1/2016 10:19:32 AM
847513357 Granted ORSB_GND_GYM_OUT 10/1/2016 10:25:36 AM
847513614 Granted ORSB_GND_GYM_IN 10/1/2016 10:28:08 AM
847515838 Granted ORSB_GND_GYM_OUT 10/1/2016 10:57:41 AM
847522522 Granted ORSB_GND_GYM_IN 10/1/2016 11:57:10 AM
For example, I need to aggregate the event count per hour. From the data above, for the 10-11 hour the source "ORSB_GND_GYM_IN" occurs 3 times in total and "ORSB_GND_GYM_OUT" occurs 2 times. How can I do this in pyspark?
You can use a UDF to convert the time into an interval string and then group by it:
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

def getInterval(time):
    # "10:18:50 AM" -> "10-11 AM"
    start = int(time.split(":")[0])
    return str(start) + "-" + str(start + 1) + " " + time.split(" ")[1]

getIntervalUdf = udf(getInterval, StringType())

spark = SparkSession.builder.appName("appName").getOrCreate()

df = spark.read.csv("emp", sep=",", header=True)
df.show()

df = df.withColumn("Interval", getIntervalUdf("Server_Time"))
df.show()

df = df.groupby("Server_Date", "Interval", "Source").count()
df.show()
Output
+-----------+--------------+------------------+-------------+-------------+
| EventID | AccessReason | Source | Server_Date | Server_Time |
+-----------+--------------+------------------+-------------+-------------+
| 847495004 | Granted | ORSB_GND_GYM_IN | 10/1/2016 | 7:25:52 AM |
| 847506432 | Granted | ORSB_GND_GYM_IN | 10/1/2016 | 8:53:38 AM |
| 847512725 | Granted | ORSB_GND_GYM_IN | 10/1/2016 | 10:18:50 AM |
| 847512768 | Granted | ORSB_GND_GYM_IN | 10/1/2016 | 10:19:32 AM |
| 847513357 | Granted | ORSB_GND_GYM_OUT | 10/1/2016 | 10:25:36 AM |
| 847513614 | Granted | ORSB_GND_GYM_IN | 10/1/2016 | 10:28:08 AM |
| 847515838 | Granted | ORSB_GND_GYM_OUT | 10/1/2016 | 10:57:41 AM |
| 847522522 | Granted | ORSB_GND_GYM_IN | 10/1/2016 | 11:57:10 AM |
+-----------+--------------+------------------+-------------+-------------+
+---------+------------+----------------+-----------+-----------+--------+
| EventID|AccessReason| Source|Server_Date|Server_Time|Interval|
+---------+------------+----------------+-----------+-----------+--------+
|847495004| Granted| ORSB_GND_GYM_IN| 10/1/2016| 7:25:52 AM| 7-8 AM|
|847506432| Granted| ORSB_GND_GYM_IN| 10/1/2016| 8:53:38 AM| 8-9 AM|
|847512725| Granted| ORSB_GND_GYM_IN| 10/1/2016|10:18:50 AM|10-11 AM|
|847512768| Granted| ORSB_GND_GYM_IN| 10/1/2016|10:19:32 AM|10-11 AM|
|847513357| Granted|ORSB_GND_GYM_OUT| 10/1/2016|10:25:36 AM|10-11 AM|
|847513614| Granted| ORSB_GND_GYM_IN| 10/1/2016|10:28:08 AM|10-11 AM|
|847515838| Granted|ORSB_GND_GYM_OUT| 10/1/2016|10:57:41 AM|10-11 AM|
|847522522| Granted| ORSB_GND_GYM_IN| 10/1/2016|11:57:10 AM|11-12 AM|
+---------+------------+----------------+-----------+-----------+--------+
+-----------+--------+----------------+-----+
|Server_Date|Interval| Source|count|
+-----------+--------+----------------+-----+
| 10/1/2016|10-11 AM| ORSB_GND_GYM_IN| 3|
| 10/1/2016| 8-9 AM| ORSB_GND_GYM_IN| 1|
| 10/1/2016|10-11 AM|ORSB_GND_GYM_OUT| 2|
| 10/1/2016|11-12 AM| ORSB_GND_GYM_IN| 1|
| 10/1/2016| 7-8 AM| ORSB_GND_GYM_IN| 1|
+-----------+--------+----------------+-----+
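If you'd rather avoid a Python UDF, the same hourly counts can be computed with Spark's built-in functions. This is only an alternative sketch, not part of the approach above; it assumes the same `emp` CSV and `spark` session, and that `Server_Time` matches the `h:mm:ss a` pattern seen in the sample (adjust the pattern if your Spark version or data differs).

from pyspark.sql import functions as F

# Parse the 12-hour time string, extract the hour, then count per date/hour/source.
events = spark.read.csv("emp", sep=",", header=True)
events = events.withColumn("hour", F.hour(F.to_timestamp("Server_Time", "h:mm:ss a")))
events.groupBy("Server_Date", "hour", "Source").count().show()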
To generate counts per day/hour as well as per 10-minute interval:
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

def getHrInterval(time):
    # "10:25:36 AM" -> "10-11 AM"
    start = int(time.split(":")[0])
    return str(start) + "-" + str(start + 1) + " " + time.split(" ")[1]

def getMinInterval(time):
    # "10:25:36 AM" -> "10:20-10:30 AM"
    hr_start = int(time.split(":")[0])
    min_start = (int(time.split(":")[1]) // 10) * 10
    return str(hr_start) + ":" + str(min_start) + "-" + str(hr_start) + ":" + str(min_start + 10) + " " + time.split(" ")[1]

path = '/media/sf_VM_Shared/part-00000'
spark = SparkSession.builder.appName("appName").getOrCreate()
df = spark.read \
    .format("com.databricks.spark.csv") \
    .option("header", "true") \
    .load(path)

getHrIntervalUdf = udf(getHrInterval, StringType())
getMinIntervalUdf = udf(getMinInterval, StringType())

df = df.withColumn("HourInterval", getHrIntervalUdf("Server_Time")).withColumn("MinInterval", getMinIntervalUdf("Server_Time"))
df = df.groupby("Server_Date", "HourInterval", "MinInterval", "Source").count()
df.show()
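As a quick, illustrative sanity check (not part of the original answer), the helper functions can be called directly on a sample value from the data:

print(getHrInterval("10:25:36 AM"))   # -> 10-11 AM
print(getMinInterval("10:25:36 AM"))  # -> 10:20-10:30 AM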