Spark Scala -基于时间对列表进行集群/分组



我有几年的大量时间序列数据,格式如下:

eventtime,value
---------------
2013-04-17 11:18:39.0,11.4
2013-04-17 11:19:40.0,82.0
2013-04-17 11:20:41.0,53.8
2013-04-17 17:22:00.0,31.0
2013-04-17 17:23:00.0,22.6
2013-04-17 17:24:00.0,43.1
2013-04-17 21:48:00.0,11.0
2013-04-17 21:49:00.0,22.1
2013-04-17 21:50:00.0,3.2
2013-04-17 21:51:00.0,13.1

从上面的数据中,我想按时间的簇进行分组,然后对每个组执行聚合函数(max, mean)。请注意,在上面的样本数据中有三个这样的集群。期望输出值:

Group, Sum
-------------
[2013-04-17 11:18:39.0,2013-04-17 11:19:40.0,2013-04-17 11:20:41.0],147.2
[2013-04-17 17:22:00.0,2013-04-17 17:23:00.0,2013-04-17 17:24:00.0],96.7
[2013-04-17 21:48:00.0,11.0,2013-04-17 21:49:00.0,22.1,2013-04-17 21:50:00.0,2013-04-17 21:51:00.0],49.4

这些集群可以在一天中的任何时间发生,集群中的事件数量也不是固定的。区分这些集群的一种方法是通过集群的时差,比如间隔一小时的集群。

请告诉我如何在Spark Scala中实现这一点。

谢谢

可以使用下面的代码实现上述功能:

import java.text.DecimalFormat
var doubleFormat = new DecimalFormat("#.00")
val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd HH")
var groupedRdd = rdd.map(value => value.split(",")).map(arr => (dateFormat.format(dateFormat.parse(arr(0))), (Array(arr(0)), arr(1).toDouble))).cache
// To calculate the sum
var sumRdd = groupedRdd.reduceByKey((obj1 ,obj2 ) => ((obj1._1 ++ obj2._1), (obj1._2 + obj2._2)))
sumRdd.foreach(value => println(value._1 + ":[" + value._2._1.mkString(",") + "]:" + doubleFormat.format(value._2._2)))
// Output of Sum func 
2013-04-17 21:[2013-04-17 21:48:00.0,2013-04-17 21:49:00.0,2013-04-17 21:50:00.0,2013-04-17 21:51:00.0]:49.40
2013-04-17 17:[2013-04-17 17:22:00.0,2013-04-17 17:23:00.0,2013-04-17 17:24:00.0]:96.70
2013-04-17 11:[2013-04-17 11:18:39.0,2013-04-17 11:19:40.0,2013-04-17 11:20:41.0]:147.20

// To calculate Max value
var maxRdd = groupedRdd.reduceByKey((obj1 ,obj2 ) => ((obj1._1 ++ obj2._1), Math.max(obj1._2 , obj2._2)))
maxRdd.foreach(value => println(value._1 + ":[" + value._2._1.mkString(",") + "]:" + doubleFormat.format(value._2._2)))
// Output of Max func
2013-04-17 21:[2013-04-17 21:48:00.0,2013-04-17 21:49:00.0,2013-04-17 21:50:00.0,2013-04-17 21:51:00.0]:22.10
2013-04-17 17:[2013-04-17 17:22:00.0,2013-04-17 17:23:00.0,2013-04-17 17:24:00.0]:43.10
2013-04-17 11:[2013-04-17 11:18:39.0,2013-04-17 11:19:40.0,2013-04-17 11:20:41.0]:82.00

// To calculate the min value
var minRdd = groupedRdd.reduceByKey((obj1 ,obj2 ) => ((obj1._1 ++ obj2._1), Math.min(obj1._2 , obj2._2)))
minRdd.foreach(value => println(value._1 + ":[" + value._2._1.mkString(",") + "]:" + doubleFormat.format(value._2._2)))

// Output of the min value
2013-04-17 21:[2013-04-17 21:48:00.0,2013-04-17 21:49:00.0,2013-04-17 21:50:00.0,2013-04-17 21:51:00.0]:3.20
2013-04-17 17:[2013-04-17 17:22:00.0,2013-04-17 17:23:00.0,2013-04-17 17:24:00.0]:22.60
2013-04-17 11:[2013-04-17 11:18:39.0,2013-04-17 11:19:40.0,2013-04-17 11:20:41.0]:11.40

输出格式为: cluster:[eventimes]:result

希望以上解决方案对您计算聚合结果有帮助。

相关内容

  • 没有找到相关文章

最新更新