如何创建一个包装器,使芹菜任务看起来像asyncio.Task
?或者是否有更好的方法将芹菜与asyncio
集成?
@asksol,芹菜的创造者,说::
将芹菜用作异步I/O框架之上的分布式层是很常见的(提示:将cpu绑定的任务路由到prefork worker意味着它们不会阻塞事件循环)。
但是我找不到任何专门为asyncio
框架的代码示例。
编辑:01/12/2021之前的答案(在底部找到)没有很好地老化,因此我添加了一个可能的解决方案的组合,可能会满足那些谁仍然看如何共同使用asyncio和芹菜
让我们先快速分解用例(这里有更深入的分析:asyncio和协程vs任务队列):
- 如果任务是I/O绑定,那么使用协程和asyncio会更好。
- 如果任务是CPU限制的,那么最好使用芹菜或其他类似的任务管理系统。
所以它在Python的"做一件事并把它做好"上下文中是有意义的。不要把asyncio和芹菜混在一起。
但是,如果我们希望能够同时异步运行一个方法和一个异步任务,会发生什么呢?那么我们有一些选择可以考虑:
-
我能找到的最好的例子是:https://johnfraney.ca/posts/2018/12/20/writing-unit-tests-celery-tasks-async-functions/(我刚刚发现这是@Franey的回应):
-
定义async方法
-
使用
asgiref
的sync.async_to_sync
模块包装async方法并在芹菜任务中同步运行:# tasks.py import asyncio from asgiref.sync import async_to_sync from celery import Celery app = Celery('async_test', broker='a_broker_url_goes_here') async def return_hello(): await asyncio.sleep(1) return 'hello' @app.task(name="sync_task") def sync_task(): async_to_sync(return_hello)()
-
-
我在FastAPI应用程序中遇到的一个用例与前面的示例相反:
-
一个密集的CPU绑定进程占用了异步端点。
-
解决方案是将异步CPU绑定进程重构为一个芹菜任务,并从芹菜队列传递一个任务实例来执行。
-
一个可视化的最小示例:
import asyncio import uvicorn from celery import Celery from fastapi import FastAPI app = FastAPI(title='Example') worker = Celery('worker', broker='a_broker_url_goes_here') @worker.task(name='cpu_boun') def cpu_bound_task(): # Does stuff but let's simplify it print([n for n in range(1000)]) @app.get('/calculate') async def calculate(): cpu_bound_task.delay() if __name__ == "__main__": uvicorn.run('main:app', host='0.0.0.0', port=8000)
-
-
另一个解决方案似乎是@juanra和@danius在他们的答案中提出的,但是我们必须记住,当我们混合同步和异步执行时,性能往往会受到影响,因此,在我们决定在prod环境中使用它们之前,需要对这些答案进行监控。
最后,有一些现成的解决方案,我不能推荐(因为我自己没有使用过),但我将在这里列出它们:
- 芹菜池AsyncIO这似乎解决了什么芹菜5.0没有,但请记住,它似乎有点实验性(版本0.2.0今天01/12/2021)
- aiotasks声称是"一个类似于芹菜的任务管理器,用于分发Asyncio协同程序"。但似乎有点陈旧(最近一次提交大约2年前)
这不是很好地老化,是吗?芹菜5.0版本没有实现asyncio兼容性,因此我们不知道何时以及是否会实现…出于响应遗留原因(因为它是当时的答案)和注释延续的原因,将其留在这里。
这将在芹菜5.0版本中实现,正如官方网站所述:
http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html前言
- 芹菜的下一个主要版本将只支持Python 3.5,我们计划利用新的asyncio库。
- 放弃对Python 2的支持将使我们能够删除大量的兼容性代码,并且使用Python 3.5允许我们利用类型,async/await, asyncio和类似的概念,这些概念在旧版本中没有其他选择。
以上引用自上一个链接
所以最好的办法是等待 5.0版本发布!与此同时,快乐的编码:)
这个简单的方法对我来说很好:
import asyncio
from celery import Celery
app = Celery('tasks')
async def async_function(param1, param2):
# more async stuff...
pass
@app.task(name='tasks.task_name', queue='queue_name')
def task_name(param1, param2):
asyncio.run(async_function(param1, param2))
这是一个简单的帮助,您可以使用它来使芹菜任务可等待:
import asyncio
from asgiref.sync import sync_to_async
# Converts a Celery tasks to an async function
def task_to_async(task):
async def wrapper(*args, **kwargs):
delay = 0.1
async_result = await sync_to_async(task.delay)(*args, **kwargs)
while not async_result.ready():
await asyncio.sleep(delay)
delay = min(delay * 1.5, 2) # exponential backoff, max 2 seconds
return async_result.get()
return wrapper
像sync_to_async
一样,它可以用作直接的包装器:
@shared_task
def get_answer():
sleep(10) # simulate long computation
return 42
result = await task_to_async(get_answer)()
…作为装饰者:
@task_to_async
@shared_task
def get_answer():
sleep(10) # simulate long computation
return 42
result = await get_answer()
当然,这不是一个完美的解决方案,因为它依赖于轮询。但是,在芹菜正式提供更好的解决方案之前,从Django异步视图调用芹菜任务应该是一个很好的解决方案。
编辑2021/03/02:增加对sync_to_async
的调用来支持急切模式。
您可以使用文档中描述的run_in_executor
将任何阻塞调用包装到任务中,我还在示例中添加了自定义超时:
def run_async_task(
target,
*args,
timeout = 60,
**keywords
) -> Future:
loop = asyncio.get_event_loop()
return asyncio.wait_for(
loop.run_in_executor(
executor,
functools.partial(target, *args, **keywords)
),
timeout=timeout,
loop=loop
)
loop = asyncio.get_event_loop()
async_result = loop.run_until_complete(
run_async_task, your_task.delay, some_arg, some_karg=""
)
result = loop.run_until_complete(
run_async_task, async_result.result
)
我发现这样做的最干净的方法是将async
函数包装在asgiref.sync.async_to_sync
中(来自asgiref
):
from asgiref.sync import async_to_sync
from celery.task import periodic_task
async def return_hello():
await sleep(1)
return 'hello'
@periodic_task(
run_every=2,
name='return_hello',
)
def task_return_hello():
async_to_sync(return_hello)()
我从我写的一篇博客文章中摘录了这个例子。
我通过在Celery -pool-asyncio库中组合芹菜和asyncio来解决这个问题。
这是我在必要时处理异步协程的芹菜的实现:
包装芹菜类以扩展其功能:
from celery import Celery
from inspect import isawaitable
import asyncio
class AsyncCelery(Celery):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.patch_task()
if 'app' in kwargs:
self.init_app(kwargs['app'])
def patch_task(self):
TaskBase = self.Task
class ContextTask(TaskBase):
abstract = True
async def _run(self, *args, **kwargs):
result = TaskBase.__call__(self, *args, **kwargs)
if isawaitable(result):
await result
def __call__(self, *args, **kwargs):
asyncio.run(self._run(*args, **kwargs))
self.Task = ContextTask
def init_app(self, app):
self.app = app
conf = {}
for key in app.config.keys():
if key[0:7] == 'CELERY_':
conf[key[7:].lower()] = app.config[key]
if 'broker_transport_options' not in conf and conf.get('broker_url', '')[0:4] == 'sqs:':
conf['broker_transport_options'] = {'region': 'eu-west-1'}
self.config_from_object(conf)
celery = AsyncCelery()
对于任何偶然发现这个寻找帮助特别是异步sqlalchemy
(即,使用asyncio
扩展)和芹菜任务的人,明确地处理引擎将解决这个问题。这个特殊的示例使用asyncpg
。
的例子:
from sqlalchemy.ext.asyncio import (
AsyncSession,
create_async_engine,
)
from sqlalchemy.orm import sessionmaker
from asgiref.sync import async_to_sync
engine = create_async_engine("some_uri", future=True)
async_session_factory = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
@celery_app.task(name="task-name")
def sync_func() -> None:
async_to_sync(some_func)()
async def some_func() -> None:
async with get_db_session() as session:
result = await some_db_query(session)
# engine.dispose will be called on exit
@contextlib.asynccontextmanager
async def get_db_session() -> AsyncGenerator:
try:
db = async_session_factory()
yield db
finally:
await db.close()
await engine.dispose()
一个用asyncio实现芹菜的好方法:
import asyncio
from celery import Celery
app = Celery()
async def async_function(param):
print('do something')
@app.task()
def celery_task(param):
loop = asyncio.get_event_loop()
return loop.run_until_complete(async_function(param))
我做了这个包装@app的实现。任务装饰器实现async delay()和apply_async()函数。它依赖于Redis,并扩展了Redis后端使用pubsub来等待结果。
import json
import asyncio
import redis.asyncio
from celery.backends.redis import RedisBackend
from django.conf import settings
from MYAPP.celery import app
APOOL = redis.asyncio.ConnectionPool(host=settings.REDIS_HOST,
db=settings.REDIS_DB,
port=settings.REDIS_PORT)
CELERY_TASK_PREFIX = b'celery-task-meta-'
class PubSubBackend(RedisBackend):
def __init__(self, *argz, **kwargs):
kwargs['host'] = settings.REDIS_HOST
kwargs['port'] = settings.REDIS_PORT
kwargs['db'] = settings.REDIS_DB
super().__init__(*argz, **kwargs)
# This backend client will publish to subscribers when a task is finished
def set(self, key, value, **retry_policy):
return self.client.publish(key, value)
class RemoteTaskException(Exception):
pass
class RemoteTaskTimeout(Exception):
pass
TIMEOUT = 10
async def _read_task_result(channel, future):
import time
limit = time.time() + TIMEOUT
while time.time() < limit:
message = await channel.get_message(ignore_subscribe_messages=True)
if message is not None:
future.set_result(message)
return
future.set_result({'data': json.dumps(
{ 'status': 'TIMEOUT' },
).encode()})
async def get_task_result(task_id):
future = asyncio.Future()
conn = redis.asyncio.Redis(connection_pool=APOOL)
async with conn.pubsub() as pubsub:
queue = f'{CELERY_TASK_PREFIX.decode()}{task_id}'.encode()
await pubsub.subscribe(queue)
return_task = _read_task_result(pubsub, future)
await asyncio.create_task(return_task)
result = future.result()
result_data = json.loads(result['data'].decode())
status = result_data['status']
if status == 'SUCCESS':
return result_data['result']
elif status == 'FAILURE':
raise RemoteTaskException(result_data['result']['exc_message'][0])
elif status == 'TIMEOUT':
raise RemoteTaskTimeout()
else:
raise Exception(f'Uknown task status {status}')
class AsyncTask:
def __init__(self, sub):
self.task = app.task(sub)
async def delay(self, *args, **kwargs):
task = self.task.delay(*args, **kwargs)
return await get_task_result(task.id)
def s(self, *args, **kwargs):
return self.task.s(*args, **kwargs)
async def apply_async(self, *args, **kwargs):
task = self.task.apply_async(*args, **kwargs)
return await get_task_result(task.id)
def __call__(self, *args, **kwargs):
return self.task(*args, **kwargs)
# Task decorator
def async_task(sub):
return AsyncTask(sub)
然后,在settings.py:
CELERY_RESULT_BACKEND = 'myapp.somewhere.PubSubBackend'