我在应用程序中使用celery
来运行定期任务。让我们看看下面的简单示例
from myqueue import Queue
@perodic_task(run_every=timedelta(minutes=1))
def process_queue():
queue = Queue()
uid, questions = queue.pop()
if uid is None:
return
job = group(do_stuff(q) for q in questions)
job.apply_async()
def do_stuff(question):
try:
...
except:
...
raise
正如您在上面的示例中所看到的,我使用 celery
来运行异步任务,但是(因为它是一个队列)我需要在do_stuff
或queue.ack(uid)
的情况下执行queue.fail(uid)
。在这种情况下,在这两种情况下,从我的任务中获得一些回调将非常清楚和有用 - on_failure
和 on_success
.
我看过一些文档,但从未见过将回调与apply_async
一起使用的做法。有可能做到吗?
子类 Task 类并重载 on_success 和 on_failure 函数:
from celery import Task
class CallbackTask(Task):
def on_success(self, retval, task_id, args, kwargs):
'''
retval – The return value of the task.
task_id – Unique id of the executed task.
args – Original arguments for the executed task.
kwargs – Original keyword arguments for the executed task.
'''
pass
def on_failure(self, exc, task_id, args, kwargs, einfo):
'''
exc – The exception raised by the task.
task_id – Unique id of the failed task.
args – Original arguments for the task that failed.
kwargs – Original keyword arguments for the task that failed.
'''
pass
用:
@celery.task(base=CallbackTask) # this does the trick
def add(x, y):
return x + y
您可以通过链接指定成功和错误回调,并在调用apply_async时link_err kwargs。芹菜文档包括一个明显的例子:http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks