我正在尝试获得完成Celery任务所需的估计时间。例如,如果任务A是由用户在前端触发的(可能是通过单击触发对后端的REST API调用的按钮(,它将显示一个计时器,以显示任务完成所需的时间。
如果我正确阅读了Celery的文档,我就可以获得预定任务的ETA。但是当前刚刚执行的任务呢?
我最初的想法是将以前发生的类似任务的执行时间存储在数据库中,并可能计算出所需的平均时间。如果有人能在上面的例子中分享一些经验,那就太好了。非常感谢!
方法1:使用定时装饰器
import time
import functools
def timer(func):
@functools.wraps(func)
def _wrapper(*args, **kwargs):
start = time.perf_counter()
val = func(*args, **kwargs)
end = time.perf_counter()
elapsed_time = end - start
task_info = {"time": elapsed_time, task_args: args, task_kwargs: kwargs}
# do something with this task_info, i.e log or save in db to be used for analysis later
return val
return _wrapper
@app.task(name="task1")
@timer
def task1(...):
...
方法2:使用芹菜信号
import time
from celery.signals import task_prerun, task_postrun
tasks = {}
@task_prerun.connect
def task_prerun_handler(task_id, task, **extras):
""" Dispatched before a task is executed. """
tasks[task_id] = time.perf_counter()
@task_postrun.connect
def task_postrun_handler(task_id, task, **extras):
""" Dispatched after a task has been executed. """
end = time.perf_counter()
elapsed_time = end - tasks.pop(task_id)
# do something with this task_info, i.e log or save in db to be used for analysis later
您可以查看的资源:
- https://docs.celeryproject.org/en/latest/userguide/signals.html
- https://docs.celeryproject.org/en/stable/userguide/monitoring.html