在Python中,我正在使用一个名为futures
的库,它允许我以简洁明了的方式使用N个工作进程池来完成处理工作:
schedulerQ = []
for ... in ...:
workParam = ... # arguments for call to processingFunction(workParam)
schedulerQ.append(workParam)
with futures.ProcessPoolExecutor(max_workers=5) as executor: # 5 CPUs
for retValue in executor.map(processingFunction, schedulerQ):
print "Received result", retValue
(processingFunction
是 CPU 绑定的,所以这里没有异步机制的意义 - 这是关于普通的旧算术计算(
我现在正在寻找最接近的方法在 Scala 中做同样的事情。请注意,在 Python 中,为了避免 GIL 问题,我使用的是进程(因此使用 ProcessPoolExecutor
而不是 ThreadPoolExecutor
( - 并且库会自动将workParam
参数封送到执行processingFunction(workParam)
的每个进程实例 - 它将结果封送回主进程,供执行者的map
循环使用。
这适用于 Scala 和 JVM 吗?原则上,我的 processingFunction 也可以从线程执行(根本没有全局状态( - 但我有兴趣看到多处理和多线程的解决方案。
问题的关键部分是JVM世界中是否有任何东西具有像上面看到的Python futures
一样清晰的API。我认为这是我见过的最好的 SMP API 之一 - 准备一个包含所有调用的函数参数的列表,然后只有两行:创建 poolExecutor,map
处理函数,一旦结果由工人生成,就会取回您的结果。结果在第一次调用 processingFunction
返回时立即开始出现,并一直出现,直到它们全部完成 - 此时 for 循环结束。
与在 Scala 中使用并行集合相比,您的样板要少得多。
myParameters.par.map(x => f(x))
如果您想要默认线程数(与内核数相同(,则可以解决问题。
如果坚持设置工人数量,可以这样:
import scala.collection.parallel._
import scala.concurrent.forkjoin._
val temp = myParameters.par
temp.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(5))
temp.map(x => f(x))
返回时间的确切细节是不同的,但您可以将尽可能多的机器放入f(x)
(即计算和对结果执行某些操作(,因此这可以满足您的需求。
一般来说,仅仅让结果显示为已完成是不够的;然后你需要处理它们,也许分叉它们,收集它们,等等。 如果你想这样做,Akka Streams(点击此处的链接(已经接近1.0,将有助于生成复杂的并行处理图。
有一个 Futures api,它允许您在线程池上运行工作单元(文档:http://docs.scala-lang.org/overviews/core/futures.html(,还有一个"并行集合 api",可用于对集合执行并行操作: http://docs.scala-lang.org/overviews/parallel-collections/overview.html