Scala Breeze包在Spark Dataframe处理中线程安全吗?



对于单例变量的使用是否使它们在Spark Dataframe处理框架中使用线程安全?微风fourierTr函数使用ThreadLocal,这似乎给我带来了问题。

我正在构建一个应用程序来组装多维表和计算不同维度上的FFT。

val r = df.rdd.flatMap{ row =>
      // scrub the input, format data into coordinates with a value
      // create a key corresponding to a slice through the data
      // that will get processed in the next step      
    }
    .groupByKey.flatMap{ case( sliceKey, coordinateList ) => 
      // note the vector length is variable
      val buf = new Array[Complex]( lengthOfVector )
      // fill buffer with values from data structure slice
      fourierTr( new DenseVector(buf) )
    }

注意,这只是伪代码。为了给出一个简洁的例子,我删去了很多实际代码。

关键是对傅里叶变换的调用。当我在本地开发机器上运行这个程序时,一切正常,并且得到了预期的结果。然而,当我转移到更大的多核机器时,我得到了以下异常:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 10.0 failed 1 times, most recent failure: Lost task 2.0 in stage 10.0 (TID 36, localhost): java.lang.ArrayIndexOutOfBoundsException: 12
    at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.passfg(DoubleFFT_1D.java:3843)
    at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.cfftf(DoubleFFT_1D.java:3390)
    at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.complexForward(DoubleFFT_1D.java:189)
    at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.complexForward(DoubleFFT_1D.java:161)
    at breeze.signal.fourierTr$$anon$5.apply(fourierTr.scala:69)
    at breeze.signal.fourierTr$$anon$5.apply(fourierTr.scala:62)
    at breeze.generic.UFunc$class.apply(UFunc.scala:48)
    at breeze.signal.fourierTr$.apply(fourierTr.scala:25)

起初,我认为这可能是由于我的开发机器和集群之间的包版本差异(即。使用AWS)。在确保所有相关的jar版本都匹配之后,我仍然遇到同样的问题。然后我确定如果使用

启动应用程序,则应用程序运行良好。
spark-submit --master local[1] ...
但是,如果我用 启动它
spark-submit --master local[2] ...

或任何节点计数大于2,那么我将得到异常。这让我怀疑某些记忆被损坏了。所以我开始挖掘库的源代码。

入口点在傅里叶tr中。scala

  implicit val dvComplex1DFFT : fourierTr.Impl[DenseVector[Complex], DenseVector[Complex]] = {
    new fourierTr.Impl[DenseVector[Complex], DenseVector[Complex]] {
      def apply(v: DenseVector[Complex]) = {
        //reformat for input: note difference in format for input to real fft
        val tempArr = denseVectorCToTemp(v)
        //actual action
        val fft_instance = getD1DInstance(v.length)
        fft_instance.complexForward( tempArr ) //does operation in place
        //reformat for output
        tempToDenseVector(tempArr)
      }
    }
  }

push in getD1DInstance in JTransformsSupport。

object JTransformsSupport {
  //maintain instance of transform to eliminate repeated initialization
  private val fft_instD1D = new ThreadLocal[(Int, DoubleFFT_1D)]
  def getD1DInstance(length: Int): DoubleFFT_1D = {
    if (fft_instD1D.get != null && length == fft_instD1D.get._1) fft_instD1D.get._2
    else {
      fft_instD1D.set((length, new DoubleFFT_1D(length)))
      fft_instD1D.get()._2
    }
  }

注意,它正在修改一个共享变量fft_instD1D。我不是很熟悉ThreadLocal类型,但似乎这是为了使类线程安全。然而,我改变了我的代码实例化一个DoubleFFT_1D对象作为一个堆栈变量,然后我调用所有的低级例程直接(例如。而不是调用fourierTr,我调用了DoubleFFT_1D.complexForward)。

进行此更改后,无论Spark使用的节点数量如何,都不再发生异常。因此,傅里叶变换库对ThreadLocal的使用似乎是罪魁祸首。

我想知道其他认为自己是Scala/Breeze/Spark专家的人是否同意我的结论?

如果不正确,请建议如何在Spark Dataframe处理上下文中正确使用Breeze(特别是fourertr)。

如果这是正确的结论,那么我有一些后续问题…

  1. 是我错了,假设微风函数可以从内部调用数据帧处理管道?如果不打算从Dataframe处理管道中调用Breeze,那么是否有一种标准的方法来包装库,以便可以从管道中调用它,或者通常有必要像我所做的那样,重写部分库功能以消除共享变量?
  2. 如果Breeze打算从Dataframe管道调用,那么这看起来像微风库中的一个bug吗ThreadLocal类的实现?…ie。的关注我应该把这个拿去吗?

请向Breeze提交bug: github.com/scalanlp/breeze/issues

Breeze努力实现线程安全(这就是为什么我们使用ThreadLocal来为每个线程提供一个转换实例),但是这里出现了一些问题。(ThreadLocal代码对我来说看起来完全没问题,但后来我又写了。)

最新更新