在所有任务完成后运行任务



我正在编写一个应用程序,该应用程序需要并行运行一系列任务,然后使用所有任务的结果运行单个任务:

@celery.task
def power(value, expo):
    return value ** expo
@celery.task
def amass(values):
    print str(values)

这是一个非常做作和过于简化的例子,但希望你能很好地理解这一点。基本上,我有许多需要运行power项,但我只想在所有任务的结果上运行amass。所有这些都应该异步发生,我不需要从amass方法返回任何东西。

有没有人知道如何在芹菜中设置这一点,以便一切都异步执行,并且在所有说过和做过之后调用具有结果列表的单个回调?

我已经按照Alexander Afanasiev的建议设置了这个示例,以便使用chord运行:

from time import sleep
import random
tasks = []
for i in xrange(10):
    tasks.append(power.s((i, 2)))
    sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms
callback = amass.s()
r = chord(tasks)(callback)

不幸的是,在上面的例子中,tasks中的所有任务只有在调用chord方法时才会启动。是否有一种方法,每个任务可以单独启动,然后我可以添加回调组运行时,一切都完成了?

这是一个适合我的解决方案:

tasks.py :

from time import sleep
import random
@celery.task
def power(value, expo):
    sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms
    return value ** expo
@celery.task
def amass(results, tasks):
    completed_tasks = []
    for task in tasks:
        if task.ready():
            completed_tasks.append(task)
            results.append(task.get())
    # remove completed tasks
    tasks = list(set(tasks) - set(completed_tasks))
    if len(tasks) > 0:
        # resend the task to execute at least 1 second from now
        amass.delay(results, tasks, countdown=1)
    else:
        # we done
        print results
用例:

tasks = []
for i in xrange(10):
    tasks.append(power.delay(i, 2))
amass.delay([], tasks)

这个应该做的是尽快异步启动所有任务。一旦它们都被发布到队列中,amass任务也将被发布到队列中。在所有其他任务完成之前,群集任务将继续重新发布自己。

芹菜有足够的工具为大多数工作流程,你可以想象。

看来你需要使用和弦。以下是来自docs的引用:

一个和弦就像一个组,但有一个回调。和弦由一个标题组和一个主体,其中主体是一个任务,应该在头文件中的所有任务完成后执行。

看一下你的问题中的这个片段,看起来你正在传递一个list作为和弦头,而不是一个group:

from time import sleep
import random
tasks = []
for i in xrange(10):
    tasks.append(power.s((i, 2)))
    sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms
callback = amass.s()
r = chord(tasks)(callback)

list转换为group应该会导致您期望的行为:

...
callback = amass.s()
tasks = group(tasks)
r = chord(tasks)(callback)

@alexander-afanasiev给你的答案基本上是正确的:使用和弦。

你的代码是好的,但tasks.append(power.s((i, 2)))实际上并没有执行子任务,只是将子任务添加到列表中。它是chord(...)(...),它向代理发送与您在tasks列表中定义的子任务一样多的消息,再加上一个回调子任务的消息。当你调用chord时,它会尽快返回。

如果您想知道和弦何时完成,您可以轮询完成,就像在示例中使用r.ready()进行单个任务一样。

相关内容

  • 没有找到相关文章

最新更新