自定义处理 Akka 流流中的未来故障



我尝试在 Akka 流中连接多个流,并根据流以不同的方式处理它们的错误。可以使用 sth 完成,如下所示:

Flow[String, Either[ProcessingError, String], NotUsed]

然后根据任一值将响应转移到错误处理程序。

我的问题是,某些流返回 Future[字符串] 而不是字符串,我不知道如何评估它以便能够在每个流之后捕获错误并以自定义方式处理它。

要将Future转换为Either而不会使流失败,您可以使用

.mapAsync(1){ e => 
val f: Future[T] = ...
f.transformWith(_.toEither)
}

mapAsyncmapAsyncUnordered是评估阿卡溪流期货的惯用方法。请注意,那些失败的未来将使流失败,要处理流中的错误,您需要"立即"对未来做出反应,以将其转换为TryEither

最新更新