Scala未来序列和超时处理



有一些很好的提示如何将期货与超时相结合。然而,我很好奇如何使用期货的期货序列

我的第一种方法看起来像

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits._
object FutureSequenceScala extends App {
  println("Creating futureList")
  val timeout = 2 seconds
  val futures = List(1000, 1500, 1200, 800, 2000) map { ms =>
    val f = future {
      Thread sleep ms
      ms toString
    }
    Future firstCompletedOf Seq(f, fallback(timeout))
  }
  println("Creating waitinglist")
  val waitingList = Future sequence futures
  println("Created")
  val results = Await result (waitingList, timeout * futures.size)
  println(results)
  def fallback(timeout: Duration) = future {
    Thread sleep (timeout toMillis)
    "-1"
  }
}

有没有更好的方法来处理未来序列中的超时,或者这是一个有效的解决方案?

您的代码中有一些内容可能需要重新考虑。对于初学者来说,我不太喜欢将任务提交到ExecutionContext中,这些任务的唯一目的是模拟超时,并在其中使用Thread.sleepsleep调用是阻塞的,您可能希望避免在执行上下文中有一个任务纯粹是为了等待固定的时间而阻塞的。我将在这里窃取我的答案,并建议对于纯超时处理,您应该使用我在该答案中概述的内容。HashedWheelTimer是一种高效的定时器实现,它比只休眠的任务更适合超时处理。

现在,如果你走这条路,我建议的下一个改变是处理每个未来与超时相关的故障。如果希望单个失败使sequence调用返回的聚合Future完全失败,则不执行任何额外操作。如果您不希望发生这种情况,而是希望超时返回一些默认值,那么您可以在Future上使用recover,如下所示:

withTimeout(someFuture).recover{
  case ex:TimeoutException => someDefaultValue
}

一旦你做到了这一点,你就可以利用非阻塞回调来做这样的事情:

waitingList onComplete{
  case Success(results) => //handle success
  case Failure(ex) => //handle fail
}

每个未来都有一个超时,因此不会无限运行。不需要IMO在那里进行阻塞并且经由atMost参数向Await.result提供额外的超时处理层。但我想这是假设你可以接受非阻塞方法。如果你真的需要封锁那里,那么你不应该等待timeout * futures.size那么长的时间。这些未来是平行运行的;那里的超时应该只需要与期货本身的单个超时一样长(或者稍微长一点,以考虑cpu/计时中的任何延迟)。当然不应该是超时*期货的总数。

这里有一个版本显示了您的阻塞fallback有多糟糕。

请注意,执行器是单线程的,并且您正在创建许多回退。

@cmbaxter是对的,您的主超时不应该是timeout * futures.size,它应该更大!

@cmbaxter认为非阻塞也是正确的。一旦你这样做了,并且你想强制超时,那么你将为此选择一个计时器组件,查看他的链接答案(也与你的链接答案链接)。

也就是说,我仍然喜欢你链接中的答案,因为坐在一个循环中等待下一件应该超时的事情真的很简单。

它只需要一个期货及其超时的列表和一个回退值。

也许有一个用例,比如一个简单的应用程序,它只阻止一些结果(比如你的测试),并且在结果出来之前不能退出

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
import java.lang.System.{ nanoTime => now }
object Test extends App { 
  //implicit val xc = ExecutionContext.global
  implicit val xc = ExecutionContext fromExecutorService (Executors.newSingleThreadExecutor)
  def timed[A](body: =>A): A = {
    val start = now 
    val res = body
    val end = now
    Console println (Duration fromNanos end-start).toMillis + " " + res
    res
  }
  println("Creating futureList")
  val timeout = 1500 millis
  val futures = List(1000, 1500, 1200, 800, 2000) map { ms =>
    val f = future {
      timed {
        blocking(Thread sleep ms)
        ms toString
      }
    } 
    Future firstCompletedOf Seq(f, fallback(timeout))
  }   
  println("Creating waitinglist")
  val waitingList = Future sequence futures
  println("Created")
  timed {
  val results = Await result (waitingList, 2 * timeout * futures.size)
  println(results)
  }     
  xc.shutdown
  def fallback(timeout: Duration) = future {
    timed {
      blocking(Thread sleep (timeout toMillis))
      "-1"
    }
  }   
}   

发生了什么:

Creating futureList
Creating waitinglist
Created
1001 1000
1500 -1
1500 1500
1500 -1
1200 1200
1500 -1
800 800
1500 -1
2000 2000
1500 -1
List(1000, 1500, 1200, 800, 2000)
14007 ()

Monix任务具有超时支持:

  import monix.execution.Scheduler.Implicits.global
  import monix.eval._
  import scala.concurrent.duration._
  println("Creating futureList")
  val tasks = List(1000, 1500, 1200, 800, 2000).map{ ms =>
    Task {
      Thread.sleep(ms)
      ms.toString
    }.timeoutTo(2.seconds, Task.now("-1"))
  }
  println("Creating waitinglist")
  val waitingList = Task.gather(tasks) // Task.sequence is true/literally "sequencing" operation
  println("Created")
  val results = Await.result(waitingList, timeout * futures.size)
  println(results)

相关内容

  • 没有找到相关文章

最新更新