我正在尝试使用monix来并行化某些操作,然后执行错误处理
假设我正在尝试解析和验证如下几个对象
def parseAndValidateX(x: X) Task[X]
和
def parseAndValidateY(y: Y): Task[Y]
这里X和Y是我定义的一些类型。
现在,这些方法中的每一个都计算一些标准并返回一个Task。如果计算失败,我有一些形式为
的代码Task.raiseError(new CustomException("X is invalid because certain reasons a,b,c"))
我对y也有类似的任务加薪
Task.raiseError(new CustomException("Y is invalid because certain reasons a',b',c'"))
现在我有了这个类型
case class Combined(x: X, y: Y)
我定义了这个
private def parseAndValidateCombined(x: X, y: Y): Task[Combined] = {
val xTask = parseAndValidateX(x)
val yTask = parseAndValidateY(y)
Task.parMap2(xTask, yTask) {
(xEval, yEval) => SecretData(xEval, yTask)
}
}
这应该允许我并行运行验证,并且肯定我得到了响应。
但是我也想要这个行为
如果两个任务都失败,我想返回如下错误
Task.raiseError(new CustomException("X is invalid because certain reasons a,b,c and "Y is invalid because certain reasons a',b',c'"))
我似乎做不到。根据两个任务中的哪一个失败,我只能在parMap2输出的onRecover方法上获得两个失败中的一个。
如果两个都失败,我只得到任务x的错误。
是可能的,我完成我正在做什么与Monix在一个完全异步的方式(例如,也许一些其他的方法组成任务在一起)?或者我必须阻塞执行程序,单独获取错误并重新组合值?
仅使用parMap2
是不可能完成您想要做的事情的。文档说:
如果其中一个任务失败,则所有其他任务都获得取消,最终结果将是失败。
然而,有可能实现您希望暴露错误,而不是隐藏在一元处理之后。这可以通过materialize
方法实现。
private def parseAndValidateCombined[X, Y](x: X, y: Y): Task[Combined] = {
val xTask = parseAndValidate(x).materialize // turn on Task[Try[X]]
val yTask = parseAndValidate(y).materialize // turn on Task[Try[Y]]
Task.parMap2(xTask, yTask) {
case (Success(xEval), Success(yEval)) => Success(SecretData(xEval, yEval))
case (Failure(_), Failure(_)) => Failure[Combined](new CustomException(....))
case (Failure(left), _) => Failure[Combined](left)
case (_, Failure(right)) => Failure[Combined](right)
}.dematerialize // turn to Task[Combined]
}