如何设置Scala2.10并行集合的默认线程数



在2.10之前的Scala中,我可以在defaultForkJoinPool中设置并行度(就像在这个答案中Scala并行集合的并行度)。在Scala2.10中,那个API已经不存在了。我们可以在单个集合上设置并行性,这一点已经有充分的证明(http://docs.scala-lang.org/overviews/parallel-collections/configuration.html)通过分配给其taskSupport属性。

然而,我在整个代码库中使用并行集合,并且不想在每个集合实例化中添加额外的两行。有没有办法配置全局默认线程池大小,以便someCollection.par.map(f(_))自动使用默认线程数?

我知道这个问题已经一个多月了,但我刚刚遇到了完全相同的问题。谷歌搜索没有帮助,在新的API中我找不到任何看起来理智的东西。

按照这里的建议设置-Dscala.concurrent.context.maxThreads=n:在Scala2.10中为所有集合设置并行级别?似乎根本没有效果,但我不确定我是否正确使用了它(我在没有显式安装"scala"的环境中使用"java"运行应用程序,这可能是原因)。

我不知道为什么scala人员从适当的包对象中删除了这个重要的setter。

然而,通常可以使用反射来处理不完整/奇怪的界面:

def setParallelismGlobally(numThreads: Int): Unit = {
  val parPkgObj = scala.collection.parallel.`package`
  val defaultTaskSupportField = parPkgObj.getClass.getDeclaredFields.find{
    _.getName == "defaultTaskSupport"
  }.get
  defaultTaskSupportField.setAccessible(true)
  defaultTaskSupportField.set(
    parPkgObj, 
    new scala.collection.parallel.ForkJoinTaskSupport(
      new scala.concurrent.forkjoin.ForkJoinPool(numThreads)
    ) 
  )
}

对于那些不熟悉Scala更模糊功能的人,这里有一个简短的解释:

scala.collection.parallel.`package`

使用defaultTaskSupport变量访问包对象(它看起来有点像Java的静态变量,但实际上是包对象的成员变量)。由于package是一个保留关键字,因此标识符需要回溯标记。然后我们得到我们想要的私有最终字段(getField("defaultTaskSupport")由于某种原因不起作用?…),告诉它是可访问的以便能够修改它,然后用我们自己的ForkJoinTaskSupport替换它的值。

我还不了解创建并行集合的确切机制,但Combiner特性的源代码表明,defaultTaskSupport的值应该以某种方式渗透到并行集合中。

请注意,这个问题在性质上与一个更老的问题相同:"我的代码库中到处都是Math.random(),我如何将种子设置为一个固定的数字来进行调试?"。在这两种情况下,我们都有某种全局"静态"变量,我们在一百万个不同的地方隐式使用,我们想更改它,但这个变量没有setter=>我们使用反射。

丑得像地狱,但似乎工作得很好。如果需要限制线程总数,请不要忘记垃圾收集器在单独的线程上运行。

最新更新