我可以向使用任务装饰器创建的芹菜任务添加on_failure回调吗?



我很确定这只有在我创建自己的任务类时才能完成,但我想知道是否有其他人找到了做到这一点的方法。

这是一个完整的解决方案(适用于 Celery 4+):

import celery
from celery.task import task
class MyBaseClassForTask(celery.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # exc (Exception) - The exception raised by the task.
        # args (Tuple) - Original arguments for the task that failed.
        # kwargs (Dict) - Original keyword arguments for the task that failed.
        print('{0!r} failed: {1!r}'.format(task_id, exc))
@task(name="foo:my_task", base=MyBaseClassForTask)
def add(x, y):
    raise KeyError()

资源:

  • http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-inheritance
  • http://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.on_failure
  • http://docs.celeryproject.org/en/latest/userguide/tasks.html#abstract-classes

您可以直接向装饰器提供函数:

def fun(self, exc, task_id, args, kwargs, einfo):
    print('Failed!')
@task(name="foo:my_task", on_failure=fun)
def add(x, y):
    raise KeyError()

相关内容

  • 没有找到相关文章

最新更新