Monix并行任务的错误处理(使用parMap)



我正在尝试使用monix来并行化某些操作,然后执行错误处理

假设我正在尝试解析和验证如下几个对象

def parseAndValidateX(x: X) Task[X]

def parseAndValidateY(y: Y): Task[Y]

这里X和Y是我定义的一些类型。

现在,这些方法中的每一个都计算一些标准并返回一个Task。如果计算失败,我有一些形式为

的代码
Task.raiseError(new CustomException("X is invalid because certain reasons a,b,c"))

我对y也有类似的任务加薪

Task.raiseError(new CustomException("Y is invalid because certain reasons a',b',c'"))

现在我有了这个类型

case class Combined(x: X, y: Y)

我定义了这个

private def parseAndValidateCombined(x: X, y: Y): Task[Combined] = {
val xTask = parseAndValidateX(x)
val yTask = parseAndValidateY(y)

Task.parMap2(xTask, yTask) {
(xEval, yEval) => SecretData(xEval, yTask)
}
}

这应该允许我并行运行验证,并且肯定我得到了响应。

但是我也想要这个行为

如果两个任务都失败,我想返回如下错误

Task.raiseError(new CustomException("X is invalid because certain reasons a,b,c and "Y is invalid because certain reasons a',b',c'"))

我似乎做不到。根据两个任务中的哪一个失败,我只能在parMap2输出的onRecover方法上获得两个失败中的一个。

如果两个都失败,我只得到任务x的错误。

是可能的,我完成我正在做什么与Monix在一个完全异步的方式(例如,也许一些其他的方法组成任务在一起)?或者我必须阻塞执行程序,单独获取错误并重新组合值?

仅使用parMap2是不可能完成您想要做的事情的。文档说:

如果其中一个任务失败,则所有其他任务都获得取消,最终结果将是失败。

然而,有可能实现您希望暴露错误,而不是隐藏在一元处理之后。这可以通过materialize方法实现。

例如,你可以这样实现你的方法:
private def parseAndValidateCombined[X, Y](x: X, y: Y): Task[Combined] = {
val xTask = parseAndValidate(x).materialize // turn on Task[Try[X]]
val yTask = parseAndValidate(y).materialize // turn on Task[Try[Y]]

Task.parMap2(xTask, yTask) {
case (Success(xEval), Success(yEval)) => Success(SecretData(xEval, yEval))
case (Failure(_), Failure(_)) => Failure[Combined](new CustomException(....))
case (Failure(left), _) => Failure[Combined](left)
case (_, Failure(right)) => Failure[Combined](right)
}.dematerialize // turn to Task[Combined]
}

相关内容

  • 没有找到相关文章

最新更新