跟踪芹菜的生长过程.小组的任务


@celery.task
def my_task(my_object):
    do_something_to_my_object(my_object)

#in the code somewhere 
tasks = celery.group([my_task.s(obj) for obj in MyModel.objects.all()])
group_task = tasks.apply_async()

问题:芹菜是否有一些东西可以检测组任务的进度?我能知道有多少任务在那里,有多少任务已经处理了吗?

在shell (ippython的tab自动完成)上进行修补,我发现group_task(这是一个celery.result.ResultSet对象)有一个名为completed_count的方法,它正好提供了我所需要的。

还在http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.ResultSet.completed_count

找到了文档

以下是基于@dalore的答案的完整工作示例。

第一个tasks.py .

import time
from celery import Celery, group
app = Celery('tasks', broker='pyamqp://guest@127.0.0.1//', backend='redis://localhost')
@app.task(trail=True)
def add(x, y):
    time.sleep(1)
    return x + y
@app.task(trail=True)
def group_add(l1, l2):
    return group(add.s(x1, x2) for x1, x2 in zip(l1, l2))()

使用Docker启动redis server: docker run --name my-redis -p 6379:6379 -d redis .

使用Docker启动RabbitMQ: docker run -d --hostname my-rabbit --name my-rabbit -p 5672:5672 rabbitmq:alpine .

在一个单独的shell中启动一个进程celery worker: celery -A tasks worker --loglevel=info -c 1

然后运行下面的测试脚本

from tasks import group_add
from tqdm import tqdm
total = 10
l1 = range(total)
l2 = range(total)
delayed_results = group_add.delay(l1, l2)
delayed_results.get()  # Wait for parent task to be ready.
results = []
for result in tqdm(delayed_results.children[0], total=total):
    results.append(result.get())
print(results)

你应该看到类似下面的内容,进度条每秒增加10%

50%|#####     | 5/10 [00:05<00:05,  1.01s/it
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

最后,清理你的redis和rabbitmq容器。

docker stop my-rabbit my-redis
docker rm my-rabbit my-redis

阅读AsyncResult的文档,有一个collect方法可以在结果进来时收集结果。

http://docs.celeryproject.org/en/latest/reference/celery.result.html celery.result.AsyncResult.collect

from celery import group
from proj.celery import app
@app.task(trail=True)
def A(how_many):
    return group(B.s(i) for i in range(how_many))()
@app.task(trail=True)
def B(i):
    return pow2.delay(i)
@app.task(trail=True)
def pow2(i):
    return i ** 2

示例输出:

>>> from celery.result import ResultBase
>>> from proj.tasks import A
>>> result = A.delay(10)
>>> [v for v in result.collect()
...  if not isinstance(v, (ResultBase, tuple))]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

注意:必须启用Task.trail选项,以便将子节点列表存储在result.children中。这是默认的,但是为了说明而显式启用。

编辑:

经过进一步的测试,我们发现虽然collect状态将收集结果,但它仍然等待。我发现,要取得进步,你需要得到孩子们的结果,就像这样:

group_result = mygrouptask.delay().get()
for result in tqdm(group_result.children, total=count):
    yield result.get()

tqdm在控制台显示进度

mygrouptask返回一个芹菜组,如下所示:

return group(mytask.s(arg) for arg in args)()

相关内容

  • 没有找到相关文章

最新更新