包含带有属性的芹菜任务的装饰器



我正在处理一个Django项目,该项目有几个应用程序,每个应用程序都有自己的任务集,而且运行得很好。但是,一般来说,每个应用程序的任务集都使用相同的属性来限制速率、重试等。我试图创建一个具有所有这些通用属性的装饰器,并将目标函数设置为任务。

所以,我的example/tasks.py:中有这个

from celery import current_app

@current_app.task(
queue='example_queue',
max_retries=3,
rate_limit='10/s')
def example_task_1():
...
@current_app.task(
queue='example_queue',
max_retries=3,
rate_limit='10/s')
def example_task_2():
...

我正在尝试类似的东西:

from celery import current_app, Task

class ExampleTask(Task):
def __init__(self, task, *args, **kwargs):
super().__init__(*args, **kwargs)
self.task = task
def run(self):
self.task()

def example_decorator_task(func):
@wraps(func)
def wrapped(self, *args, **kwargs):
return ExampleTask(func).delay(
queue='example_queue',
max_retries=3,
rate_limit='10/s')
@example_decorator_task
def example_task_1():
...
@example_decorator_task
def example_task_2():
...

我实现了这一点,但通过调用example_task_1.delay(...),任务将无法正常工作,因为它是在包装器中执行的。有什么想法吗?

使用apply_async而不是delay,您的装饰器应该改为:

def decorator(function):
def wrapper(*args, **kwargs):
return function.apply_async(args=[*args], kwargs={**kwargs}, **{
'queue': 'example_queue',
'max_retries': 3,
'rate_limit': '10/s'
})
return wrapper

更多详细信息:

  • apply_async

  • 延迟

当您传递参数时,参数化的装饰器会转换为无参数的装饰器,但不会将它们应用于函数。因此,您可以通过将结果存储到一个变量中来创建一个新的装饰器,然后将其应用于各种任务函数。参见:

from celery import current_app
rate_limited_task = current_app.task(
queue='example_queue',
max_retries=3,
rate_limit='10/s'
)
@rate_limited_task
def example_task_1():
...
@rate_limited_task
def example_task_2():
...

相关内容

  • 没有找到相关文章

最新更新