我有一个HTTP连接池,该连接池在运行几个小时后悬挂:
private def createHttpPool(host: String): SourceQueue[(HttpRequest, Promise[HttpResponse])] = {
val pool = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host)
Source.queue[(HttpRequest, Promise[HttpResponse])](config.poolBuffer, OverflowStrategy.dropNew)
.via(pool).toMat(Sink.foreach {
case ((Success(res), p)) => p.success(res)
case ((Failure(e), p)) => p.failure(e)
})(Keep.left).run
}
i与:
一起使用。private def enqueue(uri: Uri): Future[HttpResponse] = {
val promise = Promise[HttpResponse]
val request = HttpRequest(uri = uri) -> promise
queue.offer(request).flatMap {
case Enqueued => promise.future
case _ => Future.failed(ConnectionPoolDroppedRequest)
}
}
并解决这样的响应:
private def request(uri: Uri): Future[HttpResponse] = {
def retry = {
Thread.sleep(config.dispatcherRetryInterval)
logger.info(s"retrying")
request(uri)
}
logger.info("req-start")
for {
response <- enqueue(uri)
_ = logger.info("req-end")
finalResponse <- response.status match {
case TooManyRequests => retry
case OK => Future.successful(response)
case _ => response.entity.toStrict(10.seconds).map(s => throw Error(s.toString, uri.toString))
}
} yield finalResponse
}
如果未来成功,则该功能的结果始终会转换:
def get(uri: Uri): Future[Try[JValue]] = {
for {
response <- request(uri)
json <- Unmarshal(response.entity).to[Try[JValue]]
} yield json
}
一切正常工作一段时间,然后我在日志中看到的一切都是req-start,而没有req-end。
我的Akka配置就是这样:
akka {
actor.deployment.default {
dispatcher = "my-dispatcher"
}
}
my-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 256
parallelism-factor = 128.0
parallelism-max = 1024
}
}
akka.http {
host-connection-pool {
max-connections = 512
max-retries = 5
max-open-requests = 16384
pipelining-limit = 1
}
}
我不确定这是配置问题还是代码问题。我的并行性和连接数量如此之高,因为没有它,我的req/s速率很差(我想尽可能快地请求 - 我还有其他限制代码来保护服务器)。
您没有消耗从服务器中获得的响应的实体。引用以下文档:
消费(或丢弃)请求的实体是必须的!如果 偶然剩下的或丢弃的akka http会假设 传入的数据应保持后压力,并将失速 通过TCP反压机制传入数据。客户应该 消耗实体,无论httpresponse的状态如何。
实体以Source[ByteString, _]
的形式出现,需要运行以避免资源饥饿。
如果您不需要阅读实体,那么消耗实体字节的最简单方法是使用
丢弃它们res.discardEntityBytes()
(您可以通过添加 - 例如-.future().map(...)
)。
文档中的此页面描述了所有替代方案,包括在需要时如何读取字节。
---编辑
提供了更多代码/信息后,很明显,资源消耗不是问题。此实现中还有另一个大危险信号,即重试方法中的Thread.sleep
。这是一个阻止呼叫,很可能会饿死您的基础演员系统的线程基础架构。
关于为什么在文档中提供危险的原因。
尝试更改它并使用akka.pattern.after
(DOCS)。下面的示例:
def retry = akka.pattern.after(200 millis, using = system.scheduler)(request(uri))