我们的工作流程目前是围绕旧版本的芹菜构建的,所以请记住,事情已经不是最佳的。我们需要运行一个任务并将该任务运行的记录保存在数据库中。如果该任务失败或挂起(经常发生(,我们希望重新运行,就像第一次运行时一样。不过,这不应该自动发生。它需要根据故障的性质手动触发,并且需要将结果记录在数据库中以做出该决定(通过前端(。
我们如何在数据库中保存任务的完整记录,以便后续进程可以抓取该记录并运行新的相同任务?当前实现将数据库中@task
修饰函数的路径保存为TaskInfo
模型的一部分。当任务需要重新运行时,我们在TaskInfo
模型上有一个get_task()
方法,它从数据库中获取路径,使用getattr
导入它,以及另一个使用*args, **kwargs
再次运行任务的rerun()
方法(也保存在数据库中(。
像这样(这些是TaskInfo
模型实例上的方法(:
def get_task(self):
"""Returns the task's decorated function, which can be delayed."""
module_name, object_name = self.path.rsplit('.', 1)
module = import_module(module_name)
task = getattr(module, object_name)
if inspect.isclass(task):
task = task()
# task = current_app.tasks[self.path]
return task
def rerun(self):
"""Re-run the task, and replace this one.
- A new task is scheduled to run.
- The new task's TaskInfo has the same parent as this TaskInfo.
- This TaskInfo is deleted.
"""
args, kwargs = self.get_arguments()
celery_task = self.get_task()
celery_task.delay(*args, **kwargs)
defaults = {
'path': self.path,
'status': Status.PENDING,
'timestamp': timezone.now(),
'args': args,
'kwargs': kwargs,
'parent': self.parent,
}
TaskInfo.objects.update_or_create(task_id=celery_task.id, defaults=defaults)
self.delete()
必须有一个更干净的解决方案来将任务保存在数据库中以便以后重新运行,对吗?
最新版本的Celery (4.4.0( 包含一个参数extended_result
。您可以将其设置为 True,然后Result Backend Database
中的表(默认情况下名为celery_taskmeta
(将存储任务的args and kwargs
。
这是一个演示:
app = Celery('test_result_backend')
app.conf.update(
broker_url='redis://localhost:6379/10',
result_backend='db+mysql://root:passwd@localhost/celery_toys',
result_extended=True
)
@app.task(bind=True, name='add')
def add(self, x, y):
self.request.task_name = 'add' # For saving the task name.
time.sleep(5)
return x + y
通过MySQL中记录的任务信息,您可以轻松重新运行任务。