tldr:当我将http请求作为一部分时,在每个请求中实现流(即使用短词流)或在跨请求中使用单个流实现的一部分更好流?
详细信息:我有一个典型的服务,该服务需要HTTP请求,将其分散到下游服务的几个第三方服务(不是由我控制),并在将结果汇总回来之前将其汇总。我正在使用akka-http进行客户端实现和喷雾服务(旧版,随着时间的推移将移至akka-http)。示意性:
request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response
这可以通过实现每个请求的流或实现流的材料来实现(一次)并在请求中共享。
根据请求实现实体化造成材料的开销 1 ,尚不清楚如何利用连接池。此处描述了问题(许多物质化可以耗尽池)。我可以像这里一样在长期运行的HTTP流中包装池,并将其包装在mapAsync
"上游"中,但是我的错误处理策略对我来说尚不清楚。当单个请求失败并且流终止时,它也会降低池吗?更重要的是,由于未按顺序返回,我似乎需要调和请求和响应。
// example of stream per request
val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
Flow[HttpRequest]
.map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream.
.via(connectionFlow)
.map { case (response, _) => response }
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.via(httpFlow)
.mapAsync(1) {
// response handling logic
}
.runWith(Sink.last)
})
// example of stream per request with long running http stream
// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.mapAsync(1)(queueRequest)
.mapAsync(1) {
// somehow reconcile request with response?
// response handling logic
}
.runWith(Sink.last)
})
跨请求共享流也有类似的错误处理问题 - 似乎有一些故障模式可以在飞行中使用所有请求来降低该流。该代码将类似于主机级API,但在整个流中的队列中。
在这种情况下,哪种方式更好?
我确实尝试实现这两种解决方案,但是在实施的每个阶段都有许多设计选择,因此即使在"正确"的路径上也很容易搞砸。
1 尽管我认为它可以忽略不计,并且它是Akka-http服务器运行的方式。
通常,最好使用单个连接 Flow
并通过该单一流量调度所有请求。主要原因是由于以下事实,即新的实现实际上可能导致每次形成新的Connection
(取决于您的连接池设置)。
您是正确的,这会导致一些并发症:
排序:通过将随机UUID
作为元组中的第二个值 您正在传递连接流,您正在消除将请求与响应相关联的能力。元组中的额外T
值可以用作"相关ID",以了解您从流中获得的HttpResponse
。在您的特定示例中,您可以从创建的Range
中使用初始Int
:
val responseSource : Source[(Try[HttpResponse], Int), _] =
Source
.fromIterator( () => Iterator range (0,5) )
.map(i => HttpRequest(...) -> i)
.via(connectionFlow)
现在每个响应都带有原始int值,您可以用来处理响应。
错误处理:说明"单个请求失败,流终止"。单个请求故障并不一定会导致流失败。相反,您只需从连接流中获得(Failure(exception), Int)
值即可。现在,您知道哪个INT导致故障,并且您的流程有例外。