如何从持久石英作业中调度 scala 未来



假设有一些scala代码应该在java石英库的帮助下调度。 我们需要将此代码执行的结果存储在作业上下文中,以便在下一次作业执行中访问此结果。 对于合成示例,有一些具有inc函数的CounterService应该被调度:

trait CounterService {
def inc(): Int
}

以下石英作业调用inc并将其结果成功存储在JobDataMap中:

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
class CounterJob extends Job {
val counterService: CounterService = ...
override def execute(context: JobExecutionContext): Unit = {
val newCounterValue: Int = counterService.inc()
val map = context.getJobDetail.getJobDataMap
map.put("counter", newCounterValue)  
}
}

我们可以随时在其他地方获得作业结果(如果我们参考了scheduler(:

val scheduler: Scheduler = ...
// gets details of our CounterJob which was created and registered in the scheduler
// by the name "counter-job" (it is not shown in our example)
val job = scheduler.getJobDetail(JobKey.jobKey("counter-job")) 
// this map will contain the job result which was stored by the key "counter"
val map = job.getJobDataMap.asScala 

但是,如果我们想从石英作业执行异步代码,则此方法不起作用。 例如,假设我们的柜台服务如下:

trait AsyncCounterService {
def asyncInc(): Future[Int]
}

我们可以尝试通过以下方式执行我们的工作。但它无法正常工作,因为 方法CounterJob.execute可以早于asyncCounterService.asyncInc执行。 并且我们无法将asyncInc的结果存储在JobDataMap中:

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
class CounterJob extends Job {
val counterService: AsyncCounterService = ...  
val execContext: ExecutionContext = ...
override def execute(context: JobExecutionContext): Unit = {
// # 1: we can not influence on the execution flow of this future 
//      from job scheduler.
val counterFuture: Future[Int] = counterService.asyncInc() 
counterFuture.map { counterValue: Int =>
val map = context.getJobDetail.getJobDataMap  
// #2: this action won't have any effect
map.put("counter", counterValue)              
}
}
}

此解决方案至少有两个问题在上面的代码中标记为#1 ...#2 ...注释。

有没有更好的做法来解决这个问题? 换句话说,如何在JobDetailData图中存储Future's结果的情况下,从持久石英作业中调度 scalaFuture

如果CounterJob之后的所有内容都需要具有CounterService值,那么在CounterJob中阻止并等待未来是可以的。无论如何,在这段时间内无法执行任何操作,因为尚未计算该值。

import scala.concurrent.{Await,Future}
...
try {
val counterValue  = Await.result(counterFuture, 5.seconds)
map.put("counter", counterValue)       
} catch {
case t: TimeoutException => ...
case t: Exception => ...
}

如果您在该作业中有多个异步期货,则可以将它们与for comprehensionflatMap, map操作的一元链或来自Future配套对象的静态辅助方法组合在一起,例如Future.sequence然后最终结果将是一个将所有异步操作组合在一起的未来,您可以使用Await等待。

通常,等待期货被认为是一种不好的做法。 因为这会阻止执行器线程在等待将来完成时执行任何其他操作。

但是,在这里,您将另一个作业调度框架与另一个并发范例混合在一起。如上所述,在特定示例中,阻止是可以的,因为后面的所有内容都依赖于第一次计算。

如果其他作业可以同时运行,则有多种方法可以解决此问题:

  1. 有一种方法可以从工作中返回未来。 然后,您可以等待此将来完成,然后再安排依赖 工作。
  2. 作业
  3. 中有某种自定义事件侦听器机制,可以从作业触发。counterFuture.map {context.notify("computationReady")}
  4. 有特定的AsyncJob支持非阻塞io,它期望java Future作为回报。然后你可以将Scala的未来转换为Java的未来。

最新更新