为什么 Play 框架不关闭 Akka 流?



演员初始化了连接到Websocket的Akka流。这是通过使用可以发送消息的Source.actorRef来完成的,然后由webSocketClientFlow处理并由Sink.foreach处理。可以在以下代码(源自Akka Docs)中看到:

class TestActor @Inject()(implicit ec: ExecutionContext) extends Actor with ActorLogging {
  final implicit val system: ActorSystem = ActorSystem()
  final implicit val materializer: ActorMaterializer = ActorMaterializer()
  def receive = {
    case _ =>
  }
  // Consume the incoming messages from the websocket.
  val incoming: Sink[Message, Future[Done]] =
  Sink.foreach[Message] {
    case message: TextMessage.Strict =>
      println(message.text)
    case misc => println(misc)
  }
  // Source through which we can send messages to the websocket.
  val outgoing: Source[TextMessage, ActorRef] =
  Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)
  // flow to use (note: not re-usable!)
  val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws-feed.gdax.com"))
  // Materialized the stream
  val ((ws,upgradeResponse), closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.both)
    .toMat(incoming)(Keep.both) // also keep the Future[Done]
    .run()
  // Check whether the server has accepted the websocket request.
  val connected = upgradeResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      throw new RuntimeException(s"Failed: ${upgrade.response.status}")
    }
  }
  // When the connection has been established.
  connected.onComplete(println)
  // When the stream has closed
  closed.onComplete {
    case Success(_) => println("Test Websocket closed gracefully")
    case Failure(e) => log.error("Test Websocket closed with an errorn", e)
  }
}

播放框架重新编译时,它将关闭 stestactor ,但不会关闭Akka流。只有当Websocket超时关闭时。

这是否意味着我需要手动关闭流,例如,在 testactor PostStop函数中使用 Source.actorRef a PoisonPill创建的演员?

NOTE :我还尝试注入MaterializerActorsystem,即:

@Inject()(implicit ec: ExecutionContext, implicit val mat: Materializer, implicit val system: ActorSystem)

当播放重新编译时,流关闭,但也会产生错误:

[error] a.a.ActorSystemImpl - Websocket handler failed with
Processor actor [Actor[akka://application/user/StreamSupervisor-62/flow-0-0-ignoreSink#989719582]] 
terminated abruptly

在第一个示例中,您正在演员中创建一个演员系统。您不应该这样做 - 演员系统很昂贵,创建一个意味着启动线程池,启动调度程序等。此外,您永远不会关闭它,这意味着您的问题比流不关闭的问题要大得多 - 您有资源泄漏,由演员系统创建的线程池永远不会关闭。

本质上,每当您收到Websocket连接时,您都会创建一个带有新的线程池的新演员系统,而您永远不会关闭它们。在生产中,即使有很小的负载(每秒几个请求),您的应用程序将在几分钟之内用尽内存。

一般来说,您绝不应该创建自己的演员系统,而应该注入一个。从演员中,您甚至不需要注入它,因为它自动为-context.system使您可以访问创建演员的演员系统。同样,对于物质化器,它们的重量并不那么重,但是如果您每连接创建一个,则如果不关闭它,也可能会用尽内存,因此应该注入它。

因此,当您注入它时,您会遇到错误 - 这很难避免,尽管并非不可能。困难是,Akka本身不能真正自动知道需要关闭哪些订单才能优雅地关闭事物,如果它首先关闭您的演员,以便它可以优雅地关闭流,或者应该关闭流下降,以便他们可以通知您的演员关闭并做出相应的回应?

Akka 2.5对此有一个解决方案,即托管关闭序列,您可以在演员系统开始以某种随机的顺序杀死事物之前注册要关闭的东西:

https://doc.akka.io/docs/akka/2.5/scala/actors.html#coordined-shutdown

您可以将其与Akka Streams Kill Switch结合使用,以便在关闭其余的应用程序之前优雅地关闭您的流。

,但通常,关闭错误是相当良性的,所以如果是我,我不必担心它们。

最新更新