如何将芹菜与asyncio结合起来



如何创建一个包装器,使芹菜任务看起来像asyncio.Task ?或者是否有更好的方法将芹菜与asyncio集成?

@asksol,芹菜的创造者,说::

将芹菜用作异步I/O框架之上的分布式层是很常见的(提示:将cpu绑定的任务路由到prefork worker意味着它们不会阻塞事件循环)。

但是我找不到任何专门为asyncio框架的代码示例。

编辑:01/12/2021之前的答案(在底部找到)没有很好地老化,因此我添加了一个可能的解决方案的组合,可能会满足那些谁仍然看如何共同使用asyncio和芹菜

让我们先快速分解用例(这里有更深入的分析:asyncio和协程vs任务队列):

  • 如果任务是I/O绑定,那么使用协程和asyncio会更好。
  • 如果任务是CPU限制的,那么最好使用芹菜或其他类似的任务管理系统。

所以它在Python的"做一件事并把它做好"上下文中是有意义的。不要把asyncio和芹菜混在一起。

但是,如果我们希望能够同时异步运行一个方法和一个异步任务,会发生什么呢?那么我们有一些选择可以考虑:

  • 我能找到的最好的例子是:https://johnfraney.ca/posts/2018/12/20/writing-unit-tests-celery-tasks-async-functions/(我刚刚发现这是@Franey的回应):

    1. 定义async方法

    2. 使用asgirefsync.async_to_sync模块包装async方法并在芹菜任务中同步运行:

      # tasks.py
      import asyncio
      from asgiref.sync import async_to_sync
      from celery import Celery
      app = Celery('async_test', broker='a_broker_url_goes_here')
      async def return_hello():
          await asyncio.sleep(1)
          return 'hello'
      
      @app.task(name="sync_task")
      def sync_task():
          async_to_sync(return_hello)()
      
  • 我在FastAPI应用程序中遇到的一个用例与前面的示例相反:

    1. 一个密集的CPU绑定进程占用了异步端点。

    2. 解决方案是将异步CPU绑定进程重构为一个芹菜任务,并从芹菜队列传递一个任务实例来执行。

    3. 一个可视化的最小示例:

      import asyncio
      import uvicorn
      from celery import Celery
      from fastapi import FastAPI
      app = FastAPI(title='Example')
      worker = Celery('worker', broker='a_broker_url_goes_here')
      @worker.task(name='cpu_boun')
      def cpu_bound_task():
          # Does stuff but let's simplify it
          print([n for n in range(1000)])
      @app.get('/calculate')
      async def calculate():
          cpu_bound_task.delay()
      if __name__ == "__main__":
          uvicorn.run('main:app', host='0.0.0.0', port=8000)
      
  • 另一个解决方案似乎是@juanra和@danius在他们的答案中提出的,但是我们必须记住,当我们混合同步和异步执行时,性能往往会受到影响,因此,在我们决定在prod环境中使用它们之前,需要对这些答案进行监控。

最后,有一些现成的解决方案,我不能推荐(因为我自己没有使用过),但我将在这里列出它们:

  • 芹菜池AsyncIO这似乎解决了什么芹菜5.0没有,但请记住,它似乎有点实验性(版本0.2.0今天01/12/2021)
  • aiotasks声称是"一个类似于芹菜的任务管理器,用于分发Asyncio协同程序"。但似乎有点陈旧(最近一次提交大约2年前)

这不是很好地老化,是吗?芹菜5.0版本没有实现asyncio兼容性,因此我们不知道何时以及是否会实现…出于响应遗留原因(因为它是当时的答案)和注释延续的原因,将其留在这里。

这将在芹菜5.0版本中实现,正如官方网站所述:

http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html前言

    芹菜的下一个主要版本将只支持Python 3.5,我们计划利用新的asyncio库。
  1. 放弃对Python 2的支持将使我们能够删除大量的兼容性代码,并且使用Python 3.5允许我们利用类型,async/await, asyncio和类似的概念,这些概念在旧版本中没有其他选择。

