在python中实现从Azure Redis缓存中的并发数据提取



我目前正在使用快速API构建低延迟模型推理API,我们使用azure redis缓存标准版本来获取特征,使用onnx模型来进行快速模型推理。我使用aioredis来实现redis中数据读取的并发性。我从redis调用两个特性请求,一个用于获取单个字符串的userID,另一个用于提取字符串列表的product,稍后我使用json解析将其转换为float列表。

对于一个请求,总共需要70-80ms,但对于超过10个并发请求,redis需要超过400ms才能获取结果,这是巨大的,并且在负载测试时,可以随着更多并发用户的增加而线性增加。

从redis获取数据的代码是:

import numpy as np
import json
from ..Helpers.helper import curt_giver, milsec_calc
import aioredis
r = aioredis.from_url("redis://user:host",decode_responses=True)
async def get_user(user:list) -> str:
    user_data = await r.get(user)
    return user_data
async def get_products(product:list)-> list:
    product_data = await r.mget(product)
    return product_data
async def get_features(inputs: dict) -> list:
    
    st = curt_giver()
    user_data = await get_user(inputs['userId'])
    online_user_data = [json.loads(json.loads(user_data))]
    end = curt_giver()
    print("Time to get user features: ", milsec_calc(st,end))
    
    st = curt_giver()
    product_data = await get_products(inputs['productIds'])
    online_product_data = []
    for i in product_data:
        online_product_data.append(json.loads(json.loads(i)))
    end = curt_giver()
    print("Time to get product features: ", milsec_calc(st,end))
    user_outputs = np.asarray(online_user_data,dtype=object)
    product_outputs = np.asarray(online_product_data,dtype=object)
    output = np.concatenate([np.concatenate([user_outputs]*product_outputs.shape[0])
    ,product_outputs],axis = 1)
    return output.tolist()

curt_give((是以毫秒为单位的时间。主文件中的代码是:

    from fastapi import FastAPI
    from v1.redis_conn.get_features import get_features
    
    from model_scoring.score_onnx import score_features
    from v1.post_processing.sort_results import sort_results
    
    from v1.api_models.input_models import Ranking_Input
    from v1.api_models.output_models import Ranking_Output
    from v1.Helpers.helper import curt_giver, milsec_calc
    import numpy as np
    
    
    app = FastAPI()
    
    # Sending user and product ids through body, 
    # Hence a POST request is well suited for this, GET has unexpected behaviour
    @app.post("/predict", response_model = Ranking_Output)
    async def rank_products(inp_req: Ranking_Input):
      beg = curt_giver()
      reqids = inp_req.dict()
      st = curt_giver()
      features = await get_features(reqids)
      end = curt_giver()
    
      print("Total Redis duration ( user + products fetch): ", milsec_calc(st,end))
    
      data = np.asarray(features,dtype=np.float32,order=None)
      
      st = curt_giver()
      scores = score_features(data)
      end = curt_giver()
    
      print("ONNX model duration: ", milsec_calc(st,end))
    
      Ranking_results = sort_results(scores, list(reqids["productIds"]))
      end = curt_giver()
      print("Total time for API: ",milsec_calc(beg,end))
      resp_json = {"requestId": inp_req.requestId,
      "ranking": Ranking_results,
      "zipCode": inp_req.zipCode}
    
      return resp_json    

通过计时,我可以了解到,对于一个请求,它花费的时间非常少,但对于并发用户,获取产品数据的时间保持线性增长。获取一个请求的时间所有值都以毫秒为单位:

Time to get user features:  1
Time to get product features:  47
Total Redis duration ( user + products fetch):  53
ONNX model duration:  2
Total time for API:  60

获取10个以上并发请求的时间:

Time to get user features:  151
Time to get user features:  150
Time to get user features:  151
Time to get user features:  52
Time to get user features:  51
Time to get product features:  187
Total Redis duration ( user + products fetch):  433
ONNX model duration:  2
Total time for API:  440
INFO:     127.0.0.1:60646 - "POST /predict HTTP/1.0" 200 OK
Time to get product features:  239
Total Redis duration ( user + products fetch):  488
ONNX model duration:  2
Total time for API:  495
INFO:     127.0.0.1:60644 - "POST /predict HTTP/1.0" 200 OK
Time to get product features:  142
Total Redis duration ( user + products fetch):  297
ONNX model duration:  2
Total time for API:  303
INFO:     127.0.0.1:60648 - "POST /predict HTTP/1.0" 200 OK
Time to get product features:  188
Total Redis duration ( user + products fetch):  342
ONNX model duration:  2
Total time for API:  348

它不断增加,甚至达到900ms+从redis中提取两个数据。有没有什么方法可以有效地以低延迟提取并发数据,并将并发请求增加到500,而不影响延迟,我的目标是每秒300个并发请求的300ms以下。

我被困在这一点上任何帮助,我都会非常感激。

您的一些代码似乎正在阻塞。查看您的日志,它开始时是异步的(不是并发的,这在这里没有发生(。但随后它会逐一处理所有呼叫。

查看您的代码,它从未将控制权交还给product_data = await get_products(inputs['productIds'])行之后的事件循环。

如果之后的代码需要很长时间,那么所有其他请求都在等待执行(并且将串行执行(。我们缺少一些代码(这不是MRE(,所以很难说到底发生了什么。例如,我们不知道日志Total Redis duration ( user + products fetch):ONNX model duration:在代码中的何处生成,并且您使用的是从未启动过的变量(如online_product_data(。

底线是;如果你想在FastAPI中获得更高的并发性,你需要更多的进程来运行你的代码。这意味着要么有更多的Uvicorn工作人员,要么有一些负载均衡器和更多的Uvicorn实例(假设您使用的是Uvicorn(。否则,请尝试查找任何可能成为非阻塞的阻塞IO。然而,我猜你的大部分阻塞代码都是CPU密集型的(而不是IO密集型的(,所以增加处理请求的Python进程数量将是你最好的举措。

您的代码是正确的。在处理并行请求时,您应该观察到合作收益。

需要明确的是,请在没有JSON解析的情况下测量时间,只配置此行:

user_data = await get_user(inputs['userId'])

请在redis配置中检查您的THREAD_COUNT。如果您正在测量10个并行请求的性能,请将其至少设置为10。

如果没有帮助,请尝试使用另一个库,如aiocache(不幸的是,我不熟悉aioredis(。你发布的时间不是这样的设计应该有的。

请在更改后公布您的时间和结果。

最新更新