Akka Http中的Web套接字连接被视为Akka Streams Flow
。这似乎对基本的请求回复非常有效,但当消息也应该通过websocket推送时,它会变得更加复杂。我的服务器的核心看起来有点像:
lazy val authSuccessMessage = Source.fromFuture(someApiCall)
lazy val messageFlow = requestResponseFlow
.merge(updateBroadcastEventSource)
lazy val handler = codec
.atop(authGate(authSuccessMessage))
.join(messageFlow)
handleWebSocketMessages {
handler
}
这里,codec
是(反)序列化BidiFlow
,authGate
是处理授权消息并防止任何消息流出直到授权成功的BidiFlow
。成功后,它发送authSuccessMessage
作为回复。requestResponseFlow
是标准的请求-回复模式,updateBroadcastEventSource
混合在异步推送消息中。
我希望能够在某些情况下发送错误消息并优雅地终止连接,例如授权不正确、someApiCall
失败或requestResponseFlow
处理的请求不正确。因此,基本上,基本上我似乎希望能够用最后一条消息异步完成messageFlow
,即使它的其他组成流仍然存在。
了解了如何使用KillSwitch
实现这一点。
更新版本
旧版本的问题是,当被堆栈中更高的BidiFlow
阶段(例如我的authGate
)触发时,它似乎不起作用。我不确定确切的原因,但将关闭建模为BidiFlow
本身,放在堆栈的更高位置,解决了这个问题。
val shutoffPromise = Promise[Option[OutgoingWebsocketEvent]]()
/**
* Shutoff valve for the connection. It is triggered when `shutoffPromise`
* completes, and sends a final optional termination message if that
* promise resolves with one.
*/
val shutoffBidi = {
val terminationMessageSource = Source
.maybe[OutgoingWebsocketEvent]
.mapMaterializedValue(_.completeWith(shutoffPromise.future))
val terminationMessageBidi = BidiFlow.fromFlows(
Flow[IncomingWebsocketEventOrAuthorize],
Flow[OutgoingWebsocketEvent].merge(terminationMessageSource)
)
val terminator = BidiFlow
.fromGraph(KillSwitches.singleBidi[IncomingWebsocketEventOrAuthorize, OutgoingWebsocketEvent])
.mapMaterializedValue { killSwitch =>
shutoffPromise.future.foreach { _ => println("Shutting down connection"); killSwitch.shutdown() }
}
terminationMessageBidi.atop(terminator)
}
然后我将其应用于codec
:内部
val handler = codec
.atop(shutoffBidi)
.atop(authGate(authSuccessMessage))
.join(messageFlow)
旧版本
val shutoffPromise = Promise[Option[OutgoingWebsocketEvent]]()
/**
* Shutoff valve for the flow of outgoing messages. It is triggered when
* `shutoffPromise` completes, and sends a final optional termination
* message if that promise resolves with one.
*/
val shutoffFlow = {
val terminationMessageSource = Source
.maybe[OutgoingWebsocketEvent]
.mapMaterializedValue(_.completeWith(shutoffPromise.future))
Flow
.fromGraph(KillSwitches.single[OutgoingWebsocketEvent])
.mapMaterializedValue { killSwitch =>
shutoffPromise.future.foreach(_ => killSwitch.shutdown())
}
.merge(terminationMessageSource)
}
然后handler
看起来像:
val handler = codec
.atop(authGate(authSuccessMessage))
.join(messageFlow via shutoffFlow)