以上引用自上一个链接

所以最好的办法是等待 5.0版本发布!

与此同时,快乐的编码:)

这个简单的方法对我来说很好:

import asyncio
from celery import Celery
app = Celery('tasks')
async def async_function(param1, param2):
    # more async stuff...
    pass
@app.task(name='tasks.task_name', queue='queue_name')
def task_name(param1, param2):
    asyncio.run(async_function(param1, param2))

这是一个简单的帮助,您可以使用它来使芹菜任务可等待:

import asyncio
from asgiref.sync import sync_to_async
# Converts a Celery tasks to an async function
def task_to_async(task):
    async def wrapper(*args, **kwargs):
        delay = 0.1
        async_result = await sync_to_async(task.delay)(*args, **kwargs)
        while not async_result.ready():
            await asyncio.sleep(delay)
            delay = min(delay * 1.5, 2)  # exponential backoff, max 2 seconds
        return async_result.get()
    return wrapper

sync_to_async一样,它可以用作直接的包装器:

@shared_task
def get_answer():
    sleep(10) # simulate long computation
    return 42    
result = await task_to_async(get_answer)()

…作为装饰者:

@task_to_async
@shared_task
def get_answer():
    sleep(10) # simulate long computation
    return 42    
result = await get_answer()
当然,这不是一个完美的解决方案,因为它依赖于轮询。但是,在芹菜正式提供更好的解决方案之前,从Django异步视图调用芹菜任务应该是一个很好的解决方案。 编辑2021/03/02:增加对sync_to_async的调用来支持急切模式。

您可以使用文档中描述的run_in_executor将任何阻塞调用包装到任务中,我还在示例中添加了自定义超时:

def run_async_task(
    target,
    *args,
    timeout = 60,
    **keywords
) -> Future:
    loop = asyncio.get_event_loop()
    return asyncio.wait_for(
        loop.run_in_executor(
            executor,
            functools.partial(target, *args, **keywords)
        ),
        timeout=timeout,
        loop=loop
    )
loop = asyncio.get_event_loop()
async_result = loop.run_until_complete(
    run_async_task, your_task.delay, some_arg, some_karg="" 
)
result = loop.run_until_complete(
    run_async_task, async_result.result 
)

我发现这样做的最干净的方法是将async函数包装在asgiref.sync.async_to_sync中(来自asgiref):

from asgiref.sync import async_to_sync
from celery.task import periodic_task

async def return_hello():
    await sleep(1)
    return 'hello'

@periodic_task(
    run_every=2,
    name='return_hello',
)
def task_return_hello():
    async_to_sync(return_hello)()

我从我写的一篇博客文章中摘录了这个例子。

我通过在Celery -pool-asyncio库中组合芹菜和asyncio来解决这个问题。

这是我在必要时处理异步协程的芹菜的实现:

包装芹菜类以扩展其功能:

from celery import Celery
from inspect import isawaitable
import asyncio

class AsyncCelery(Celery):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.patch_task()
        if 'app' in kwargs:
            self.init_app(kwargs['app'])
    def patch_task(self):
        TaskBase = self.Task
        class ContextTask(TaskBase):
            abstract = True
            async def _run(self, *args, **kwargs):
                result = TaskBase.__call__(self, *args, **kwargs)
                if isawaitable(result):
                    await result
            def __call__(self, *args, **kwargs):
                asyncio.run(self._run(*args, **kwargs))
        self.Task = ContextTask
    def init_app(self, app):
        self.app = app
        conf = {}
        for key in app.config.keys():
            if key[0:7] == 'CELERY_':
                conf[key[7:].lower()] = app.config[key]
        if 'broker_transport_options' not in conf and conf.get('broker_url', '')[0:4] == 'sqs:':
            conf['broker_transport_options'] = {'region': 'eu-west-1'}
        self.config_from_object(conf)

celery = AsyncCelery()

