Celery-一个任务运行N个任务,等待它们并处理结果



我需要以下工作流程:

  • ParentTask首先运行
  • 在某个时刻,它生成了N个并行运行的ChildTask实例
  • ParentTask等待这些结果完成,收集结果,以某种方式进行处理并完成

这似乎很容易。不幸的是,从任务中调用Task().delay()(我用它来调用任务(似乎被完全忽略了。我在这里完全迷路了。

如果您更喜欢代码方法,我也会将其包括在内。

from celery.task import Task
from celery.result import AsyncResult
class ParentTask(Task):
def run(self, *args, **kwargs):
# do some stuff
ids = [ChildTask().delay().id for _ in range(N)] # this seems to do nothing here
results = [AsyncResult(t) for t in ids]
while not all([r.ready() for r in results]): # wait for child tasks to finish
sleep(.100)
# do some stuff again
# return results
class ChildTask(Task):
def run(self, *args, **kwargs):
# do some child stuff
# return child results
ParentTask().delay() # this delay works fine

感谢提供任何线索!

好的,我明白了。工作方法可以是这样的(当然,任务可以做任何需要的事情(:

from time import sleep
from celery.task import Task
from celery import chain, group
class PreTask(Task):
def run(self, *args, **kwargs):
x = 0
for i in range(100000):
x += 1
return x

class MidTask(Task):
def run(self, *args, **kwargs):
sleep(5)
return 42

class PostTask(Task):
def run(self, *args, **kwargs):
return args

# call it like this
res = chain(PreTask().s() | group(MidTask().s() for _ in range(5)) | PostTask().s()).apply_async()
# and get the result for example like this
while(True):
if res.ready():
print(res.get())
sleep(1)

希望它能帮助到别人。

相关内容

  • 没有找到相关文章

最新更新