我正在编写一个应用程序,该应用程序需要并行运行一系列任务,然后使用所有任务的结果运行单个任务:
@celery.task
def power(value, expo):
return value ** expo
@celery.task
def amass(values):
print str(values)
这是一个非常做作和过于简化的例子,但希望你能很好地理解这一点。基本上,我有许多需要运行power
的项,但我只想在所有任务的结果上运行amass
。所有这些都应该异步发生,我不需要从amass
方法返回任何东西。
有没有人知道如何在芹菜中设置这一点,以便一切都异步执行,并且在所有说过和做过之后调用具有结果列表的单个回调?
我已经按照Alexander Afanasiev的建议设置了这个示例,以便使用chord
运行:
from time import sleep
import random
tasks = []
for i in xrange(10):
tasks.append(power.s((i, 2)))
sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms
callback = amass.s()
r = chord(tasks)(callback)
不幸的是,在上面的例子中,tasks
中的所有任务只有在调用chord
方法时才会启动。是否有一种方法,每个任务可以单独启动,然后我可以添加回调组运行时,一切都完成了?
这是一个适合我的解决方案:
tasks.py :
from time import sleep
import random
@celery.task
def power(value, expo):
sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms
return value ** expo
@celery.task
def amass(results, tasks):
completed_tasks = []
for task in tasks:
if task.ready():
completed_tasks.append(task)
results.append(task.get())
# remove completed tasks
tasks = list(set(tasks) - set(completed_tasks))
if len(tasks) > 0:
# resend the task to execute at least 1 second from now
amass.delay(results, tasks, countdown=1)
else:
# we done
print results
用例:tasks = []
for i in xrange(10):
tasks.append(power.delay(i, 2))
amass.delay([], tasks)
这个应该做的是尽快异步启动所有任务。一旦它们都被发布到队列中,amass
任务也将被发布到队列中。在所有其他任务完成之前,群集任务将继续重新发布自己。
芹菜有足够的工具为大多数工作流程,你可以想象。
看来你需要使用和弦。以下是来自docs的引用:
一个和弦就像一个组,但有一个回调。和弦由一个标题组和一个主体,其中主体是一个任务,应该在头文件中的所有任务完成后执行。
看一下你的问题中的这个片段,看起来你正在传递一个list
作为和弦头,而不是一个group
:
from time import sleep
import random
tasks = []
for i in xrange(10):
tasks.append(power.s((i, 2)))
sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms
callback = amass.s()
r = chord(tasks)(callback)
将list
转换为group
应该会导致您期望的行为:
...
callback = amass.s()
tasks = group(tasks)
r = chord(tasks)(callback)
@alexander-afanasiev给你的答案基本上是正确的:使用和弦。
你的代码是好的,但tasks.append(power.s((i, 2)))
实际上并没有执行子任务,只是将子任务添加到列表中。它是chord(...)(...)
,它向代理发送与您在tasks
列表中定义的子任务一样多的消息,再加上一个回调子任务的消息。当你调用chord
时,它会尽快返回。
如果您想知道和弦何时完成,您可以轮询完成,就像在示例中使用r.ready()
进行单个任务一样。