具有周期的 Akka HTTP 流式处理 API 永远不会完成



我正在构建一个应用程序,我在其中接受来自用户的请求,调用 REST API 以获取一些数据,然后根据该响应进行另一个 HTTP 调用,依此类推。 基本上,我正在处理一个数据树,其中树中的每个节点都需要我递归调用此 API,如下所示:

A
/ 
B   C
/    
D   E   F

我正在使用Akka HTTP和Akka Streams来构建应用程序,所以我使用它的流API,如下所示:

val httpFlow = Http().cachedConnection(host = "localhost")
val flow = GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Data](2))
val bcast = b.add(Broadcast[ResponseData](2))
takeUserData ~> merge ~> createRequest ~> httpFlow ~> processResponse ~> bcast
merge <~         extractSubtree                       <~ bcast
FlowShape(takeUserData.in, bcast.out(1))
}

我知道在 Akka Streams 应用程序中处理递归的最佳方法是处理流之外的递归,但由于我递归调用 HTTP 流以获取每个数据子树,我想确保流在 API 过载的情况下正确背压。

问题是此流永远不会完成。 如果我将其连接到这样的简单来源:

val source = Source.single(data)
val sink = Sink.seq[ResponseData]
source.via(flow).runWith(sink)

它打印出它正在处理树中的所有数据,然后停止打印任何内容,只是永远闲置。

我阅读了有关周期的文档,建议是在那里放一个MergePreferred,但这似乎没有帮助。 这个问题有所帮助,但我不明白为什么MergePreferred不会阻止死锁,因为与他们的示例不同,元素从树的每个级别的流中删除。

为什么MergePreferred不能避免僵局,有没有另一种方法可以做到这一点?

MergePreferred(在没有eagerComplete为真的情况下)将在所有输入完成后完成,这通常适用于 Akka Streams 中的阶段(完成从一开始就向动)。

因此,这意味着在输入和extractSubtree信号完成之前,合并无法传播完成。extractSubtree不会发出完成信号(很可能,不知道该流中的阶段),直到bcast发出完成信号,这(再次最有可能)不会发生,直到processResponse信号完成*才会发生,直到httpFlow信号完成,直到createRequest信号完成才会发生*,直到merge信号完成才会发生*。 因为一般来说,检测这个循环是不可能的(考虑到有些阶段的完成是完全动态的),Akka Streams有效地采取了这样的立场,即如果你想创建一个这样的循环,就由你来决定如何打破这个循环。

正如你所注意到的,eagerCompletetrue 会改变这种行为,但由于它会在任何输入完成后立即完成(在这种情况下,由于循环,它将始终是输入),merge完成并取消对extractSubtree的需求(这本身可能(取决于Broadcast是否eagerCancel设置)导致下游取消), 这可能会导致至少一些由extractSubtree发出的元素没有得到处理。

如果你绝对确定输入完成意味着循环最终会枯竭,那么一旦循环干燥并且输入完成,如果你有一些方法可以完成extractSubtree,你可以使用eagerComplete = false。 一个大致的大纲(不知道具体是什么,extractSubtree)来解决这个问题:

  • 将进入extractSubtree的所有内容从bcast映射到输入的Some
  • 预具体化可以向其发送NoneSource.actorRef,保存ActorRef(这将是此源的具体化值)
  • 将输入与该预实例化源合并
  • 提取子树时,使用statefulMapConcat阶段来跟踪 a) 是否看到了None和 b) 有多少子树挂起(初始值 1,将此节点的(第一代)子节点数减去 1,即没有子节点减去 1);如果已经看到None并且没有子树挂起,则发出List(None),否则发出包裹在Some中的每个子树的List
  • 有一个takeWhile(_.isDefined),一旦看到None,它就会完成
  • 如果你在extractSubtrees中有更复杂的东西(例如副作用),你必须弄清楚把它们放在哪里
  • 在合并外部输入之前,将其传递到watchTermination阶段,并在将来回调(成功时)将None发送到您在为extractSubtrees预实现Source.actorRef时获得的ActorRef。 因此,当输入完成时,watchTermination将成功触发并有效地向extractSubtrees发送一条消息,以便在完成动态树时监视。

最新更新