火花迭代的功能库



我仍然是Spark的新手,我正在努力实现迭代功能。我希望有人可以帮助我吗?

特别是,我正在尝试实现 cusum 控制统计:

$ s_i = max(0,s_ {i -1} x_i- target -w $ with $ s_0 = 0 $ and $ w,target $是固定参数。

挑战在于,Cusum统计量定义为迭代函数,需要有序数据和先前的功能值。

以下数据框显示$ target = 1 $和$ w = 0.1 $:

的所需输出
i    x    S
--------------
1    1.3  0.2
2    1.8  0.9
3    0.5  0.3
4    0.6  0
5    1.2  0.1
6    1.8  0.8

在另一个注意事项上:我想不可能以分布式方式运行库司吗?我的数据集很大,但包含多个组。我希望这意味着我仍然可以实现一些并发。我想我必须重新分配我的数据以每组有一个单个分区才能同时运行Cusum算法?

我希望这是有道理的,任何指针都得到了高度赞赏!理想情况下,我正在寻找Scala和Spark 2.1

的解决方案

非常感谢!

在大量Google研究之后,我使用mapPartitions

找到了解决该问题的解决方案
val dataset = Seq(1.3, 1.8, 0.5, 0.6, 1.2, 1.8).toDS
dataset.repartition(1).mapPartitions(iterator => {
    var s = 0.0
    val target = 1.0
    val w = 0.1
    iterator.map(x => {
        s = Math.max(0.0, s + x -target - w)
        Math.round(10.0 *s)/10.0
    })
}).show()
+-----+
|value|
+-----+
|  0.2|
|  0.9|
|  0.3|
|  0.0|
|  0.1|
|  0.8|
+-----+

我希望这将在将来节省某个时间。

相关内容

  • 没有找到相关文章

最新更新