我正在处理一个Django项目,该项目有几个应用程序,每个应用程序都有自己的任务集,而且运行得很好。但是,一般来说,每个应用程序的任务集都使用相同的属性来限制速率、重试等。我试图创建一个具有所有这些通用属性的装饰器,并将目标函数设置为任务。
所以,我的example/tasks.py
:中有这个
from celery import current_app
@current_app.task(
queue='example_queue',
max_retries=3,
rate_limit='10/s')
def example_task_1():
...
@current_app.task(
queue='example_queue',
max_retries=3,
rate_limit='10/s')
def example_task_2():
...
我正在尝试类似的东西:
from celery import current_app, Task
class ExampleTask(Task):
def __init__(self, task, *args, **kwargs):
super().__init__(*args, **kwargs)
self.task = task
def run(self):
self.task()
def example_decorator_task(func):
@wraps(func)
def wrapped(self, *args, **kwargs):
return ExampleTask(func).delay(
queue='example_queue',
max_retries=3,
rate_limit='10/s')
@example_decorator_task
def example_task_1():
...
@example_decorator_task
def example_task_2():
...
我实现了这一点,但通过调用example_task_1.delay(...)
,任务将无法正常工作,因为它是在包装器中执行的。有什么想法吗?
使用apply_async
而不是delay
,您的装饰器应该改为:
def decorator(function):
def wrapper(*args, **kwargs):
return function.apply_async(args=[*args], kwargs={**kwargs}, **{
'queue': 'example_queue',
'max_retries': 3,
'rate_limit': '10/s'
})
return wrapper
更多详细信息:
apply_async
延迟
当您传递参数时,参数化的装饰器会转换为无参数的装饰器,但不会将它们应用于函数。因此,您可以通过将结果存储到一个变量中来创建一个新的装饰器,然后将其应用于各种任务函数。参见:
from celery import current_app
rate_limited_task = current_app.task(
queue='example_queue',
max_retries=3,
rate_limit='10/s'
)
@rate_limited_task
def example_task_1():
...
@rate_limited_task
def example_task_2():
...