我正在构建一个应用程序,我在其中接受来自用户的请求,调用 REST API 以获取一些数据,然后根据该响应进行另一个 HTTP 调用,依此类推。 基本上,我正在处理一个数据树,其中树中的每个节点都需要我递归调用此 API,如下所示:
A
/
B C
/
D E F
我正在使用Akka HTTP和Akka Streams来构建应用程序,所以我使用它的流API,如下所示:
val httpFlow = Http().cachedConnection(host = "localhost")
val flow = GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Data](2))
val bcast = b.add(Broadcast[ResponseData](2))
takeUserData ~> merge ~> createRequest ~> httpFlow ~> processResponse ~> bcast
merge <~ extractSubtree <~ bcast
FlowShape(takeUserData.in, bcast.out(1))
}
我知道在 Akka Streams 应用程序中处理递归的最佳方法是处理流之外的递归,但由于我递归调用 HTTP 流以获取每个数据子树,我想确保流在 API 过载的情况下正确背压。
问题是此流永远不会完成。 如果我将其连接到这样的简单来源:
val source = Source.single(data)
val sink = Sink.seq[ResponseData]
source.via(flow).runWith(sink)
它打印出它正在处理树中的所有数据,然后停止打印任何内容,只是永远闲置。
我阅读了有关周期的文档,建议是在那里放一个MergePreferred
,但这似乎没有帮助。 这个问题有所帮助,但我不明白为什么MergePreferred
不会阻止死锁,因为与他们的示例不同,元素从树的每个级别的流中删除。
为什么MergePreferred
不能避免僵局,有没有另一种方法可以做到这一点?
MergePreferred
(在没有eagerComplete
为真的情况下)将在所有输入完成后完成,这通常适用于 Akka Streams 中的阶段(完成从一开始就向动)。
因此,这意味着在输入和extractSubtree
信号完成之前,合并无法传播完成。extractSubtree
不会发出完成信号(很可能,不知道该流中的阶段),直到bcast
发出完成信号,这(再次最有可能)不会发生,直到processResponse
信号完成*才会发生,直到httpFlow
信号完成,直到createRequest
信号完成才会发生*,直到merge
信号完成才会发生*。 因为一般来说,检测这个循环是不可能的(考虑到有些阶段的完成是完全动态的),Akka Streams有效地采取了这样的立场,即如果你想创建一个这样的循环,就由你来决定如何打破这个循环。
正如你所注意到的,eagerComplete
true 会改变这种行为,但由于它会在任何输入完成后立即完成(在这种情况下,由于循环,它将始终是输入),merge
完成并取消对extractSubtree
的需求(这本身可能(取决于Broadcast
是否eagerCancel
设置)导致下游取消), 这可能会导致至少一些由extractSubtree
发出的元素没有得到处理。
如果你绝对确定输入完成意味着循环最终会枯竭,那么一旦循环干燥并且输入完成,如果你有一些方法可以完成extractSubtree
,你可以使用eagerComplete = false
。 一个大致的大纲(不知道具体是什么,extractSubtree
)来解决这个问题:
- 将进入
extractSubtree
的所有内容从bcast
映射到输入的Some
- 预具体化可以向其发送
None
的Source.actorRef
,保存ActorRef
(这将是此源的具体化值) - 将输入与该预实例化源合并
- 提取子树时,使用
statefulMapConcat
阶段来跟踪 a) 是否看到了None
和 b) 有多少子树挂起(初始值 1,将此节点的(第一代)子节点数减去 1,即没有子节点减去 1);如果已经看到None
并且没有子树挂起,则发出List(None)
,否则发出包裹在Some
中的每个子树的List
- 有一个
takeWhile(_.isDefined)
,一旦看到None
,它就会完成 - 如果你在
extractSubtrees
中有更复杂的东西(例如副作用),你必须弄清楚把它们放在哪里 - 在合并外部输入之前,将其传递到
watchTermination
阶段,并在将来回调(成功时)将None
发送到您在为extractSubtrees
预实现Source.actorRef
时获得的ActorRef
。 因此,当输入完成时,watchTermination
将成功触发并有效地向extractSubtrees
发送一条消息,以便在完成动态树时监视。