如果这只是我理解错误的结果,我深表歉意。我一直在四处搜索并阅读文档,但一直无法找到适合我的解决方案。
我有一个树结构,每个节点可以有任意数量的子节点。对于每个节点,都会实例化一个新的芹菜任务来创建它,然后它会查看所有子节点并实例化新任务来创建这些子节点。这样做的原因是为了更好地利用芹菜的多线程特性。递归创建整个树的单个任务似乎只利用了单个线程。
虽然我已经能够以这样的方式设置我的代码,但我的问题是,我在原始任务中有一些依赖项,在整个结构完成创建之前,这些依赖项无法执行。代码看起来像:
@app.task
def initial_task(tree_data):
jobs = []
for node in tree_data:
jobs.append(recursive_task.s(node))
job = group(jobs)
result = job.apply_async()
# Block execution until group is finished
while not result.ready():
time.sleep(0.5)
... do dependent stuff ...
@app.task
def recursive_task(node, parent=None):
# Create node object
node_obj = Node(node.name, parent=parent)
jobs = []
for child in node.children:
jobs.append(recursive_task.s(child, node_obj))
job = group(jobs)
result = job.apply_async()
return node_obj
我遇到的问题是,所有孩子的子任务都不会阻碍第一组任务的完成,我不知道如何强制执行。在这件事上如有任何帮助,我们将不胜感激。
因为我在创建子项时需要node_obj的ID,所以我不能简单地递归树并链接任务。
更新:我对代码做了一些更改,试图对结果进行更改。以下代码使所有子节点(包括孙、曾孙等)成为顶级节点的直接子节点:
@app.task
def initial_task(tree_data):
def _recursive_link_task(task_set, children):
for child in children:
task_set.link(create_node.s(child))
if child.children:
_recursive_link_task(task_set, child.children)
for node in tree_data:
s = create_node.s(None, node)
if node.children:
_recursive_link_task(s, node.children)
s.apply_async()
@app.task
def create_node(parent, node):
node_obj = Node(node.name, parent=parent)
return (node_obj,)
我本以为我可能会在上面的代码片段中获得更多的财富,但由于它只是传递给所有后续任务的初始节点对象,我仍然没有进一步尝试生成这个树结构。
使用和弦执行依赖于一系列任务结果的任务。
由于我无法完全理解您需要如何调用递归任务,所以我实现了mergesort的参考示例。
请注意这将不适用于芹菜3.2.0+,因为在任务中调用get
将导致异常。
from celery import Celery, chord
app = Celery('tasks', backend='amqp', broker='amqp://')
app.conf.CELERY_RESULT_BACKEND = 'amqp'
def mergesort(list_obj):
'''normal mergesort
'''
if len(list_obj) <= 1:
return list_obj
middle = len(list_obj) / 2
left, right = list_obj[:middle], list_obj[middle:]
return list(merge(list(mergesort(left)), list(mergesort(right))))
def merge(left, right):
'''normal merge
'''
while 1:
if left == []:
for j in right:
yield j
break
elif right == []:
for j in left:
yield j
break
elif left[0] < right[0]:
yield left.pop(0)
else:
yield right.pop(0)
def merge2(left_r, right_r):
'''celery merge
'''
left =left_r.get()
right = right_r.get()
while 1:
if left == []:
for j in right:
yield j
break
elif right == []:
for j in left:
yield j
break
elif left[0] < right[0]:
yield left.pop(0)
else:
yield right.pop(0)
@app.task
def merge_c(in_list):
'''celery merge
'''
#unpack
print '*'*21 + str( in_list)
left, right = in_list
return list(merge2(left, right))
@app.task
def same_object(l_obj):
'''helper function to convert list to `result`
'''
return l_obj
@app.task
def mergesort_c(list_obj):
'''celery mergesort
'''
if len(list_obj) <= 1:
# make sure that you return a `result` object for merge
return same_object.delay(list_obj)
middle = len(list_obj) / 2
left, right = list_obj[:middle], list_obj[middle:]
# finish mergesort (left) and mergesort(right) and merge them
res = chord([mergesort_c.s(left), mergesort_c.s(right)])(merge_c.s())
return res
if __name__ == '__main__':
l = [2,1, 3]
#normal mergesort
print mergesort(l) #[1, 2, 3, 3, 5]
# with celery
res = mergesort_c(l)
print res.get()