是否有可能总是创建一个Future{…}块和默认的onFailure处理程序?(例如写堆栈跟踪到控制台)?此处理程序还应该自动附加到映射的期货(通过在已经具有默认失败处理程序的期货上调用map创建的新期货)
更多细节请参见我的问题:Scala在Android上使用Scala .concurrent. future不报告系统err/out异常
我想有一个"最后的手段"异常日志代码,如果有人不使用onFailure或类似的东西在返回的未来
我有一个类似的问题,期货在实际结果不相关的情况下静默失败,因此没有明确处理。从ExecutionContext
的文档中,我最初假设reportFailure
方法是为了报告Future
中的任何故障。这显然是错误的-所以这是我想出的方法来记录异常(即使是映射的或其他派生的)期货:
- 一个
LoggedFuture
类,委托Future
和onFailure
记录异常,类似于@ limbsoup的回答 - 对于像
map
这样的方法,返回一个新的Future
也会产生一个LoggedFuture
- 使用
Promise
作为在级联LoggedFutures
之间共享的某种失败事件,即使由于传播而多次应用onFailure回调,也只记录一次异常
object LoggedFuture {
def apply[T](future: Future[T])(implicit ec: ExecutionContext): Future[T] = {
if (future.isInstanceOf[LoggedFuture[T]]) {
// don't augment to prevent double logging
future.asInstanceOf[LoggedFuture[T]]
}
else {
val failEvent = promise[Unit]
failEvent.future.onFailure {
// do your actual logging here
case t => t.printStackTrace()
}
new LoggedFuture(future, failEvent, ec)
}
}
}
private class LoggedFuture[T](future: Future[T], failEvent: Promise[Unit], ec: ExecutionContext) extends Future[T] {
// fire "log event" on failure
future.onFailure {
// complete log event promise
// the promise is used to log the error only once, even if the
// future is mapped and thus further callbacks attached
case t => failEvent.tryComplete(Failure(t))
} (ec)
// delegate methods
override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
future.ready(atMost)
this
}
override def result(atMost: scala.concurrent.duration.Duration)(implicit permit: CanAwait): T = future.result(atMost)
override def isCompleted: Boolean = future.isCompleted
override def onComplete[U](func: scala.util.Try[T] => U)(implicit executor: ExecutionContext): Unit = future.onComplete(func)
override def value: Option[Try[T]] = future.value
// propagate LoggedFuture (and shared log event) whenever a new future is returned
override def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] =
new LoggedFuture(super.map(f), failEvent, executor)
override def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] =
new LoggedFuture(super.transform(s, f), failEvent, executor)
override def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] =
new LoggedFuture(super.flatMap(f), failEvent, executor)
override def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] =
new LoggedFuture(super.recover(pf), failEvent, executor)
override def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] =
new LoggedFuture(super.recoverWith(pf), failEvent, executor)
override def zip[U](that: Future[U]): Future[(T, U)] =
new LoggedFuture(super.zip(that), failEvent, ec)
override def fallbackTo[U >: T](that: Future[U]): Future[U] =
new LoggedFuture(super.fallbackTo(that), failEvent, ec)
override def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] =
new LoggedFuture(super.andThen(pf), failEvent, executor)
}
class RichFuture[T](future: Future[T]) {
def asLogged(implicit ec: ExecutionContext): Future[T] = LoggedFuture(future)
}
此外,我有一个隐式转换到RichFuture
(如上所述)的定义,所以我可以很容易地转换现有的期货调用,如future.asLogged
。
使用下面的隐式类,您可以轻松地记录期货的失败,同时避免recover
的样板文件:
import com.typesafe.scalalogging.Logger
implicit class LoggingFuture[+T](val f: Future[T]) extends AnyVal {
def withFailureLogging(l: Logger, message: String): Future[T] = f recover {
case e =>
l.error(s"$message: $e")
throw e
}
def withPrintStackTraceOnFailure: Future[T] = f recover {
case e =>
e.printStackTrace()
throw e
}
}
可以如下所示使用:
import com.typesafe.scalalogging._
import scala.language.postfixOps
class MyClass extends LazyLogging {
def f = Future {
// do something that fails
throw new Exception("this future fails")
} withFailureLogging(logger, "some error message")
def g = Future {
// do something that fails
throw new Exception("this future fails")
} withPrintStackTraceOnFailure
}
就像我的评论的扩展:
你没有明白,没有必要为每个映射的未来做失败回调,因为如果失败映射不会做任何计算,只需要进一步传递现有的失败。所以如果你把更多的计算链接到失败的计算上,所有新的回调都不会被调用。
考虑这个例子:
case class TestError(msg) extends Throwable(msg)
val f1 = Future { 10 / 0 }
val f2 = f1 map { x => throw new TestError("Hello"); x + 10 }
f1.onFailure {
case error => println(error.getMessage)
}
f2.onFailure {
case er: TestError => println("TestError")
case _ => println("Number error")
}
// Exiting paste mode, now interpreting.
/ by zero
Number error
f1: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@54659bf8
f2: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@5ae2e211
可以看到,第一个回调打印错误消息,第二个忽略抛出的TestError
。那是因为你的映射函数没有应用。如果你看一下map
:
/** Creates a new future by applying a function to the successful result of
* this future. If this future is completed with an exception then the new
* future will also contain this exception.
*/
所以没有必要再附加新的失败回调,因为任何进一步的未来将简单地包含前一个的结果,对于每个您已经定义了一个回调。