异步终止Akka Http Web套接字连接



Akka Http中的Web套接字连接被视为Akka Streams Flow。这似乎对基本的请求回复非常有效,但当消息也应该通过websocket推送时,它会变得更加复杂。我的服务器的核心看起来有点像:

lazy val authSuccessMessage = Source.fromFuture(someApiCall)
lazy val messageFlow = requestResponseFlow
    .merge(updateBroadcastEventSource)
lazy val handler = codec
  .atop(authGate(authSuccessMessage))
  .join(messageFlow)
handleWebSocketMessages {
  handler
}

这里,codec是(反)序列化BidiFlowauthGate是处理授权消息并防止任何消息流出直到授权成功的BidiFlow。成功后,它发送authSuccessMessage作为回复。requestResponseFlow是标准的请求-回复模式,updateBroadcastEventSource混合在异步推送消息中。

我希望能够在某些情况下发送错误消息并优雅地终止连接,例如授权不正确、someApiCall失败或requestResponseFlow处理的请求不正确。因此,基本上,基本上我似乎希望能够用最后一条消息异步完成messageFlow,即使它的其他组成流仍然存在。

了解了如何使用KillSwitch实现这一点。

更新版本

旧版本的问题是,当被堆栈中更高的BidiFlow阶段(例如我的authGate)触发时,它似乎不起作用。我不确定确切的原因,但将关闭建模为BidiFlow本身,放在堆栈的更高位置,解决了这个问题。

val shutoffPromise = Promise[Option[OutgoingWebsocketEvent]]()
/**
 * Shutoff valve for the connection. It is triggered when `shutoffPromise`
 * completes, and sends a final optional termination message if that
 * promise resolves with one.
 */
val shutoffBidi = {
  val terminationMessageSource = Source
    .maybe[OutgoingWebsocketEvent]
    .mapMaterializedValue(_.completeWith(shutoffPromise.future))
  val terminationMessageBidi = BidiFlow.fromFlows(
    Flow[IncomingWebsocketEventOrAuthorize],
    Flow[OutgoingWebsocketEvent].merge(terminationMessageSource)
  )
  val terminator = BidiFlow
    .fromGraph(KillSwitches.singleBidi[IncomingWebsocketEventOrAuthorize, OutgoingWebsocketEvent])
    .mapMaterializedValue { killSwitch =>
      shutoffPromise.future.foreach { _ => println("Shutting down connection"); killSwitch.shutdown() }
    }
  terminationMessageBidi.atop(terminator)
}

然后我将其应用于codec:内部

val handler = codec
  .atop(shutoffBidi)
  .atop(authGate(authSuccessMessage))
  .join(messageFlow)

旧版本

val shutoffPromise = Promise[Option[OutgoingWebsocketEvent]]()
/**
 * Shutoff valve for the flow of outgoing messages. It is triggered when
 * `shutoffPromise` completes, and sends a final optional termination
 * message if that promise resolves with one.
 */
val shutoffFlow = {
  val terminationMessageSource = Source
    .maybe[OutgoingWebsocketEvent]
    .mapMaterializedValue(_.completeWith(shutoffPromise.future))
  Flow
    .fromGraph(KillSwitches.single[OutgoingWebsocketEvent])
    .mapMaterializedValue { killSwitch =>
      shutoffPromise.future.foreach(_ => killSwitch.shutdown())
    }
    .merge(terminationMessageSource)
}

然后handler看起来像:

val handler = codec
  .atop(authGate(authSuccessMessage))
  .join(messageFlow via shutoffFlow)

最新更新