Scala未来:每个新创建或映射的异常的默认错误处理程序



是否有可能总是创建一个Future{…}块和默认的onFailure处理程序?(例如写堆栈跟踪到控制台)?此处理程序还应该自动附加到映射的期货(通过在已经具有默认失败处理程序的期货上调用map创建的新期货)

更多细节请参见我的问题:Scala在Android上使用Scala .concurrent. future不报告系统err/out异常

我想有一个"最后的手段"异常日志代码,如果有人不使用onFailure或类似的东西在返回的未来

我有一个类似的问题,期货在实际结果不相关的情况下静默失败,因此没有明确处理。从ExecutionContext的文档中,我最初假设reportFailure方法是为了报告Future中的任何故障。这显然是错误的-所以这是我想出的方法来记录异常(即使是映射的或其他派生的)期货:

  • 一个LoggedFuture类,委托FutureonFailure记录异常,类似于@ limbsoup的回答
  • 对于像map这样的方法,返回一个新的Future也会产生一个LoggedFuture
  • 使用Promise作为在级联LoggedFutures之间共享的某种失败事件,即使由于传播而多次应用onFailure回调,也只记录一次异常
object LoggedFuture {
  def apply[T](future: Future[T])(implicit ec: ExecutionContext): Future[T] = {
    if (future.isInstanceOf[LoggedFuture[T]]) {
      // don't augment to prevent double logging
      future.asInstanceOf[LoggedFuture[T]]
    }
    else {
      val failEvent = promise[Unit]
      failEvent.future.onFailure {
        // do your actual logging here
        case t => t.printStackTrace()
      }
      new LoggedFuture(future, failEvent, ec)
    }
  }
}
private class LoggedFuture[T](future: Future[T], failEvent: Promise[Unit], ec: ExecutionContext) extends Future[T] {
  // fire "log event" on failure
  future.onFailure {
    // complete log event promise
    // the promise is used to log the error only once, even if the
    // future is mapped and thus further callbacks attached
    case t => failEvent.tryComplete(Failure(t))
  } (ec)
  // delegate methods
  override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
    future.ready(atMost)
    this
  }
  override def result(atMost: scala.concurrent.duration.Duration)(implicit permit: CanAwait): T = future.result(atMost)
  override def isCompleted: Boolean = future.isCompleted
  override def onComplete[U](func: scala.util.Try[T] => U)(implicit executor: ExecutionContext): Unit = future.onComplete(func)
  override def value: Option[Try[T]] = future.value
  // propagate LoggedFuture (and shared log event) whenever a new future is returned
  override def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] =
    new LoggedFuture(super.map(f), failEvent, executor)
  override def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] =
    new LoggedFuture(super.transform(s, f), failEvent, executor)
  override def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] =
    new LoggedFuture(super.flatMap(f), failEvent, executor)
  override def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] =
    new LoggedFuture(super.recover(pf), failEvent, executor)
  override def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] =
    new LoggedFuture(super.recoverWith(pf), failEvent, executor)
  override def zip[U](that: Future[U]): Future[(T, U)] =
    new LoggedFuture(super.zip(that), failEvent, ec)
  override def fallbackTo[U >: T](that: Future[U]): Future[U] = 
    new LoggedFuture(super.fallbackTo(that), failEvent, ec)
  override def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = 
    new LoggedFuture(super.andThen(pf), failEvent, executor)
}
class RichFuture[T](future: Future[T]) {
  def asLogged(implicit ec: ExecutionContext): Future[T] = LoggedFuture(future)
}

此外,我有一个隐式转换到RichFuture(如上所述)的定义,所以我可以很容易地转换现有的期货调用,如future.asLogged

使用下面的隐式类,您可以轻松地记录期货的失败,同时避免recover的样板文件:

  import com.typesafe.scalalogging.Logger
  implicit class LoggingFuture[+T](val f: Future[T]) extends AnyVal {
    def withFailureLogging(l: Logger, message: String): Future[T] = f recover {
      case e =>
        l.error(s"$message: $e")
        throw e
    }
    def withPrintStackTraceOnFailure: Future[T] = f recover {
      case e =>
        e.printStackTrace()
        throw e
      }
  }

可以如下所示使用:

 import com.typesafe.scalalogging._
 import scala.language.postfixOps
 class MyClass extends LazyLogging {
   def f = Future {
     // do something that fails
     throw new Exception("this future fails")
   } withFailureLogging(logger, "some error message")
   def g = Future {
     // do something that fails
     throw new Exception("this future fails")
   } withPrintStackTraceOnFailure
 }

就像我的评论的扩展:

你没有明白,没有必要为每个映射的未来做失败回调,因为如果失败映射不会做任何计算,只需要进一步传递现有的失败。所以如果你把更多的计算链接到失败的计算上,所有新的回调都不会被调用。

考虑这个例子:

case class TestError(msg) extends Throwable(msg)
val f1 = Future { 10 / 0 }
val f2 = f1 map { x => throw new TestError("Hello"); x + 10 }
f1.onFailure {
  case error => println(error.getMessage)
}
f2.onFailure {
  case er: TestError => println("TestError")
  case _ => println("Number error")
}
// Exiting paste mode, now interpreting.
/ by zero
Number error
f1: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@54659bf8
f2: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@5ae2e211

可以看到,第一个回调打印错误消息,第二个忽略抛出的TestError。那是因为你的映射函数没有应用。如果你看一下map:

的注释
/** Creates a new future by applying a function to the successful result of
 *  this future. If this future is completed with an exception then the new
 *  future will also contain this exception.
 */

所以没有必要再附加新的失败回调,因为任何进一步的未来将简单地包含前一个的结果,对于每个您已经定义了一个回调。

最新更新