我设法找到了 2 个类似的主题,并讨论了这个问题,但不幸的是我无法从中得到最好的解决方案:
- 根据芹菜任务状态更新 Django 模型字段
- 根据芹菜任务状态更新 Django 模型字段
我使用 Django 和 Celery (+redis 作为消息代理(,我想在芹菜任务状态更改(从挂起 -> 成功、挂起 -> 失败(等时更新 django 模型。
我的代码:
import time
from celery import shared_task
@shared_task(name="run_simulation")
def run_simulation(simulation_id: str):
t1_start = time.perf_counter()
doSomeWork() # we may change this to sleep for instance
t1_end = time.perf_counter()
return{'process_time': t1_end - t1_start}
以及我调用任务的特定视图:
def run_simulation(request):
form = SimulationForm(request.POST)
if form.is_valid():
new_simulation = form.save()
new_simulation.save()
task_id = tasks.run_simulation.delay(new_simulation.id)
问题是,当任务状态发生更改时,更新模拟的 django 模型状态的首选方法是什么?
在文档中,我发现了使用方法on_failure
、on_success
等方法的处理程序 http://docs.celeryproject.org/en/latest/userguide/tasks.html#handlers
我认为没有首选方法来做这样的事情,因为它取决于您的项目。 您可以使用监视任务,例如您发送的链接。为任务指定任务 ID 并重新计划任务,直到受监视任务处于 FINISHED 状态。
from celery import AsyncResult
@app.task(bind=True)
def monitor_task(self, t_id):
"""Monitor a task"""
res = AsyncResult(t_id, backend=self.backend, app=self.app)
if res.ready():
raise self.retry(
countdown=10,
exc=Exception("Main task not done yet.")
)
您还可以创建事件接收器并检查任务的状态,然后将其保存在数据库中。 http://docs.celeryproject.org/en/latest/userguide/monitoring.html#real-time-processing
现在,如果您只对成功和失败状态感兴趣,则可以创建成功和失败回调,并注意将成功或失败状态保存在数据库中。
tasks.run_simulation.apply_async(
(sim_id,),
link=tasks.success_handler.s(),
link_error=tasks.error_handler()
)
http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks