我会将几个作业添加到芹菜队列中,然后等待结果。关于如何使用某种类型的共享存储(memcached、redis、数据库等)来实现这一点,我有很多想法,但我认为这是Celery可以自动处理的,但我在网上找不到任何资源。
代码示例
def do_tasks(b):
for a in b:
c.delay(a)
return c.all_results_some_how()
对于Celery>=3.0,不赞成使用TaskSet来支持group。
from celery import group
from tasks import add
job = group([
add.s(2, 2),
add.s(4, 4),
add.s(8, 8),
add.s(16, 16),
add.s(32, 32),
])
在后台启动组:
result = job.apply_async()
等待:
result.join()
Task.delay
返回AsyncResult
。使用AsyncResult.get
获取每个任务的结果。
要做到这一点,您需要保留对任务的引用。
def do_tasks(b):
tasks = []
for a in b:
tasks.append(c.delay(a))
return [t.get() for t in tasks]
或者您可以使用ResultSet
:
更新:ResultSet
已弃用,请参阅@laffuste的回答。
def do_tasks(b):
rs = ResultSet([])
for a in b:
rs.add(c.delay(a))
return rs.get()
我有一种预感,你不是真的想要延迟,而是想要Celery的异步功能。
我想你真的想要一个任务集:
from celery.task.sets import TaskSet
from someapp.tasks import sometask
def do_tasks(b):
job = TaskSet([sometask.subtask((a,)) for a in b])
result = job.apply_async()
# might want to handle result.successful() == False
return result.join()