我目前正在使用快速API构建低延迟模型推理API,我们使用azure redis缓存标准版本来获取特征,使用onnx模型来进行快速模型推理。我使用aioredis来实现redis中数据读取的并发性。我从redis调用两个特性请求,一个用于获取单个字符串的userID,另一个用于提取字符串列表的product,稍后我使用json解析将其转换为float列表。
对于一个请求,总共需要70-80ms,但对于超过10个并发请求,redis需要超过400ms才能获取结果,这是巨大的,并且在负载测试时,可以随着更多并发用户的增加而线性增加。
从redis获取数据的代码是:
import numpy as np
import json
from ..Helpers.helper import curt_giver, milsec_calc
import aioredis
r = aioredis.from_url("redis://user:host",decode_responses=True)
async def get_user(user:list) -> str:
user_data = await r.get(user)
return user_data
async def get_products(product:list)-> list:
product_data = await r.mget(product)
return product_data
async def get_features(inputs: dict) -> list:
st = curt_giver()
user_data = await get_user(inputs['userId'])
online_user_data = [json.loads(json.loads(user_data))]
end = curt_giver()
print("Time to get user features: ", milsec_calc(st,end))
st = curt_giver()
product_data = await get_products(inputs['productIds'])
online_product_data = []
for i in product_data:
online_product_data.append(json.loads(json.loads(i)))
end = curt_giver()
print("Time to get product features: ", milsec_calc(st,end))
user_outputs = np.asarray(online_user_data,dtype=object)
product_outputs = np.asarray(online_product_data,dtype=object)
output = np.concatenate([np.concatenate([user_outputs]*product_outputs.shape[0])
,product_outputs],axis = 1)
return output.tolist()
curt_give((是以毫秒为单位的时间。主文件中的代码是:
from fastapi import FastAPI
from v1.redis_conn.get_features import get_features
from model_scoring.score_onnx import score_features
from v1.post_processing.sort_results import sort_results
from v1.api_models.input_models import Ranking_Input
from v1.api_models.output_models import Ranking_Output
from v1.Helpers.helper import curt_giver, milsec_calc
import numpy as np
app = FastAPI()
# Sending user and product ids through body,
# Hence a POST request is well suited for this, GET has unexpected behaviour
@app.post("/predict", response_model = Ranking_Output)
async def rank_products(inp_req: Ranking_Input):
beg = curt_giver()
reqids = inp_req.dict()
st = curt_giver()
features = await get_features(reqids)
end = curt_giver()
print("Total Redis duration ( user + products fetch): ", milsec_calc(st,end))
data = np.asarray(features,dtype=np.float32,order=None)
st = curt_giver()
scores = score_features(data)
end = curt_giver()
print("ONNX model duration: ", milsec_calc(st,end))
Ranking_results = sort_results(scores, list(reqids["productIds"]))
end = curt_giver()
print("Total time for API: ",milsec_calc(beg,end))
resp_json = {"requestId": inp_req.requestId,
"ranking": Ranking_results,
"zipCode": inp_req.zipCode}
return resp_json
通过计时,我可以了解到,对于一个请求,它花费的时间非常少,但对于并发用户,获取产品数据的时间保持线性增长。获取一个请求的时间所有值都以毫秒为单位:
Time to get user features: 1
Time to get product features: 47
Total Redis duration ( user + products fetch): 53
ONNX model duration: 2
Total time for API: 60
获取10个以上并发请求的时间:
Time to get user features: 151
Time to get user features: 150
Time to get user features: 151
Time to get user features: 52
Time to get user features: 51
Time to get product features: 187
Total Redis duration ( user + products fetch): 433
ONNX model duration: 2
Total time for API: 440
INFO: 127.0.0.1:60646 - "POST /predict HTTP/1.0" 200 OK
Time to get product features: 239
Total Redis duration ( user + products fetch): 488
ONNX model duration: 2
Total time for API: 495
INFO: 127.0.0.1:60644 - "POST /predict HTTP/1.0" 200 OK
Time to get product features: 142
Total Redis duration ( user + products fetch): 297
ONNX model duration: 2
Total time for API: 303
INFO: 127.0.0.1:60648 - "POST /predict HTTP/1.0" 200 OK
Time to get product features: 188
Total Redis duration ( user + products fetch): 342
ONNX model duration: 2
Total time for API: 348
它不断增加,甚至达到900ms+从redis中提取两个数据。有没有什么方法可以有效地以低延迟提取并发数据,并将并发请求增加到500,而不影响延迟,我的目标是每秒300个并发请求的300ms以下。
我被困在这一点上任何帮助,我都会非常感激。
您的一些代码似乎正在阻塞。查看您的日志,它开始时是异步的(不是并发的,这在这里没有发生(。但随后它会逐一处理所有呼叫。
查看您的代码,它从未将控制权交还给product_data = await get_products(inputs['productIds'])
行之后的事件循环。
如果之后的代码需要很长时间,那么所有其他请求都在等待执行(并且将串行执行(。我们缺少一些代码(这不是MRE(,所以很难说到底发生了什么。例如,我们不知道日志Total Redis duration ( user + products fetch):
和ONNX model duration:
在代码中的何处生成,并且您使用的是从未启动过的变量(如online_product_data
(。
底线是;如果你想在FastAPI中获得更高的并发性,你需要更多的进程来运行你的代码。这意味着要么有更多的Uvicorn工作人员,要么有一些负载均衡器和更多的Uvicorn实例(假设您使用的是Uvicorn(。否则,请尝试查找任何可能成为非阻塞的阻塞IO。然而,我猜你的大部分阻塞代码都是CPU密集型的(而不是IO密集型的(,所以增加处理请求的Python进程数量将是你最好的举措。
您的代码是正确的。在处理并行请求时,您应该观察到合作收益。
需要明确的是,请在没有JSON解析的情况下测量时间,只配置此行:
user_data = await get_user(inputs['userId'])
请在redis配置中检查您的THREAD_COUNT。如果您正在测量10个并行请求的性能,请将其至少设置为10。
如果没有帮助,请尝试使用另一个库,如aiocache
(不幸的是,我不熟悉aioredis
(。你发布的时间不是这样的设计应该有的。
请在更改后公布您的时间和结果。