我想每个人都知道如何处理 django 中长时间运行的任务:使用芹菜和放松。但是,如果我想通过 aiohttp(或龙卷风)获得 websockets 的好处怎么办?
假设我有非常受 CPU 限制的任务,可能需要几秒钟到多(5-10)分钟。在 websocket 循环中处理此任务并通知用户进度看起来不错。没有 ajax 请求,对短任务的响应非常快。
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.tp == aiohttp.MsgType.text:
answer_to_the_ultimate_question_of_life_the_universe_and_everything =
long_running_task(msg.data, NotificationHelper(ws))
ws.send_str(json.dumps({
'action': 'got-answer',
'data': answer_to_the_ultimate_question_of_life_the_universe_and_everything,
}))
return ws
但另一方面,据我所知,以这种方式服务的 CPU 密集型任务会阻止整个线程。如果我有 10 个工作线程和 11 个客户端想要使用应用程序,则在第一个客户端的任务完成之前,不会为第 11 个客户端提供服务。
也许,我应该运行在芹菜中看起来很大的任务和在主循环中看起来很小的任务?
所以,我的问题:是否有任何好的设计模式来使用异步服务器为长时间运行的任务提供服务?
谢谢!
只需通过loop.run_in_executor()
运行长时间运行的 CPU 密集型任务,并通过 loop.call_soon_threadsafe()
发送进度通知。
如果您的作业不是 CPU 而是 IO 绑定的(例如发送电子邮件),您可以通过调用创建新任务loop.create_task()
。它看起来像是生成新线程。
如果您不能使用即发即弃的方法,则需要使用像 RabbitMQ 这样的持久消息代理(有 https://github.com/benjamin-hodgson/asynqp 库用于以异步方式与 Rabbit 通信)。