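We observed a strange behavior when we tried to start a number of Futures from within an actor's receive method. If we use our configured dispatcher as the ExecutionContext, the Futures run sequentially on the same thread. If we use ExecutionContext.Implicits.global, the Futures run in parallel as expected.
We boiled the code down to the following example (a more complete example is below):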
implicit val ec = context.getDispatcher
Future{ doWork() } // <-- all running in parallel
Future{ doWork() }
Future{ doWork() }
Future{ doWork() }
Future {
  Future{ doWork() }
  Future{ doWork() } // <-- NOT RUNNING PARALLEL!!! WHY!!!
  Future{ doWork() }
  Future{ doWork() }
}
A compilable example looks like this:
import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}

object WhyNotParallelExperiment extends App {
  val actorSystem = ActorSystem(s"Experimental")

  // Futures not started in future: running in parallel
  startFutures(runInFuture = false)(actorSystem.dispatcher)
  Thread.sleep(5000)

  // Futures started in future: running sequentially. Why????
  startFutures(runInFuture = true)(actorSystem.dispatcher)
  Thread.sleep(5000)

  actorSystem.terminate()

  private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
    if (runInFuture) {
      Future{
        println(s"Start Futures on thread ${Thread.currentThread().getName()}")
        (1 to 9).foreach(startFuture)
        println(s"Started Futures on thread ${Thread.currentThread().getName()}")
      }
    } else {
      (11 to 19).foreach(startFuture)
    }
  }

  private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }
}
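We tried both the thread-pool-executor and the fork-join-executor, with the same result.
Are we using Futures the wrong way? How should parallel tasks be spawned instead?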
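From the description of Akka's internal BatchingExecutor (emphasis mine):
Mixin trait for an Executor which groups multiple nested Runnable.run() calls into a single Runnable passed to the original Executor. This can be a useful optimization because it bypasses the original context's task queue and keeps related (nested) code on a single thread, which may improve CPU affinity. However, if tasks passed to the Executor are blocking or expensive, this optimization can prevent work-stealing and make performance worse. A batching executor can create deadlocks if code does not use scala.concurrent.blocking when it should, because tasks created within other tasks will block on the outer task completing.
If you are using a dispatcher that mixes in BatchingExecutor (namely, a subclass of MessageDispatcher), you can use the scala.concurrent.blocking construct to enable parallelism with nested Futures: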
Future {
  Future {
    blocking {
      doBlockingWork()
    }
  }
}
In your example, you would add blocking in the startFuture method:
private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future {
  blocking {
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }
}
Sample output from running startFutures(true)(actorSystem.dispatcher) with the above change:
Start Futures on thread Experimental-akka.actor.default-dispatcher-2
Started Futures on thread Experimental-akka.actor.default-dispatcher-2
Future 1 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 3 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 5 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 7 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 4 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 9 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 6 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 8 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 2 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 1 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 3 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 5 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 4 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 8 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 7 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 9 finished on thread Experimental-akka.actor.default-dispatcher-10
Future 6 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 2 finished on thread Experimental-akka.actor.default-dispatcher-4
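To make the batching mechanism and the effect of blocking more concrete, here is a minimal sketch that uses only the Scala standard library. It is a toy model, not Akka's BatchingExecutor: the names ToyBatchingExecutor, Batch and ToyBatchingDemo are invented for illustration, and the hand-off inside blockOn is an assumption about the general idea rather than a copy of Akka's code. Nested submissions are appended to the batch that the current thread is draining, and a task that enters blocking { ... } first passes the still-queued tasks to another pool thread.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executor, Executors, TimeUnit}
import scala.collection.mutable
import scala.concurrent.{blocking, BlockContext, CanAwait, ExecutionContext}

// Toy model of a batching executor (hypothetical, NOT Akka's implementation).
final class ToyBatchingExecutor(underlying: Executor) extends ExecutionContext {
  // The batch being drained by the current thread, or null if there is none.
  private val currentBatch = new ThreadLocal[mutable.Queue[Runnable]]

  private final class Batch(tasks: mutable.Queue[Runnable]) extends Runnable with BlockContext {
    private var parent: BlockContext = null

    override def run(): Unit = {
      parent = BlockContext.current
      BlockContext.withBlockContext(this) {
        currentBatch.set(tasks)
        try {
          // Drain the batch on this one thread, including tasks added while draining.
          while (tasks.nonEmpty) tasks.dequeue().run()
        } finally currentBatch.remove()
      }
    }

    // Invoked by `blocking { ... }`: give the queued tasks to another thread first.
    override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
      if (tasks.nonEmpty) {
        val rest = mutable.Queue[Runnable]()
        while (tasks.nonEmpty) rest.enqueue(tasks.dequeue())
        underlying.execute(new Batch(rest))
      }
      parent.blockOn(thunk)
    }
  }

  override def execute(task: Runnable): Unit = {
    val batch = currentBatch.get()
    if (batch != null) batch.enqueue(task) // nested submission: join the running batch
    else underlying.execute(new Batch(mutable.Queue[Runnable](task)))
  }

  override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

object ToyBatchingDemo {
  private def task(body: => Unit): Runnable = new Runnable { override def run(): Unit = body }

  // Starts n nested tasks from inside one outer task and returns the number of
  // distinct threads that executed them.
  def distinctThreads(n: Int, useBlocking: Boolean): Int = {
    val pool = Executors.newFixedThreadPool(n)
    val ec = new ToyBatchingExecutor(pool)
    val names = ConcurrentHashMap.newKeySet[String]()
    val started = new CountDownLatch(n)
    val done = new CountDownLatch(n)

    def work(): Unit = {
      names.add(Thread.currentThread().getName)
      if (useBlocking) {
        blocking {
          started.countDown()
          started.await(5, TimeUnit.SECONDS) // wait until all n tasks are running
          ()
        }
      }
      done.countDown()
    }

    ec.execute(task((1 to n).foreach(_ => ec.execute(task(work())))))
    done.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    names.size
  }

  def main(args: Array[String]): Unit = {
    println(distinctThreads(4, useBlocking = false)) // 1: all nested tasks share one thread
    println(distinctThreads(4, useBlocking = true))  // 4: each blocking task hands the rest off
  }
}
```

Without blocking, every nested task lands in the same batch and runs on the thread that started it, which matches the sequential behavior in the question. With blocking, each task releases the remainder of the batch before it waits, so the tasks spread across the pool.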
It has to do with the dispatcher's "throughput" setting. I added a "fair-dispatcher" to application.conf to demonstrate this:
fair-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 1
}
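Here is your example with a few modifications, so that it uses the fair dispatcher for the Futures and prints the current value of the throughput setting: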
package com.test

import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}

object WhyNotParallelExperiment extends App {
  val actorSystem = ActorSystem(s"Experimental")

  println("Default dispatcher throughput:")
  println(actorSystem.dispatchers.defaultDispatcherConfig.getInt("throughput"))
  println("Fair dispatcher throughput:")
  println(actorSystem.dispatchers.lookup("fair-dispatcher").configurator.config.getInt("throughput"))

  // Futures not started in future: running in parallel
  startFutures(runInFuture = false)(actorSystem.dispatcher)
  Thread.sleep(5000)

  // Futures started in future: running sequentially. Why????
  startFutures(runInFuture = true)(actorSystem.dispatcher)
  Thread.sleep(5000)

  actorSystem.terminate()

  private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
    if (runInFuture) {
      Future{
        implicit val fairExecutionContext = actorSystem.dispatchers.lookup("fair-dispatcher")
        println(s"Start Futures on thread ${Thread.currentThread().getName()}")
        (1 to 9).foreach(i => startFuture(i)(fairExecutionContext))
        println(s"Started Futures on thread ${Thread.currentThread().getName()}")
      }
    } else {
      (11 to 19).foreach(startFuture)
    }
  }

  private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }
}
Output:
Default dispatcher throughput:
5
Fair dispatcher throughput:
1
Future 12 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 11 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 13 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 14 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 16 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 15 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 17 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 18 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 19 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 13 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 11 finished on thread Experimental-akka.actor.default-dispatcher-4
Future 12 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 14 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 16 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 15 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 17 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 18 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 19 finished on thread Experimental-akka.actor.default-dispatcher-10
Start Futures on thread Experimental-akka.actor.default-dispatcher-10
Future 1 should run for 500 millis on thread Experimental-fair-dispatcher-12
Future 2 should run for 500 millis on thread Experimental-fair-dispatcher-13
Future 4 should run for 500 millis on thread Experimental-fair-dispatcher-15
Future 3 should run for 500 millis on thread Experimental-fair-dispatcher-14
Future 5 should run for 500 millis on thread Experimental-fair-dispatcher-17
Future 6 should run for 500 millis on thread Experimental-fair-dispatcher-16
Future 7 should run for 500 millis on thread Experimental-fair-dispatcher-18
Future 8 should run for 500 millis on thread Experimental-fair-dispatcher-19
Started Futures on thread Experimental-akka.actor.default-dispatcher-10
Future 4 finished on thread Experimental-fair-dispatcher-15
Future 2 finished on thread Experimental-fair-dispatcher-13
Future 1 finished on thread Experimental-fair-dispatcher-12
Future 9 should run for 500 millis on thread Experimental-fair-dispatcher-15
Future 5 finished on thread Experimental-fair-dispatcher-17
Future 7 finished on thread Experimental-fair-dispatcher-18
Future 8 finished on thread Experimental-fair-dispatcher-19
Future 6 finished on thread Experimental-fair-dispatcher-16
Future 3 finished on thread Experimental-fair-dispatcher-14
Future 9 finished on thread Experimental-fair-dispatcher-15
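As you can see, the fair dispatcher uses a different thread for most of the Futures.
The default dispatcher is optimized for actors, so throughput is set to 5 to minimize context switches, which improves message-processing throughput while keeping some degree of fairness.
The only change in my fair dispatcher is throughput: 1, i.e. every asynchronous execution request gets its own thread if possible (up to parallelism-max).
I'd recommend creating separate dispatchers for Futures that are used for different purposes. For example, one dispatcher (i.e. thread pool) for calling web services, another for blocking database access, and so on. That gives you more precise control, because you can tune the settings of each custom dispatcher.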
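As a rough sketch of that recommendation, the following defines two purpose-specific dispatchers in code and looks each one up as an ExecutionContext. The dispatcher ids (web-client-dispatcher, db-dispatcher), the pool sizes and the object name DedicatedDispatchers are hypothetical, and the snippet assumes an Akka version whose thread-pool-executor supports fixed-pool-size. In a real project the config would normally live in application.conf rather than in a string.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object DedicatedDispatchers {
  // Hypothetical dispatcher ids and sizes; tune them for the actual workload.
  val config: Config = ConfigFactory.parseString(
    """
      |web-client-dispatcher {
      |  type = Dispatcher
      |  executor = "fork-join-executor"
      |  fork-join-executor { parallelism-min = 2, parallelism-max = 8 }
      |}
      |db-dispatcher {
      |  type = Dispatcher
      |  executor = "thread-pool-executor"
      |  thread-pool-executor { fixed-pool-size = 16 }
      |}
      |""".stripMargin).withFallback(ConfigFactory.load())

  // Runs a trivial Future on the given dispatcher and reports which thread ran it.
  def threadNameOn(system: ActorSystem, dispatcherId: String): String = {
    val ec: ExecutionContext = system.dispatchers.lookup(dispatcherId)
    Await.result(Future(Thread.currentThread().getName)(ec), 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("Experimental", config)
    try {
      println(threadNameOn(system, "web-client-dispatcher"))
      println(threadNameOn(system, "db-dispatcher"))
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

Passing the looked-up dispatcher explicitly, or as a local implicit, keeps blocking database calls off the threads that serve actors and web calls.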
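Take a look at https://doc.akka.io/docs/akka/current/dispatchers.html; it is really useful for understanding the details.
Also check out the Akka reference settings (the default-dispatcher in particular); there are lots of useful comments there: https://github.com/akka/akka/blob/master/akka-actor/src/main/resources/reference.conf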
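After some research I found that the Dispatcher class implements akka.dispatch.BatchingExecutor. For performance reasons, this class checks which tasks should be batched on the same thread. Future.map internally creates a scala.concurrent.OnCompleteRunnable, which is batched in the BatchingExecutor.
This seems reasonable for map()/flatMap(), where one task produces one follow-up task, but not for explicit new Futures that are used to fork work. Internally, Future.apply is implemented via Future.successful().map and is therefore batched. My workaround for now is to create the Futures in a different way: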
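import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

object MyFuture {
  def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    // A plain Runnable, submitted directly to the executor instead of via Future.apply
    class FuturesStarter extends Runnable {
      override def run(): Unit = {
        promise.complete(Try(body))
      }
    }
    executor.execute(new FuturesStarter)
    promise.future
  }
}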
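FuturesStarter Runnables are not batched and therefore run in parallel.
Can anyone confirm that this solution is OK? Are there better ways to solve this problem? Is the current implementation of Future/BatchingExecutor intended, or is it a bug?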