apache spark sql - Pyspark用户定义的列聚合计算



我正在为Pyspark中的分类器准备输入数据。我一直在使用SparkSQL中的聚合函数来提取平均值和方差等特征。它们按活动、名称和窗口分组。窗口的计算方法是将unix时间戳除以10000,以划分为10秒的时间窗口。

sample = sqlContext.sql("SELECT activity, name, window, avg(acc_x) as avgX , variance(acc_x) as varX FROM data  GROUP BY activity,name,window ORDER BY activity,name,window")

结果看起来像

Activity  Name         Window       AvgX       VarX
Walk    accelerometer  95875        2.0          1.0

我现在要做的是计算x上每个点的平均斜率

为此,我需要时间戳,窗口和x。我已经在Python中实现了逻辑,使用数组,这就是它看起来的样子-计算每个点之间的斜率,然后得到平均斜率。理想情况下,我希望在UDAF中做到这一点,这在Pyspark中尚未得到支持。它看起来是这样的,假设下面的函数叫做斜率。然后在sql中输入slope(timestamp, X) as avgSlopeX

编辑-更改输入,使其更清晰。我要做的就是计算每个点之间的斜率,然后返回窗口内斜率的平均值。因此,当我得到每个窗口的平均值和方差时,我还想得到平均斜率。

#sample input
timestamp = [1464703425544,1464703426534,1464703427551,1464703428587,1464703429512,1464703430493,1464703431505,1464703432543,1464703433513,1464703434529]
values = [1021.31,1021.26,1021.19,1021.19,1021.1,1021.1,1021.1, 1021.05,1021.02]
i = 0; 
slope = 0.0;
totalSlope = 0.0;
while (i < len(timestamp) - 1):
    y2 = values[i+1];
    y1 = values[i];
    x2 = timestamp[i + 1];
    x1 = timestamp[i]; 
    slope = ((y2-y1)/(x2-x1)); 
    totalSlope = totalSlope + slope;
    i=i+1
avgSlope = (totalSlope/len(x_values))

我如何实现这个?我应该尝试转换到熊猫数据框然后numpy数组吗?如果是这样,我如何确保数据仍然被正确映射,记住sql查询中的GROUP BY活动,名称窗口。

一般来说,这不是UDAF的工作,因为UDAF不提供任何定义顺序的方法。看起来你真正需要的是一些窗口函数和标准聚合的组合。

from pyspark.sql.functions import col, lag, avg
from pyspark.sql.window import Window
df = ... 
## DataFrame[activity: string, name: string, window: bigint, 
##   timestamp: bigint, value: float]
group = ["activity", "name", "window"]
w = (Window()
    .partitionBy(*group)
    .orderBy("timestamp"))
v_diff = col("value") - lag("value", 1).over(w)
t_diff = col("timestamp") - lag("timestamp", 1).over(w)
slope = v_diff / t_diff
df.withColumn("slope", slope).groupBy(*group).agg(avg(col("slope")))

相关内容

  • 没有找到相关文章

最新更新