如何避免在线程模式下使用FastAPI时耗尽数据库连接池(使用"def"而不是"async



我将FastAPI用于生产应用程序,该应用程序几乎完全使用异步,除非在访问数据库时使用。数据库仍然依赖于同步SQLAlchemy,因为异步版本当时仍处于alpha(或早期beta(版本。

虽然我们的服务在到达数据库时确实会进行同步阻塞调用,但它仍然封装在异步函数中。我们确实运行了多个工作者和应用程序的几个实例,以确保我们不会遇到严重的瓶颈。

与线程并发

我知道FastAPI在使用def controller_method方法时使用线程提供并发性,但我似乎找不到它如何控制环境的任何细节。有人能帮我理解如何控制一个进程可以生成的最大线程数吗。如果它达到系统限制怎么办?

数据库连接

当我使用异步等待模型时,我在中间件中创建数据库连接对象,该对象被注入控制器操作中。

@app.middleware("http")
async def db_session_middleware(request: Request, call_next):

await _set_request_id()
try:
request.state.db = get_sessionmaker(scope_func=None)
response = await call_next(request)
finally:
if request.state.db.is_active:
request.state.db.close()
return response

当通过线程完成时,控制器是否已经在单独的线程中被调用,从而确保每个请求都有单独的连接?

现在,如果我不能限制主进程生成的线程数量,如果我的应用程序突然收到大量请求,它不会超过数据库连接池的限制并最终阻止我的应用吗?

我可以配置FastAPI使用的中央线程池吗?还是由Uvicorn控制?

乌维科恩

我看到Uvicorn有一个配置,让它使用--limit-concurrency 60标志来限制并发性。这是否控制了在线程模式下创建的并发线程的数量?

如果是这样的话,这是否总是低于我的连接池(连接池+最大溢出=40(

因此,在我允许60的uvicon并发限制的场景中,我的数据库连接池配置应该是这样的吗?

engine = sqlalchemy.create_engine(
cfg("DB_URL"), 
pool_size=40, 
max_overflow=20, 
echo=False, 
pool_use_lifo=False,
pool_recycle=120
)

在这种情况下,是否使用了一个中央线程池?是否有任何示例项目可以让我查看,以了解在大规模部署时如何进行配置。

我使用了Netflix Dispatch作为参考,但如果还有其他项目,我肯定想看看。

Fastapi使用Starlette作为底层框架。Starlette提供了一种在使用anyio的线程池中启动def路径操作的机制。因此,我们可以通过设置anyio的CapacityLimiter的属性total_tokens来限制可以同时执行的线程数。

以下示例:

import threading
import anyio
import uvicorn
from fastapi import FastAPI
import time
import logging
THREADS_LIMIT = 5
logging.basicConfig(level=logging.DEBUG)
app = FastAPI()

class Counter(object):
def __init__(self):
self._value = 0
self._lock = threading.Lock()
def increment(self):
with self._lock:
self._value += 1
def decrement(self):
with self._lock:
self._value -= 1
def value(self):
with self._lock:
return self._value

counter = Counter()

@app.get("/start_task")
def start_task():
counter.increment()
logging.info("Route started. Counter: %d", counter.value())
time.sleep(10)
counter.decrement()
logging.info("Route stopped. Counter: %d", counter.value())
return "Task done"

@app.on_event("startup")
async def startup_event():
limiter = anyio.to_thread.current_default_thread_limiter()
limiter.total_tokens = THREADS_LIMIT

if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")

尝试并行打开50个连接:

seq 1 50 | xargs -n1 -P50  curl "http://localhost:8000/start_task" 

我们看到,同时处理的请求数量被限制为5个。

输出:

INFO:root:Route started. Counter: 1
INFO:root:Route started. Counter: 2
INFO:root:Route started. Counter: 3
INFO:root:Route started. Counter: 4
INFO:root:Route started. Counter: 5
INFO:root:Route stopped. Counter: 4
INFO:uvicorn.access:127.0.0.1:60830 - "GET /start_task HTTP/1.1" 200
INFO:root:Route stopped. Counter: 3
INFO:root:Route started. Counter: 4
INFO:uvicorn.access:127.0.0.1:60832 - "GET /start_task HTTP/1.1" 200
INFO:root:Route started. Counter: 5
...

最新更新