akka-stream akka-http生命周期



tldr:当我将http请求作为一部分时,在每个请求中实现流(即使用短词流)或在跨请求中使用单个流实现的一部分更好流?

详细信息:我有一个典型的服务,该服务需要HTTP请求,将其分散到下游服务的几个第三方服务(不是由我控制),并在将结果汇总回来之前将其汇总。我正在使用akka-http进行客户端实现和喷雾服务(旧版,随着时间的推移将移至akka-http)。示意性:

request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response

这可以通过实现每个请求的流或实现流的材料来实现(一次)并在请求中共享。

根据请求实现实体化造成材料的开销 1 ,尚不清楚如何利用连接池。此处描述了问题(许多物质化可以耗尽池)。我可以像这里一样在长期运行的HTTP流中包装池,并将其包装在mapAsync"上游"中,但是我的错误处理策略对我来说尚不清楚。当单个请求失败并且流终止时,它也会降低池吗?更重要的是,由于未按顺序返回,我似乎需要调和请求和响应。

// example of stream per request
val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
    Flow[HttpRequest]
      .map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream.
      .via(connectionFlow)
      .map { case (response, _) => response }
val result = Range(1 to 5).foreach{ i => {
  Source.single(i)
    .map(HttpRequest(...))
    .via(httpFlow)
    .mapAsync(1) {
       // response handling logic
    }
    .runWith(Sink.last)
})

// example of stream per request with long running http stream
// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]
val result = Range(1 to 5).foreach{ i => {
  Source.single(i)
    .map(HttpRequest(...))
    .mapAsync(1)(queueRequest)
    .mapAsync(1) {
       // somehow reconcile request with response?
       // response handling logic
    }
    .runWith(Sink.last)
})

跨请求共享流也有类似的错误处理问题 - 似乎有一些故障模式可以在飞行中使用所有请求来降低该流。该代码将类似于主机级API,但在整个流中的队列中。

在这种情况下,哪种方式更好?

我确实尝试实现这两种解决方案,但是在实施的每个阶段都有许多设计选择,因此即使在"正确"的路径上也很容易搞砸。

1 尽管我认为它可以忽略不计,并且它是Akka-http服务器运行的方式。

通常,最好使用单个连接 Flow并通过该单一流量调度所有请求。主要原因是由于以下事实,即新的实现实际上可能导致每次形成新的Connection(取决于您的连接池设置)。

您是正确的,这会导致一些并发症:

排序:通过将随机UUID作为元组中的第二个值 您正在传递连接流,您正在消除将请求与响应相关联的能力。元组中的额外T值可以用作"相关ID",以了解您从流中获得的HttpResponse。在您的特定示例中,您可以从创建的Range中使用初始Int

val responseSource : Source[(Try[HttpResponse], Int), _] = 
  Source
    .fromIterator( () => Iterator range (0,5) )
    .map(i => HttpRequest(...) -> i)
    .via(connectionFlow)

现在每个响应都带有原始int值,您可以用来处理响应。

错误处理:说明"单个请求失败,流终止"。单个请求故障并不一定会导致流失败。相反,您只需从连接流中获得(Failure(exception), Int)值即可。现在,您知道哪个INT导致故障,并且您的流程有例外。

最新更新