FastAPI/Uvicorn request hangs during call_next (the path operation function) in middleware



We have a machine learning model running in a Docker container on EC2.

We use Cortex.dev to autoscale GPUs.

Non-deterministically, a request will hang during the call_next function in the FastAPI middleware. Unfortunately, it is not reproducible.

The middleware's pre-request print line is logged, but the first print statement in the path operation function is never logged.

Things we have tried:

  • Running Uvicorn with 1 worker
  • Running the run function without async
  • Using bytes instead of UploadFile as the parameter type for image

None of these changes fixed the hang, but this is the highest-performing configuration.

  1. Does this mean the problem is in FastAPI rather than Uvicorn?

  2. If so, what causes FastAPI to hang? If not, where is the problem, and how can it be fixed?

Dockerfile

FROM nvidia/cuda:11.4.0-runtime-ubuntu18.04
WORKDIR /usr/src/app
RUN apt-get -y update && \
    apt-get install -y --fix-missing \
    build-essential \
    cmake \
    python3 \
    python3-pip \
    ffmpeg \
    libsm6 \
    libxext6 \
    && apt-get clean && rm -rf /tmp/* /var/tmp/*
ADD ./requirements.txt ./
# install our dependencies
RUN python3 -m pip install --upgrade pip && python3 -m pip install -r requirements.txt && apt-get clean && rm -rf /tmp/* /var/tmp/*
ADD ./ ./
ENV LC_ALL=C.UTF-8
ENV LANG=C.UTF-8
EXPOSE 8080
CMD uvicorn api:app --host 0.0.0.0 --port 8080 --workers 2

api.py

from my_predictor import PythonPredictor
from typing import Optional
from datetime import datetime
import time
from starlette.responses import Response
from fastapi import FastAPI, File, UploadFile, Form, Response, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    cortex_id = request.headers.get('x-request-id')
    start_time = time.time()
    print("Cortex ID: " + cortex_id + ". > Middleware pre-request. Time stamp: " + str(start_time), flush=True)
    response = await call_next(request)
    process_time = time.time() - start_time
    print("Cortex ID: " + cortex_id + ". > Middleware post-response. Duration: " + str(process_time), flush=True)
    return response

@app.post("/")
async def run(request: Request, image: UploadFile = File(...), renderFactor: Optional[int] = Form(12), requestId: Optional[str] = Form('-1'), include_header: Optional[str] = Form('bin')):
    try:
        cortexId = request.headers.get('x-request-id')
        print("Cortex ID: " + cortexId + ". Request ID: " + requestId + " >>> Request received. Time stamp: " + str(datetime.now()))
        start = time.time()

        image = await image.read()
        payload = {}
        payload['image'] = image
        payload['renderFactor'] = renderFactor
        payload['requestId'] = requestId
        payload['include_header'] = include_header

        response = pred.predict(payload)
        end = time.time()
        totalTime = round(end - start, 2)
        print("Cortex ID: " + cortexId + ". Request ID: " + requestId + " > Request processed. Duration: " + str(totalTime) + " seconds. Time stamp: " + str(datetime.now()))
        if totalTime > 5:
            print("Long request detected. Duration: " + str(totalTime))
        return response

    except Exception as error:
        end = time.time()
        print(str(error))
        print("Cortex ID: " + cortexId + ". Request ID: " + requestId + " > Error. Duration: " + str(round(end - start, 2)) + " seconds . Time stamp: " + str(datetime.now()))
        raise HTTPException(status_code=500, detail=str(error))

config = {}
pred = PythonPredictor(config)

Background, root cause & rant

Hey, I spent quite a bit of time on this hanging issue (for a critical application in my organization with multiple custom MDWs). It basically happens because middleware based on @app.middleware("http") is created under the hood by inheriting Starlette's BaseHTTPMiddleware. As a result, MDWs written by explicitly inheriting BaseHTTPMiddleware also have this problem. The cause is quite involved; this is what I have understood so far:

  1. From here (GitHub Starlette issue) and here (GitHub FastAPI issue): I learned that this approach uses StreamingResponse, which has some problems
  2. From here (GitHub Starlette issue): I learned that one of the reasons for the hang is: in an API, awaiting request.json() is only allowed once per request lifecycle, while BaseHTTPMiddleware also creates a request object of its own (which causes the hang, because that is another request)

The last link also mentions that the hang is likewise caused by the StreamingResponse: the response's read somehow gets exhausted on the first read, and when it comes to the second read it waits for it indefinitely, causing the hang. (First and second here mean: in an ASGI application, messages of various types, such as http.response.start, http.response.body, etc., are exchanged between the client and the application.)
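The one-shot nature of the body stream can be illustrated with a small stdlib-only sketch (make_receive and read_body are hypothetical helpers, purely for illustration): an ASGI receive channel delivers the request body exactly once, so a second reader waits forever, exactly the hang described above.

```python
import asyncio


def make_receive(body: bytes):
    """Simulate an ASGI receive channel: the body arrives exactly once."""
    delivered = False

    async def receive():
        nonlocal delivered
        if not delivered:
            delivered = True
            return {"type": "http.request", "body": body, "more_body": False}
        # A real server would now wait for the client, which never sends
        # more data -- this is the indefinite hang described above.
        await asyncio.Event().wait()

    return receive


async def read_body(receive):
    """Drain the receive channel, like request.json() does internally."""
    chunks = []
    while True:
        message = await receive()
        chunks.append(message.get("body", b""))
        if not message.get("more_body", False):
            return b"".join(chunks)


async def main():
    receive = make_receive(b'{"hello": "world"}')
    first = await read_body(receive)  # first read succeeds
    try:
        second = await asyncio.wait_for(read_body(receive), timeout=0.1)
    except asyncio.TimeoutError:
        second = None  # second read would block forever; we time it out
    return first, second


first, second = asyncio.run(main())
```

Here `first` comes back with the full body while the second read has to be cancelled by the timeout, which is the same symptom as a request hanging in call_next.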

Solution

So, don't use anything related to BaseHTTPMiddleware. To solve this, I wrote all my custom middleware against the raw ASGI spec instead.

You can write a custom middleware like this:

import time

from starlette.types import ASGIApp, Receive, Scope, Send, Message

class LogProcessingTime:
    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send):

        start_time = time.time()

        async def send_wrapper(message: Message):
            # This will capture the response coming from the app layer.
            # The response body is in the message whose type is
            # "http.response.body"
            if message["type"] == "http.response.body":
                process_time = time.time() - start_time
                # you can log this process_time now any way you prefer
            await send(message)

        await self.app(scope, receive, send_wrapper)

# you can add this to your app this way:
app.add_middleware(LogProcessingTime)

In my case, the only problem was the starlette version. I upgraded from 0.21.0 to 0.28.0 and the issue was resolved. I used the following command:

pip install --upgrade starlette

Solution 1

From your code, I can see that you import Response from both the starlette.responses package and FastAPI's own package, which may be causing the hang:

from starlette.responses import Response
# remove the Response from fastapi
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request

Solution 2

If your problem still persists:

The FastAPI documentation notes that the Request imported from the fastapi package actually comes from Starlette (link to the Starlette request docs):

You could also use from starlette.requests import Request. FastAPI provides it as a convenience for you, the developer, but it comes directly from Starlette.

So replace from fastapi import Request with from starlette.requests import Request.

A similar problem appears in the official FastAPI GitHub issue at that link, where the application was run with uvicorn <file>:app. The code block below, implemented directly with starlette.requests, did not produce the hang, which suggests the problem is caused by FastAPI.

from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse

app = Starlette()

@app.middleware("http")
async def func(request: Request, call_next):
    # print(await request.json())
    return await call_next(request)

@app.route('/', methods=["POST"])
def homepage(request):
    return JSONResponse({"Hello": "World"})

Make sure to use starlette.requests and starlette.responses in your code, like this:

import time

from starlette.responses import Response
from starlette.requests import Request
# Request and Response removed from fastapi as directly referred from starlette
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    cortex_id = request.headers.get('x-request-id')
    start_time = time.time()
    print("Cortex ID: " + cortex_id + ". > Middleware pre-request. Time stamp: " + str(start_time), flush=True)
    response = await call_next(request)
    process_time = time.time() - start_time
    print("Cortex ID: " + cortex_id + ". > Middleware post-response. Duration: " + str(process_time), flush=True)
    return response
