>我有一个依赖于 API 响应的流程。当响应不符合我的预期时,将引发异常。此策略适用于喷雾和具有规格2的直接方法测试。
但是,当我尝试使用带有异常抛出模块的流时,流会停止。
这是我的流程:
Source(() => file)
.via(csvToSeq)
.via(getFromElastic)
.via(futureExtrtactor)
.via(findLocaionOfId)
.foreach(v => v.map(v => println("foreached", v)))
.onComplete(_ => system.shutdown())
我的策略是将map
用于期货。
这样:
val findLocaionOfId = Flow[Future[Seq[(String, JsValue)]]].map(future => future.map(jsSeq => {
jsSeq.zipWithIndex.flatMap { case (x, i) => x._2.asJsObject.getFields("_source").flatMap(js => {
js.asJsObject("Couldn't convert").getFields("externalId").map({
case JsString(str) => {
(i + 1, i == 0, js)
}
else (i, false, js)
}
case _ => (i, false, x)
})
})
}
}))
这是一个位于完全不同的位置的潜在异常抛出器:
val encoded_url = URLEncoder.encode(url, "UTF-8")
似乎我错过了一些东西,但看不到什么。感谢您的任何指示。
这听起来像是一个在实施 Akka 流监督后将得到解决的问题。Akka Streams仍处于"预实验阶段",因此该功能尚未实现,但肯定计划很快包含在内。
截至撰写此评论时,当前版本是 1.0-M2(预览里程碑)。
我缺少的是mapAsync
.
将上述函数更改为:
val findLocaionOfId = Flow[Future[Seq[(String, JsValue)]]].mapAsync({...})
通过这种方式,期货被解开,异常按预期停止程序。