用回调组合两个 Scala 期货,没有第三个 ExecutionContext



我有两种方法,我们称它们为load()init()。每个线程在自己的线程中启动计算,并在自己的执行上下文中返回Future。这两个计算是独立的。

val loadContext = ExecutionContext.fromExecutor(...)
def load(): Future[Unit] = {
Future
}
val initContext = ExecutionContext.fromExecutor(...)
def init(): Future[Unit] = {
Future { ... }(initContext)
}

我想从第三个线程调用这两个线程 - 假设它来自main()- 并在两者都完成后执行其他一些计算。

def onBothComplete(): Unit = ...

现在:

  1. 我不在乎哪个先完成
  2. 我不在乎在哪个线程上执行其他计算,除了:
  3. 我不想阻止任何一个线程等待另一个线程;
  4. 我不想阻止第三个(调用)线程;和
  5. 我不想仅仅为了设置标志而启动第四个线程。

如果我使用理解,我会得到类似的东西:

val loading = load()
val initialization = initialize()
for {
loaded <- loading
initialized <- initialization
} yield { onBothComplete() }

我得到找不到隐式执行上下文。

我认为这意味着 Scala 希望第四个线程等待两个期货的完成并设置标志,要么是明确的新ExecutionContext,要么是ExecutionContext.Implicits.global。因此,似乎理解已经出来了。

我想我也许可以嵌套回调:

initialization.onComplete {
case Success(_) =>
loading.onComplete {
case Success(_) => onBothComplete()
case Failure(t) => log.error("Unable to load", t)
}
case Failure(t) => log.error("Unable to initialize", t)
}

不幸的是,onComplete也采用了隐式ExecutionContext,并且我得到了相同的错误。(这也是丑陋的,如果initialization失败,loading会丢失错误消息。

有没有办法在不阻塞和不引入另一个ExecutionContext的情况下组成 Scala 期货?如果没有,我可能不得不把它们扔给Java 8 CompletableFutures或JavaslangVavr Futures,它们都能够在完成原始工作的线程上运行回调。

更新以阐明阻止一个线程等待另一个线程也是不可接受的。

再次更新,以降低完成后计算的具体内容。

为什么不重用你自己的一个执行上下文呢?不确定你对这些的要求是什么,但如果你使用一个线程执行器,你可以重用那个作为你理解的执行上下文,你不会得到任何新的线程:

implicit val loadContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)

如果您确实无法重用它们,则可以将其视为隐式执行上下文:

implicit val currentThreadExecutionContext = ExecutionContext.fromExecutor(
(runnable: Runnable) => {
runnable.run()
})

这将在当前线程上运行期货。 但是,Scala 文档明确建议不要这样做,因为它引入了线程运行Future的非确定性(但正如您所说,您不在乎它在哪个线程上运行,因此这可能无关紧要)。

请参阅同步执行上下文,了解为什么不建议这样做。

具有该上下文的示例:

val loadContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
def load(): Future[Unit] = {
Future(println("loading thread " + Thread.currentThread().getName))(loadContext)
}
val initContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
def init(): Future[Unit] = {
Future(println("init thread " + Thread.currentThread().getName))(initContext)
}
val doneFlag = new AtomicBoolean(false)
val loading = load()
val initialization = init()
implicit val currentThreadExecutionContext = ExecutionContext.fromExecutor(
(runnable: Runnable) => {
runnable.run()
})
for {
loaded <- loading
initialized <- initialization
} yield {
println("yield thread " + Thread.currentThread().getName)
doneFlag.set(true)
}

指纹:

loading thread pool-1-thread-1
init thread pool-2-thread-1
yield thread main

尽管yield行可能会打印pool-1-thread-1pool-2-thread-1,具体取决于运行。

在 Scala 中,Future表示要异步执行的工作(即与其他工作单元同时执行)。ExecutionContext表示用于执行Future的线程池。换句话说,ExecutionContext是执行实际工作的工人团队。

为了提高效率和可扩展性,最好有大团队(例如,具有 10 个线程的单个ExecutionContext执行 10 个Future)而不是小团队(例如 5 个ExecutionContext,每个线程 2 个线程执行 10 个Future)。

在这种情况下,如果要将线程数限制为 2,则可以:

def load()(implicit teamOfWorkers: ExecutionContext): Future[Unit] = {
Future { ... } /* will use the teamOfWorkers implicitly */
}
def init()(implicit teamOfWorkers: ExecutionContext): Future[Unit] = {
Future { ... } /* will use the teamOfWorkers implicitly */
}
implicit val bigTeamOfWorkers = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
/* All async works in the following will use 
the same bigTeamOfWorkers implicitly and works will be shared by
the 2 workers (i.e. thread) in the team  */
for {
loaded <- loading
initialized <- initialization
} yield doneFlag.set(true)

找不到隐式 ExecutionContext错误并不意味着 Scala 需要额外的线程。 这只意味着 Scala 想要一个ExecutionContext来完成这项工作。 额外的ExecutionContext并不一定意味着额外的"线程",例如以下ExecutionContext,而不是创建新线程,将在当前线程中执行工作:

val currThreadExecutor = ExecutionContext.fromExecutor(new Executor {
override def execute(command: Runnable): Unit = command.run()
})

相关内容

  • 没有找到相关文章

最新更新