我有一个后端应用程序,目前使用asyncio: fastapi编写web服务器,sqlalchemy 1.4 + asyncpg编写异步数据库驱动程序。我需要将任务部署到将运行和更新主机应用程序的工作者。目前我正在使用aio_pika
,但想要更强大的东西,如celery
与flower
。
我知道芹菜没有与asyncio集成。我也读过像这样的答案,我担心的是没有任务是异步的,这是微不足道的。我担心从主事件循环中启动任务。
我的主要问题是,my_task.delay()
/my_task.apply_async()
是否阻塞了正在运行的线程? 如果是这样,更好的方法是使用从中央mp.Queue
或ProcessPoolExecutor
中get
项的多处理工作进程,然后仅从该工作进程部署芹菜任务?
我想部署任务,理想情况下,在任务完成时收到通知。不过,这可以通过fastapi
接口在任务本身内部完成。我只是想确保部署任务不会阻塞异步事件循环。
我试图用你链接的帖子(这个)的答案做点什么。基本上我拿了他的代码,做了一些修改。它似乎在大多数情况下使用简单的任务正确工作,但我想这不是完全安全的,这只是我做的一个变通办法。下面是代码:
import asyncio
from celery import Celery
class AsyncCelery(Celery):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.patch_task()
def patch_task(self):
TaskBase = self.Task
class ContextTask(TaskBase):
abstract = True
async def _run(self, *args, **kwargs):
asyncio.set_event_loop(asyncio.get_event_loop())
result = TaskBase.__call__(self, *args, **kwargs)
return await result
def __call__(self, *args, **kwargs):
loop = asyncio.get_event_loop()
try:
return loop.run_until_complete(self._run(*args, **kwargs))
except:
return asyncio.run(self._run(*args, **kwargs))
self.Task = ContextTask