用tqdm跟踪芹菜和弦任务的进展?(Python)



有没有办法跟踪和弦的进度,最好是在tqdm小节中?

例如,如果我们以文档为例,我们将创建以下文件:

#proj/tasks.py
@app.task
def add(x, y):
return x + y
@app.task
def tsum(numbers):
return sum(numbers)

然后运行这个脚本:

from celery import chord
from proj.tasks import add, tsum
chord(add.s(i, i)
for i in range(100))(tsum.s()).get()

我们如何追踪和弦的进展?

  • 我们不能使用update_state,因为chord((对象不是函数
  • 我们不能使用collect((,因为在结果准备好之前,chord(((回调(会阻塞脚本

理想情况下,我会为Dask设想这样的自定义tqdm子类,但我一直无法找到类似的解决方案。

非常感谢任何帮助或提示!

所以我找到了绕过它的方法。

首先,chord(((回调(实际上并没有阻塞脚本,只有.get((部分阻塞。将所有任务发布到代理可能需要很长时间。幸运的是,有一种简单的方法可以通过信号跟踪发布过程。我们可以在发布开始前创建一个进度条,并从文档中修改示例处理程序以更新它:

from tqdm import tqdm
from celery.signals import after_task_publish
publish_pbar = tqdm(total=100, desc="Publishing tasks")
@after_task_publish.connect(sender='tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
publish_pbar.update(1)
c = chord(add.s(i, i)
for i in range(100))(tsum.s())
# The script will resume once all tasks are published so close the pbar
publish_pbar.close()

然而,这仅适用于发布任务,因为此信号是在发送任务的信号中执行的。task_success信号是在工作进程中执行的,所以这个技巧只能在工作日志中使用(据我所知(。

因此,为了在所有任务发布并恢复脚本后跟踪进度,我转向app.control.inspect((.stats((中的worker统计数据。这将返回一个包含各种统计数据的dict,其中包括已完成的任务。这是我的实现:

tasks_pbar = tqdm(total=100, desc="Executing tasks")
previous_total = 0
current_total = 0
while current_total<100:
current_total = 0
for key in app.control.inspect().stats():
current_total += app.control.inspect().stats()[key]['total']['tasks.add']
if current_total > previous_total:
tasks_pbar.update(current_total-previous_total)
previous_total = current_total
results = c.get()
tasks_pbar.close()

最后,我认为可能有必要为任务命名,既用于信号处理程序的过滤,也用于stat((dict,所以不要忘记将其添加到您的任务中:

#proj/tasks.py
@app.task(name='tasks.add')
def add(x, y):
return x + y

如果有人能找到更好的解决方案,请分享!

相关内容

  • 没有找到相关文章

最新更新