如何处理任务异步执行期间的失败? 即至少打印堆栈跟踪并关闭。下面的代码似乎永远等待输入> 5
val things = Range(1, 40)
implicit val scheduler = monix.execution.Scheduler.global
def t(i:Int) = Task.eval {
Try{
Thread.sleep(1000)
val result = i + 1
if(result > 5){
throw new Exception("asdf")
}
// i.e. write to file, that's why unit is returned
println(result) // Effect
"Result"
}
}
val futures = things.map(e=> t(e))
futures.foreach(_.runToFuture)
编辑
尝试:
futures.foreach(_.runToFuture.onComplete {
case Success(value) =>
println(value)
case Failure(ex) =>
System.err.println(ex)
System.exit(1)
})
不会停止计算。 如何记录堆栈跟踪并取消正在进行的计算并停止?
一种更惯用的方法是使用Observable
而不是Task
,因为它正在处理数据列表(我假设这是用例,因为它显示在问题中(。
val obs = Observable
.fromIterable(Range(1, 40))
.mapEval(i =>
if (i + 1 > 5) Task.raiseError(new Exception("Error")) // will stop the stream
else Task.delay(println(i)) // Or write to file in your case
)
.completedL
.runToFuture
obs
.recover {
case NonFatal(e) => println("Error")
}
或者,您也可以使用Either
发出错误信号,从而提高类型安全性,因为您需要处理Either
结果。
val obs = Observable
.fromIterable(Range(1, 40))
.mapEval(i =>
if (i + 1 > 5) Task.pure(Left("Error"))
else Task.delay(println(i)).map(_ => Right(())) // Or write to file in your case
)
.takeWhileInclusive(_.isRight) // will also emit the failing result
.lastL
.runToFuture
obs.map {
case Left(err) => println("There's an error")
case _ => println("Completed successfully")
}
这个问题有两部分:
- 使任务可以取消。
- 当一个任务失败时取消同级。
使任务可取消
Monix 具有 BooleanCancelable,它允许您在调用cancel
时将isCancelled
的结果设置为true
。
cancel
还需要调用Thread.interrupt
以在运行时将其唤醒Thread.sleep
。 否则,sleep
将贯穿其过程。 但是,这会在您的任务中抛出中断异常。 这需要处理。
取消兄弟姐妹
有复合可取消。 这似乎是CompositeCancellable
它从父任务调用cancel
的用例。 因此,一旦构建了CompositeCancellable
(即构造了所有任务(:
- 必须为每个任务提供对此的引用,以便失败的任务可以调用
cancel
。 (请注意,这是一种循环引用,最好避免( - 或者,当子任务失败并调用
cancel
时,父任务(或代码(会意识到。 (这将避免循环引用(
通知同级任务的另一种方法是使用 AtomicBoolean 并经常检查它(睡眠 10 毫秒而不是 1000 毫秒(。 当一个任务失败时,它将设置此布尔值,以便其他任务可以停止执行。 这当然不涉及Cancellable
. (这是一种黑客,最好使用Monix调度程序(
注意
在Task
内调用Thread.sleep
是个好主意吗? 我认为这会阻止另一个任务使用该线程。 我认为使用调度程序来增加延迟并编写这些子任务是最有效的利用线程池的方法。