我仍然是Spark的新手,我正在努力实现迭代功能。我希望有人可以帮助我吗?
特别是,我正在尝试实现 cusum 控制统计:
$ s_i = max(0,s_ {i -1} x_i- target -w $ with $ s_0 = 0 $ and $ w,target $是固定参数。
挑战在于,Cusum统计量定义为迭代函数,需要有序数据和先前的功能值。
以下数据框显示$ target = 1 $和$ w = 0.1 $:
的所需输出i x S
--------------
1 1.3 0.2
2 1.8 0.9
3 0.5 0.3
4 0.6 0
5 1.2 0.1
6 1.8 0.8
在另一个注意事项上:我想不可能以分布式方式运行库司吗?我的数据集很大,但包含多个组。我希望这意味着我仍然可以实现一些并发。我想我必须重新分配我的数据以每组有一个单个分区才能同时运行Cusum算法?
我希望这是有道理的,任何指针都得到了高度赞赏!理想情况下,我正在寻找Scala和Spark 2.1
的解决方案非常感谢!
在大量Google研究之后,我使用mapPartitions
val dataset = Seq(1.3, 1.8, 0.5, 0.6, 1.2, 1.8).toDS
dataset.repartition(1).mapPartitions(iterator => {
var s = 0.0
val target = 1.0
val w = 0.1
iterator.map(x => {
s = Math.max(0.0, s + x -target - w)
Math.round(10.0 *s)/10.0
})
}).show()
+-----+
|value|
+-----+
| 0.2|
| 0.9|
| 0.3|
| 0.0|
| 0.1|
| 0.8|
+-----+
我希望这将在将来节省某个时间。