Play2 框架代理将内容流式传输到客户端在流式处理完成后保持连接打开



下面的代码将流式传输回客户端,我收集的是一种比使用 Java 的 IO 流更惯用的方式。但是,它有一个问题:流完成后连接保持打开状态。

def getImage() = Action { request =>
  val imageUrl = "http://hereandthere.com/someimageurl.png"
  Ok.stream({ content: Iteratee[Array[Byte], Unit] => 
    WS.url(imageUrl).withHeaders("Accept"->"image/png").get { response => content }
    return
  }).withHeaders("Content-Type"->"image/png")
}

这适用于将大型 (>1 MB) 文件从内部 API 流式传输到请求者。

问题是,为什么它保持连接打开?它期望上游服务器提供什么吗?我使用 curl 测试了上游服务器,并且连接确实关闭了 - 通过此代理时它只是没有关闭。

流未完成的原因是 EOF 未发送到从 WS.get() 调用返回的迭代器。如果没有这个显式的EOF,连接将保持打开状态 - 因为它处于分块模式,并且可能是一个长时间运行的彗星状连接。

这是固定代码:

Ok.stream({ content: Iteratee[Array[Byte], Unit] => 
  WS.url(imageUrl)
    .withHeaders("Accept"->"image/png")
    .get { response => content }
    .onRedeem { ii =>
       ii.feed(Input.EOF)
    }
}).withHeaders("Content-Type"->"image/png")

这是播放 2.1.0 的修改版本。请参阅 https://groups.google.com/forum/#!msg/play-framework/HwoRR-nipCc/gUKs9NexCx4J

感谢Anatoly G的分享。

def proxy = Action {
   val url = "..."
   Async {
     val iterateePromise = Promise[Iteratee[Array[Byte], Unit]]
     val resultPromise = Promise[ChunkedResult[Array[Byte]]]
     WS.url(url).get { responseHeaders =>
       resultPromise.success {
         new Status(responseHeaders.status).stream({ content: Iteratee[Array[Byte], Unit] =>
           iterateePromise.success(content)
         }).withHeaders(
           "Content-Type" -> responseHeaders.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head,
           "Connection" -> "Close")
       }
       Iteratee.flatten(iterateePromise.future)
     }.onComplete {
       case Success(ii) => ii.feed(Input.EOF)
       case Failure(t) => resultPromise.failure(t)
     }
     resultPromise.future
   }
}

play 2.2.x 的更新:

def proxy = Action.async {
  val url = "http://localhost:9000"
  def enumerator(chunks: Iteratee[Array[Byte], Unit] => _) = {
    new Enumerator[Array[Byte]] {
      def apply[C](i: Iteratee[Array[Byte], C]): Future[Iteratee[Array[Byte], C]] = {
        val doneIteratee = Promise[Iteratee[Array[Byte], C]]()
        chunks(i.map {
          done =>
            doneIteratee.success(Done[Array[Byte], C](done)).asInstanceOf[Unit]
        })
        doneIteratee.future
      }
    }
  }
  val iterateePromise = Promise[Iteratee[Array[Byte], Unit]]()
  val resultPromise = Promise[SimpleResult]()
  WS.url(url).get {
    responseHeaders =>
      resultPromise.success(new Status(responseHeaders.status).chunked(
        enumerator({
          content: Iteratee[Array[Byte], Unit] => iterateePromise.success(content)
        }
        )).withHeaders(
        "Content-Type" -> responseHeaders.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head,
        "Connection" -> "Close"))
      Iteratee.flatten(iterateePromise.future)
  }.onComplete {
    case Success(ii) => ii.feed(Input.EOF)
    case Failure(t) => throw t
  }
  resultPromise.future
}

如果有人有更好的解决方案,我对此非常感兴趣!

最新更新