下面的代码将流式传输回客户端,我收集的是一种比使用 Java 的 IO 流更惯用的方式。但是,它有一个问题:流完成后连接保持打开状态。
def getImage() = Action { request =>
val imageUrl = "http://hereandthere.com/someimageurl.png"
Ok.stream({ content: Iteratee[Array[Byte], Unit] =>
WS.url(imageUrl).withHeaders("Accept"->"image/png").get { response => content }
return
}).withHeaders("Content-Type"->"image/png")
}
这适用于将大型 (>1 MB) 文件从内部 API 流式传输到请求者。
问题是,为什么它保持连接打开?它期望上游服务器提供什么吗?我使用 curl 测试了上游服务器,并且连接确实关闭了 - 通过此代理时它只是没有关闭。
流未完成的原因是 EOF 未发送到从 WS.get() 调用返回的迭代器。如果没有这个显式的EOF,连接将保持打开状态 - 因为它处于分块模式,并且可能是一个长时间运行的彗星状连接。
这是固定代码:
Ok.stream({ content: Iteratee[Array[Byte], Unit] =>
WS.url(imageUrl)
.withHeaders("Accept"->"image/png")
.get { response => content }
.onRedeem { ii =>
ii.feed(Input.EOF)
}
}).withHeaders("Content-Type"->"image/png")
这是播放 2.1.0 的修改版本。请参阅 https://groups.google.com/forum/#!msg/play-framework/HwoRR-nipCc/gUKs9NexCx4J
感谢Anatoly G的分享。
def proxy = Action {
val url = "..."
Async {
val iterateePromise = Promise[Iteratee[Array[Byte], Unit]]
val resultPromise = Promise[ChunkedResult[Array[Byte]]]
WS.url(url).get { responseHeaders =>
resultPromise.success {
new Status(responseHeaders.status).stream({ content: Iteratee[Array[Byte], Unit] =>
iterateePromise.success(content)
}).withHeaders(
"Content-Type" -> responseHeaders.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head,
"Connection" -> "Close")
}
Iteratee.flatten(iterateePromise.future)
}.onComplete {
case Success(ii) => ii.feed(Input.EOF)
case Failure(t) => resultPromise.failure(t)
}
resultPromise.future
}
}
play 2.2.x 的更新:
def proxy = Action.async {
val url = "http://localhost:9000"
def enumerator(chunks: Iteratee[Array[Byte], Unit] => _) = {
new Enumerator[Array[Byte]] {
def apply[C](i: Iteratee[Array[Byte], C]): Future[Iteratee[Array[Byte], C]] = {
val doneIteratee = Promise[Iteratee[Array[Byte], C]]()
chunks(i.map {
done =>
doneIteratee.success(Done[Array[Byte], C](done)).asInstanceOf[Unit]
})
doneIteratee.future
}
}
}
val iterateePromise = Promise[Iteratee[Array[Byte], Unit]]()
val resultPromise = Promise[SimpleResult]()
WS.url(url).get {
responseHeaders =>
resultPromise.success(new Status(responseHeaders.status).chunked(
enumerator({
content: Iteratee[Array[Byte], Unit] => iterateePromise.success(content)
}
)).withHeaders(
"Content-Type" -> responseHeaders.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head,
"Connection" -> "Close"))
Iteratee.flatten(iterateePromise.future)
}.onComplete {
case Success(ii) => ii.feed(Input.EOF)
case Failure(t) => throw t
}
resultPromise.future
}
如果有人有更好的解决方案,我对此非常感兴趣!