Akka HTTP 源流与常规请求处理



与处理请求的常规方式相比,使用源流的优势是什么?我的理解是,在这两种情况下

  1. TCP 连接将被重复使用
  2. 压将在客户端和服务器之间施加

我能看到的源流的唯一优点是,如果响应非常大,并且客户端更喜欢以较小的块使用它。

我的用例是我有一个很长的用户列表(数百万),我需要调用一个对用户执行一些筛选并返回子集的服务。

目前,在服务器端,我公开了一个批处理 API,而在客户端,我只是将用户分成 1000 个块,并使用 Akka HTTP Host API 并行进行 X 个批处理调用。

我正在考虑切换到HTTP流,但无法完全弄清楚价值是多少

你错过了另一个巨大的好处:内存效率。 通过拥有流管道、客户端/服务器/客户端,所有各方都可以安全地处理数据,而不会冒破坏内存分配的风险。 这在服务器端特别有用,您始终必须假设客户端可能会做一些恶意的事情......

客户端请求创建

假设数百万用户的最终来源是一个文件。 您可以从此文件创建流源:

val userFilePath : java.nio.file.Path = ???
val userFileSource = akka.stream.scaladsl.FileIO(userFilePath)

您可以使用此源来创建 http 请求,该请求将用户流式传输到服务:

import akka.http.scaladsl.model.HttpEntity.{Chunked, ChunkStreamPart}
import akka.http.scaladsl.model.{RequestEntity, ContentTypes, HttpRequest}
val httpRequest : HttpRequest = 
HttpRequest(uri = "http://filterService.io", 
entity = Chunked.fromData(ContentTypes.`text/plain(UTF-8)`, userFileSource))

此请求现在会将用户流式传输到服务,而不会将整个文件消耗到内存中。一次只会缓冲数据块,因此,您可以发送具有潜在无限数量的用户的请求,您的客户端就可以了。

服务器请求处理

同样,可以将服务器设计为接受具有可能无限长度的实体的请求。

您的问题说该服务将过滤用户,假设我们有过滤功能:

val isValidUser : (String) => Boolean = ???

这可用于过滤传入的请求实体并创建一个响应实体,该实体将馈送响应:

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.HttpEntity.Chunked
val route = extractDataBytes { userSource =>
val responseSource : Source[ByteString, _] = 
userSource
.map(_.utf8String)
.filter(isValidUser)
.map(ByteString.apply)
complete(HttpResponse(entity=Chunked.fromData(ContentTypes.`text/plain(UTF-8)`, 
responseSource)))
}

客户端响应处理

客户端可以类似地处理筛选的用户,而无需将它们全部读入内存。 例如,我们可以调度请求并将所有有效用户发送到控制台:

import akka.http.scaladsl.Http
Http()
.singleRequest(httpRequest)
.map { response =>
response
.entity
.dataBytes
.map(_.utf8String)
.foreach(System.out.println)
}

最新更新