>情况:
有许多阻塞同步调用(这是一个无法更改的给定调用(,这可能需要很长时间,需要聚合结果。
目标:
使呼叫非阻塞,然后等待最大时间 (ms( 并收集所有成功的呼叫,即使有些呼叫可能因超时而失败(因此我们可以降低失败呼叫的功能(。
当前解决方案:
下面的解决方案通过组合期货来工作,等待该期货完成或超时,在非致命错误(超时(的情况下,它使用 completedFutureValues
方法来提取成功完成的期货。
import scala.concurrent.{Await, Future}
import scala.util.Random._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
def potentialLongBlockingHelloWorld(i: Int): String = {Thread.sleep(nextInt(500)); s"hello world $i" }
// use the same method 3 times, but in reality is different methods (with different types)
val futureHelloWorld1 = Future(potentialLongBlockingHelloWorld(1))
val futureHelloWorld2 = Future(potentialLongBlockingHelloWorld(2))
val futureHelloWorld3 = Future(potentialLongBlockingHelloWorld(3))
val combinedFuture: Future[(String, String, String)] = for {
hw1 <- futureHelloWorld1
hw2 <- futureHelloWorld2
hw3 <- futureHelloWorld3
} yield (hw1, hw2, hw3)
val res = try {
Await.result(combinedFuture, 250.milliseconds)
} catch {
case NonFatal(_) => {
(
completedFutureValue(futureHelloWorld1, "fallback hello world 1"),
completedFutureValue(futureHelloWorld2, "fallback hello world 2"),
completedFutureValue(futureHelloWorld3, "fallback hello world 3")
)
}
}
def completedFutureValue[T](future: Future[T], fallback: T): T =
future.value match {
case Some(Success(value)) => value
case Some(Failure(e)) =>
fallback
case None =>
fallback
}
它将返回 Tuple3,其中包含已完成的未来结果或回退,例如: (hello world,fallback hello world 2,fallback hello world 3)
虽然这有效,但我对此并不特别满意。
问题:
我们如何改进这一点?
如果我也可以建议一种方法。想法是避免一起阻塞,并实际上为每个未来设置超时。这是一篇博客文章,我在做我的例子时发现非常有用,它有点旧,但黄金的东西:
https://nami.me/2015/01/20/scala-futures-with-timeout/
其中一个缺点是您可能需要将 akka 添加到解决方案中,但话又说回来,它并不完全丑陋:
import akka.actor.ActorSystem
import akka.pattern.after
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{FiniteDuration, _}
import scala.concurrent.{Await, Future}
import scala.util.Random._
implicit val system = ActorSystem("theSystem")
implicit class FutureExtensions[T](f: Future[T]) {
def withTimeout(timeout: => Throwable)(implicit duration: FiniteDuration, system: ActorSystem): Future[T] = {
Future firstCompletedOf Seq(f, after(duration, system.scheduler)(Future.failed(timeout)))
}
}
def potentialLongBlockingHelloWorld(i: Int): String = {
Thread.sleep(nextInt(500)); s"hello world $i"
}
implicit val timeout: FiniteDuration = 250.milliseconds
val timeoutException = new TimeoutException("Future timed out!")
// use the same method 3 times, but in reality is different methods (with different types)
val futureHelloWorld1 = Future(potentialLongBlockingHelloWorld(1)).withTimeout(timeoutException).recoverWith { case _ ⇒ Future.successful("fallback hello world 1") }
val futureHelloWorld2 = Future(potentialLongBlockingHelloWorld(2)).withTimeout(timeoutException).recoverWith { case _ ⇒ Future.successful("fallback hello world 2") }
val futureHelloWorld3 = Future(potentialLongBlockingHelloWorld(3)).withTimeout(timeoutException).recoverWith { case _ ⇒ Future.successful("fallback hello world 3") }
val results = Seq(futureHelloWorld1, futureHelloWorld2, futureHelloWorld3)
val combinedFuture = Future.sequence(results)
// this is just to show what you would have in your future
// combinedFuture is not blocking anything
val justToShow = Await.result(combinedFuture, 1.seconds)
println(justToShow)
// some of my runs:
// List(hello world 1, hello world 2, fallback hello world 3)
// List(fallback hello world 1, fallback hello world 2, hello world 3)
使用这种方法,没有阻塞,并且每个阶段都有超时,因此您可以微调和适应您真正需要的内容。我使用的等待只是为了展示它是如何工作的。
在此处发布同事提供的解决方案,该解决方案基本上与问题中提供的解决方案相同,但使其更加干净。
使用他的解决方案可以写:
(
Recoverable(futureHelloWorld1, "fallback hello world 1"),
Recoverable(futureHelloWorld2, "fallback hello world 1"),
Recoverable(futureHelloWorld3, "fallback hello world 1")
).fallbackAfter(250.milliseconds) {
case (hw1, hw2, hw3) =>
// Do something with the results.
println(hw1.value)
println(hw2.value)
println(hw3.value)
}
这使用带有回退的期货元组。使这成为可能的代码:
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
import scala.util.Try
import scala.util.control.NonFatal
sealed abstract class FallbackFuture[T] private(private val future: Future[T]) {
def value: T
}
object FallbackFuture {
final case class Recoverable[T](future: Future[T], fallback: T) extends FallbackFuture[T](future) {
override def value: T = {
if (future.isCompleted) future.value.flatMap(t => t.toOption).getOrElse(fallback)
else fallback
}
}
object Recoverable {
def apply[T](fun: => T, fallback: T)(implicit ec: ExecutionContext): FallbackFuture[T] = {
new Recoverable[T](Future(fun), fallback)
}
}
final case class Irrecoverable[T](future: Future[T]) extends FallbackFuture[T](future) {
override def value: T = {
def except = throw new IllegalAccessException("Required future did not compelete before timeout")
if (future.isCompleted) future.value.flatMap(_.toOption).getOrElse(except)
else except
}
}
object Irrecoverable {
def apply[T](fun: => T)(implicit ec: ExecutionContext): FallbackFuture[T] = {
new Irrecoverable[T](Future(fun))
}
}
object Implicits {
private val logger = LoggerFactory.getLogger(Implicits.getClass)
type FF[X] = FallbackFuture[X]
implicit class Tuple2Ops[V1, V2](t: (FF[V1], FF[V2])) {
def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2])) => R): R =
awaitAll(timeout, t) {
fn(t)
}
}
implicit class Tuple3Ops[V1, V2, V3](t: (FF[V1], FF[V2], FF[V3])) {
def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3])) => R): R =
awaitAll(timeout, t) {
fn(t)
}
}
implicit class Tuple4Ops[V1, V2, V3, V4](t: (FF[V1], FF[V2], FF[V3], FF[V4])) {
def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4])) => R): R =
awaitAll(timeout, t) {
fn(t)
}
}
implicit class Tuple5Ops[V1, V2, V3, V4, V5](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5])) {
def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5])) => R): R =
awaitAll(timeout, t) {
fn(t)
}
}
implicit class Tuple6Ops[V1, V2, V3, V4, V5, V6](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6])) {
def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6])) => R): R =
awaitAll(timeout, t) {
fn(t)
}
}
implicit class Tuple7Ops[V1, V2, V3, V4, V5, V6, V7](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7])) {
def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7])) => R): R =
awaitAll(timeout, t) {
fn(t)
}
}
implicit class Tuple8Ops[V1, V2, V3, V4, V5, V6, V7, V8](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8])) {
def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8])) => R): R =
awaitAll(timeout, t) {
fn(t)
}
}
implicit class Tuple9Ops[V1, V2, V3, V4, V5, V6, V7, V8, V9](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8], FF[V9])) {
def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8], FF[V9])) => R): R =
awaitAll(timeout, t) {
fn(t)
}
}
implicit class Tuple10Ops[V1, V2, V3, V4, V5, V6, V7, V8, V9, V10](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8], FF[V9], FF[V10])) {
def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8], FF[V9], FF[V10])) => R): R =
awaitAll(timeout, t) {
fn(t)
}
}
private implicit def toFutures(fallbackFuturesTuple: Product): Seq[Future[Any]] = {
fallbackFuturesTuple.productIterator.toList
.map(_.asInstanceOf[FallbackFuture[Any]])
.map(_.future)
}
private def awaitAll[R](timeout: Duration, futureSeq: Seq[Future[Any]])(fn: => R) = {
Try {
Await.ready(Future.sequence(futureSeq), timeout)
} recover {
case _: TimeoutException => logger.warn("Call timed out")
case NonFatal(ex) => throw ex
}
fn
}
}
}
可能最好使用 Future.sequence((,它从
一旦(据我了解(您无论如何都要阻止当前线程并同步等待结果,我会说最简单的解决方案应该是:
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, Future}
import scala.util.Random._
import scala.concurrent.ExecutionContext.Implicits.global
def potentialLongBlockingHelloWorld(i: Int): String = {Thread.sleep(nextInt(500)); s"hello world $i" }
// init with fallback
val result1 = new AtomicReference[String]("fallback hello world 1")
val result2 = new AtomicReference[String]("fallback hello world 2")
val result3 = new AtomicReference[String]("fallback hello world 3")
// use the same method 3 times, but in reality is different methods (with different types)
val f1 = Future(potentialLongBlockingHelloWorld(1)).map {res =>
result1.set(res)
}
val f2 = Future(potentialLongBlockingHelloWorld(2)).map {res =>
result2.set(res)
}
val f3 = Future(potentialLongBlockingHelloWorld(3)).map {res =>
result1.set(res)
}
for (i <- 1 to 5 if !(f1.isCompleted && f2.isCompleted && f3.isCompleted)) {
Thread.sleep(50)
}
(result1.get(), result2.get(), result3.get())
在这里,您只需在 AtomicReferences 中引入结果,这些结果在未来完成时更新,并使用即时报价检查所有期货已完成或最多 250 毫秒(超时(的即时报价的结果。
或者,您可以从这里获得Future with timeout
实现,扩展具有回退和超时,而不仅仅是将Future.sequence
与 Await 一起使用,并保证所有Futures
都将及时完成成功或回退。
为什么不写:
val futures: f1 :: f2 :: f3 :: Nil
val results = futures map { f =>
Await.result(f, yourTimeOut)
}
results.collect {
case Success => /* your logic */
}
???