我正在创建一个自定义的 Celery 任务类,以覆盖任务达到最大重试次数 (on_failure
时发生的情况。如果任务失败,我需要更新用户模型的状态。
以下是我的自定义任务类:
class ReadyTask(Task):
def run(self, user):
try:
user.get_results()
except Exception as exc:
raise self.retry(exc=exc, max_retries=3)
def on_failure(self, exc, task_id, *args, **kwargs):
user.status = Status.READY
user.save()
如何将 User 对象传递给on_failure()
方法以更新其状态?
我相信您可以检查您的args
或kwargs
以获取用户的 ID,如果您将其作为参数发送到任务中。如果你在kwargs中制作它,那就更容易了,这样你就不必做arg位置检查了。然后只是从 id 中获取您的用户并进行更改?
因此,不要将其发送到run
函数中,而是作为参数/关键字参数发送到您正在调用的任务的函数,通过function.apply(kwargs)
、function.apply_async(kwargs=kwargs)
或function.delay(kwargs)
。
所以:
user_id = kwargs.get('user_id')
# then resolve to user object, then update object
您还可以将对象绑定到自定义任务类。通过使用bind=True
.
class ReadyTask(Task):
def run(self, user):
self.user_object = user
try:
self.user_object.get_results()
except Exception as exc:
raise self.retry(exc=exc, max_retries=3)
def on_failure(self, exc, task_id, *args, **kwargs):
self.user_object.status = Status.READY
self.user_object.save()
@app.task(base=ReadyTask, bind=True)
def do_stuff(self, *args, **kwargs):
self.user_object.do_stuff()