对于单例变量的使用是否使它们在Spark Dataframe处理框架中使用线程安全?微风fourierTr函数使用ThreadLocal,这似乎给我带来了问题。
我正在构建一个应用程序来组装多维表和计算不同维度上的FFT。
val r = df.rdd.flatMap{ row =>
// scrub the input, format data into coordinates with a value
// create a key corresponding to a slice through the data
// that will get processed in the next step
}
.groupByKey.flatMap{ case( sliceKey, coordinateList ) =>
// note the vector length is variable
val buf = new Array[Complex]( lengthOfVector )
// fill buffer with values from data structure slice
fourierTr( new DenseVector(buf) )
}
注意,这只是伪代码。为了给出一个简洁的例子,我删去了很多实际代码。
关键是对傅里叶变换的调用。当我在本地开发机器上运行这个程序时,一切正常,并且得到了预期的结果。然而,当我转移到更大的多核机器时,我得到了以下异常:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 10.0 failed 1 times, most recent failure: Lost task 2.0 in stage 10.0 (TID 36, localhost): java.lang.ArrayIndexOutOfBoundsException: 12
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.passfg(DoubleFFT_1D.java:3843)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.cfftf(DoubleFFT_1D.java:3390)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.complexForward(DoubleFFT_1D.java:189)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.complexForward(DoubleFFT_1D.java:161)
at breeze.signal.fourierTr$$anon$5.apply(fourierTr.scala:69)
at breeze.signal.fourierTr$$anon$5.apply(fourierTr.scala:62)
at breeze.generic.UFunc$class.apply(UFunc.scala:48)
at breeze.signal.fourierTr$.apply(fourierTr.scala:25)
起初,我认为这可能是由于我的开发机器和集群之间的包版本差异(即。使用AWS)。在确保所有相关的jar版本都匹配之后,我仍然遇到同样的问题。然后我确定如果使用
启动应用程序,则应用程序运行良好。spark-submit --master local[1] ...
但是,如果我用
启动它spark-submit --master local[2] ...
或任何节点计数大于2,那么我将得到异常。这让我怀疑某些记忆被损坏了。所以我开始挖掘库的源代码。
入口点在傅里叶tr中。scala
implicit val dvComplex1DFFT : fourierTr.Impl[DenseVector[Complex], DenseVector[Complex]] = {
new fourierTr.Impl[DenseVector[Complex], DenseVector[Complex]] {
def apply(v: DenseVector[Complex]) = {
//reformat for input: note difference in format for input to real fft
val tempArr = denseVectorCToTemp(v)
//actual action
val fft_instance = getD1DInstance(v.length)
fft_instance.complexForward( tempArr ) //does operation in place
//reformat for output
tempToDenseVector(tempArr)
}
}
}
push in getD1DInstance in JTransformsSupport。
object JTransformsSupport {
//maintain instance of transform to eliminate repeated initialization
private val fft_instD1D = new ThreadLocal[(Int, DoubleFFT_1D)]
def getD1DInstance(length: Int): DoubleFFT_1D = {
if (fft_instD1D.get != null && length == fft_instD1D.get._1) fft_instD1D.get._2
else {
fft_instD1D.set((length, new DoubleFFT_1D(length)))
fft_instD1D.get()._2
}
}
注意,它正在修改一个共享变量fft_instD1D。我不是很熟悉ThreadLocal类型,但似乎这是为了使类线程安全。然而,我改变了我的代码实例化一个DoubleFFT_1D对象作为一个堆栈变量,然后我调用所有的低级例程直接(例如。而不是调用fourierTr,我调用了DoubleFFT_1D.complexForward)。
进行此更改后,无论Spark使用的节点数量如何,都不再发生异常。因此,傅里叶变换库对ThreadLocal的使用似乎是罪魁祸首。
我想知道其他认为自己是Scala/Breeze/Spark专家的人是否同意我的结论?
如果不正确,请建议如何在Spark Dataframe处理上下文中正确使用Breeze(特别是fourertr)。
如果这是正确的结论,那么我有一些后续问题…
- 是我错了,假设微风函数可以从内部调用数据帧处理管道?如果不打算从Dataframe处理管道中调用Breeze,那么是否有一种标准的方法来包装库,以便可以从管道中调用它,或者通常有必要像我所做的那样,重写部分库功能以消除共享变量?
- 如果Breeze打算从Dataframe管道调用,那么这看起来像微风库中的一个bug吗ThreadLocal类的实现?…ie。的关注我应该把这个拿去吗?
请向Breeze提交bug: github.com/scalanlp/breeze/issues
Breeze努力实现线程安全(这就是为什么我们使用ThreadLocal来为每个线程提供一个转换实例),但是这里出现了一些问题。(ThreadLocal代码对我来说看起来完全没问题,但后来我又写了。)