对于任何偶然发现这个寻找帮助特别是异步sqlalchemy(即,使用asyncio扩展)和芹菜任务的人,明确地处理引擎将解决这个问题。这个特殊的示例使用asyncpg

的例子:

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    create_async_engine,
)
from sqlalchemy.orm import sessionmaker
from asgiref.sync import async_to_sync

engine = create_async_engine("some_uri", future=True)
async_session_factory = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

@celery_app.task(name="task-name")
def sync_func() -> None:
    async_to_sync(some_func)()

async def some_func() -> None:
    async with get_db_session() as session:
        result = await some_db_query(session)
    # engine.dispose will be called on exit

@contextlib.asynccontextmanager
async def get_db_session() -> AsyncGenerator:
    try:
        db = async_session_factory()
        yield db
    finally:
        await db.close()
        await engine.dispose()

一个用asyncio实现芹菜的好方法:

import asyncio
from celery import Celery
app = Celery()
async def async_function(param):
    print('do something')
@app.task()
def celery_task(param):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(async_function(param))

我做了这个包装@app的实现。任务装饰器实现async delay()和apply_async()函数。它依赖于Redis,并扩展了Redis后端使用pubsub来等待结果。

import json
import asyncio
import redis.asyncio
from celery.backends.redis import RedisBackend
from django.conf import settings
from MYAPP.celery import app

APOOL = redis.asyncio.ConnectionPool(host=settings.REDIS_HOST,
                                     db=settings.REDIS_DB,
                                     port=settings.REDIS_PORT)
CELERY_TASK_PREFIX = b'celery-task-meta-'
class PubSubBackend(RedisBackend):
    def __init__(self, *argz, **kwargs):
        kwargs['host'] = settings.REDIS_HOST
        kwargs['port'] = settings.REDIS_PORT
        kwargs['db'] = settings.REDIS_DB
        super().__init__(*argz, **kwargs)
    # This backend client will publish to subscribers when a task is finished
    def set(self, key, value, **retry_policy):
        return self.client.publish(key, value)
class RemoteTaskException(Exception):
    pass

class RemoteTaskTimeout(Exception):
    pass

TIMEOUT = 10
async def _read_task_result(channel, future):
    import time
    limit = time.time() + TIMEOUT
    while time.time() < limit:
        message = await channel.get_message(ignore_subscribe_messages=True)
        if message is not None:
            future.set_result(message)
            return
    future.set_result({'data': json.dumps(
        { 'status': 'TIMEOUT' },
    ).encode()})
async def get_task_result(task_id):
    future = asyncio.Future()
    conn = redis.asyncio.Redis(connection_pool=APOOL)
    async with conn.pubsub() as pubsub:
        queue = f'{CELERY_TASK_PREFIX.decode()}{task_id}'.encode()
        await pubsub.subscribe(queue)
        return_task = _read_task_result(pubsub, future)
        await asyncio.create_task(return_task)
        result = future.result()
        result_data = json.loads(result['data'].decode())
        status = result_data['status']
        if status == 'SUCCESS':
            return result_data['result']
        elif status == 'FAILURE':
            raise RemoteTaskException(result_data['result']['exc_message'][0])
        elif status == 'TIMEOUT':
            raise RemoteTaskTimeout()
        else:
            raise Exception(f'Uknown task status {status}')

class AsyncTask:
    def __init__(self, sub):
        self.task = app.task(sub)
    async def delay(self, *args, **kwargs):
        task = self.task.delay(*args, **kwargs)
        return await get_task_result(task.id)
    def s(self, *args, **kwargs):
        return self.task.s(*args, **kwargs)
    async def apply_async(self, *args, **kwargs):
        task = self.task.apply_async(*args, **kwargs)
        return await get_task_result(task.id)
    def __call__(self, *args, **kwargs):
        return self.task(*args, **kwargs)

# Task decorator
def async_task(sub):
    return AsyncTask(sub)

然后,在settings.py:

CELERY_RESULT_BACKEND = 'myapp.somewhere.PubSubBackend'

相关内容

  • 没有找到相关文章

最新更新