将参数传递给芹菜任务的on_failure方法



我正在创建一个自定义的 Celery 任务类,以覆盖任务达到最大重试次数 (on_failure时发生的情况。如果任务失败,我需要更新用户模型的状态。

以下是我的自定义任务类:

class ReadyTask(Task):
def run(self, user):
try:
user.get_results()
except Exception as exc:
raise self.retry(exc=exc, max_retries=3)
def on_failure(self, exc, task_id, *args, **kwargs):
user.status = Status.READY
user.save()

如何将 User 对象传递给on_failure()方法以更新其状态?

我相信您可以检查您的argskwargs以获取用户的 ID,如果您将其作为参数发送到任务中。如果你在kwargs中制作它,那就更容易了,这样你就不必做arg位置检查了。然后只是从 id 中获取您的用户并进行更改?

因此,不要将其发送到run函数中,而是作为参数/关键字参数发送到您正在调用的任务的函数,通过function.apply(kwargs)function.apply_async(kwargs=kwargs)function.delay(kwargs)

所以:

user_id = kwargs.get('user_id')

# then resolve to user object, then update object

您还可以将对象绑定到自定义任务类。通过使用bind=True.

class ReadyTask(Task):
def run(self, user):
self.user_object = user
try:
self.user_object.get_results()
except Exception as exc:
raise self.retry(exc=exc, max_retries=3)
def on_failure(self, exc, task_id, *args, **kwargs):
self.user_object.status = Status.READY 
self.user_object.save()
@app.task(base=ReadyTask, bind=True)
def do_stuff(self, *args, **kwargs):
self.user_object.do_stuff()

相关内容

  • 没有找到相关文章

最新更新