Python中如何处理具有自定义优先级的并发请求


  1. 我的请求QPS很高
  2. 我一次只能处理一个请求。因此,挂起的请求需要存储在本地Array/Queue中
  3. 当有争用时,我不想要FCFS(先到先得(。相反,我想处理基于自己的一些自定义逻辑的请求

类似的伪代码

def webApiHandler(request):
future = submit(request)
response = wait(future) # Wait time is depending on its priority
return response

我可以使用哪些基元来实现这一点?事件循环?异步?线程?

------------编辑---------------

这是同步的API调用,所有事情都应该在本地处理,并在计算完成后尽快响应。我不打算像芹菜一样使用工作队列

根据您的需求(但为什么不想要并发?(,您可能想要使用的基本上是优先级队列,这是一个具有。。。priority:这里是实现的信息,您可以在带有queue模块的python中使用它(这里是doc(

它是按优先级排序的,因此较高的优先级位于队列的末尾。

然后,您的实现将决定如何对请求求值,并设置该特定请求的优先级。但两个相同的优先级将被视为队列中的优先级。

然后,您在另一个线程中编写一个使用者,该使用者将弹出队列中的第一个(或最后一个,取决于您认为的最高优先级(项目。

您可能想要查看并启用并发性和额外功能的是celene,它是一个分布式任务队列框架。(它允许队列、优先级,也可以与任何数量的工作线程一起运行(在您的情况下,任意=1,但您真的会因为大量请求而陷入非并发吗?(。

示例:

import asyncio
import threading
from typing import Dict
# Local queue for pending compute (watchout, may need to replace dict for thread-safe ⚠️) 
pending_computes: Dict[int, list[threading.Event]] = {}
# The queue manager to pick a pending compute
async def poll_next():
# 1. A computed example after updating model priority
priorities = [2, 1, 3] # TODO: handle the empty array.
# 2. Find the next compute event reference
next = pending_computes.get(priorities[0]) # TODO: handle empty dict.
# 3. Kick off the next compute
next.set()
# The FastAPI async handler
async def cloud_compute_api(model_id: int, intput: bytes):
# 1. Enqueue current compute as an event.
compute_event = threading.Event()
pending_computes.get(model_id, []).append(compute_event)
# 2. Poll the next pending compute based on.
asyncio.create_task(poll_next) 
# 3. Wait until its own compute is set to GO. Wait up to 10 seconds.
compute_event.wait(10)
# 4. compute starts 🚀🚀🚀
res = compute(model_id, intput)
asyncio.create_task(poll_next) # Poll next pending compute, if any
return res

最新更新