Scala - 在多个线程内工作的最佳API



在Python中,我正在使用一个名为futures的库,它允许我以简洁明了的方式使用N个工作进程池来完成处理工作:

schedulerQ = []
for ... in ...:
    workParam = ...  # arguments for call to processingFunction(workParam)
    schedulerQ.append(workParam)
with futures.ProcessPoolExecutor(max_workers=5) as executor:  # 5 CPUs
    for retValue in executor.map(processingFunction, schedulerQ):
        print "Received result", retValue

(processingFunction是 CPU 绑定的,所以这里没有异步机制的意义 - 这是关于普通的旧算术计算(

我现在正在寻找最接近的方法在 Scala 中做同样的事情。请注意,在 Python 中,为了避免 GIL 问题,我使用的是进程(因此使用 ProcessPoolExecutor 而不是 ThreadPoolExecutor ( - 并且库会自动将workParam参数封送到执行processingFunction(workParam)的每个进程实例 - 它将结果封送回主进程,供执行者的map循环使用。

这适用于 Scala 和 JVM 吗?原则上,我的 processingFunction 也可以从线程执行(根本没有全局状态( - 但我有兴趣看到多处理和多线程的解决方案。

问题的关键部分是JVM世界中是否有任何东西具有像上面看到的Python futures一样清晰的API。我认为这是我见过的最好的 SMP API 之一 - 准备一个包含所有调用的函数参数的列表,然后只有两行:创建 poolExecutor,map处理函数,一旦结果由工人生成,就会取回您的结果。结果在第一次调用 processingFunction 返回时立即开始出现,并一直出现,直到它们全部完成 - 此时 for 循环结束。

与在 Scala 中使用并行集合相比,您的样板要少得多。

myParameters.par.map(x => f(x))

如果您想要默认线程数(与内核数相同(,则可以解决问题。

如果坚持设置工人数量,可以这样:

import scala.collection.parallel._
import scala.concurrent.forkjoin._
val temp = myParameters.par
temp.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(5))
temp.map(x => f(x))

返回时间的确切细节是不同的,但您可以将尽可能多的机器放入f(x)(即计算和对结果执行某些操作(,因此这可以满足您的需求。

一般来说,仅仅让结果显示为已完成是不够的;然后你需要处理它们,也许分叉它们,收集它们,等等。 如果你想这样做,Akka Streams(点击此处的链接(已经接近1.0,将有助于生成复杂的并行处理图。

有一个 Futures api,它允许您在线程池上运行工作单元(文档:http://docs.scala-lang.org/overviews/core/futures.html(,还有一个"并行集合 api",可用于对集合执行并行操作: http://docs.scala-lang.org/overviews/parallel-collections/overview.html

相关内容

  • 没有找到相关文章

最新更新