我正在尝试使用Scala的并行集合来并行调度一些计算。因为有很多输入数据,所以我使用可变数组来存储数据,以避免GC问题。这是我最初采用的方法:
// initialize the reusable input data structure
val inputData = new Array[Array[Int]](Runtime.getRuntime.availableProcessors*ChunkSize)
for (i <- 0 until inputData.length) {
inputData(i) = new Array[Int](arraySize)
}
// process the input
while (haveMoreInput()) {
// read the input--must be sequential!
for (array <- 0 until inputData.length) {
for (index <- 0 until arraySize) {
array(index) = deserializeFromExternalSource()
}
}
// map the data in parallel
// note that the input data is NOT modified by longRuningProcess
val results = for (array <- inputData.par) yield {
longRunningProcess(array)
}
// use the results--must be sequential and ordered as input
for (result <- results.toArray) {
useResult(result)
}
}
假设ParallelArray
的底层数组可以被安全地重用(即,修改并用作另一个ParallelArray
的底层结构),上述剪切应该如预期的那样工作。但是,当运行时,它会崩溃并出现内存错误:
*** Error in `*** Error in `java': double free or corruption (fasttop): <memory address> ***
这表面上与并行集合直接使用创建它的数组有关;也许它试图在超出作用域时释放这个数组。在任何情况下,由于内存限制,不能为每个循环创建一个新数组。在while
循环的内部和外部显式地创建var parInputData = inputData.par
会导致相同的双自由错误。
我不能简单地使inputData
本身成为并行集合,因为它需要按顺序填充(在尝试对并行版本进行赋值后,我意识到赋值没有按顺序执行)。使用Vector
作为外部数据结构似乎适用于相对较小的输入大小(<</p>
在大量输入时导致GC开销异常。
我最终采取的方法涉及制作Vector[Vector[Array[Int]]]
,外部向量的长度等于正在使用的并行线程的数量。然后,我用输入数据数组块手动填充每个子Vector
,然后在外部向量上进行并行映射。
最后一种方法可以工作,但是手动将输入分成块并将这些块添加到另一层的并行集合中是乏味的。是否有一种方法允许Scala重用可变数组进行并行操作?
编辑:将上面的并行向量解决方案与使用同步队列的手动并行化解决方案进行基准测试显示,并行向量要慢50%左右。我想知道这是否只是一个更好的抽象的开销,或者如果这个差距可以通过使用并行数组而不是Vector
s来减少;这将带来与Vector
相比使用数组的另一个好处。
将数据分割成块实际上没有意义,Parallel Collections库的主要意义在于它为您完成了这一工作,并且比使用固定的块大小做得更好。此外,JVM上的数组的数组不像C中的数组的数组,它们更像是指向许多小数组的指针的数组,这使得它们效率低下。
一个更优雅的解决方法是使用普通的Array
,并使用ParRange
对其进行操作。longRunningProcess
必须更改为一次操作单个元素:
val arraySize = ???
val inputData = Array[Int](arraySize)
val outputData = Array[ResultType](arraySize)
while(haveMoreInput()) {
for (i <- 0 until arraySize)
inputData(i) = deserializeFromExternalSource()
for (i <- (0 until arraySize).par)
outputData(i) = longRunningProcess(inputData(i))
outputData.foreach(useResult)
}
只使用两个大数组,并且从不分配任何新数组。ParArray.map
, ParArray.toArray
, Array.par
在原代码中分配了新的数组
我们仍然需要使用固定的arraySize
来确保我们不会将更多的数据加载到我们有空间的内存中。一个更好的解决方案是使用响应式流,但它们还没有准备好用于生产。