我最近一直在尝试想出一个好方法来等待任务的生产者由芹菜工人完成。我想出一种方法,但这似乎不够快,这里是:
芹菜生产商:
leafs = []
def chain_tasks():
for i in range(1, 10):
p1 = ping1.si(i)
p2 = ping2.si(i)
p3 = ping3.si(i)
p4 = ping4.si(i)
mychain = chain(p1, p2, p3, p4)
leaf_id = mychain.apply_async()
leafs.append(leaf_id)
print('[INFO] Total leafs ->', leafs)
def _cancel_tasks(msg):
print("[ERROR] Dummy Task canceller->", msg)
def parent_succeeds(t):
if t.parent == None:
return True
else:
parent_succeeded = True
parent = t.parent
if parent.state == 'PENDING':
parent_succeeded = parent_succeeds(parent)
if not parent_succeeded:
return False
print('[INFO] Waiting on parentTask({0})...at {1} - {2}'.format(parent, datetime.now().strftime("%H:%M:%S"), parent.state), end='')
parent.wait(propagate=True)
print('Done.')
return parent.state != 'FAILURE'
def wait_for_comp():
print("[INFO] Waiting for celery to finish...")
max_fail = round(len(leafs) / 2)
fail_count = 0
for t in leafs:
if fail_count <= max_fail:
print('[INFO] Waiting on Task({0})...at {1}'.format(t, datetime.now().strftime("%H:%M:%S")))
try:
if parent_succeeds(t):
t.wait(propagate=True)
else:
print('[ERROR] One of the parent failed -> ', t.parent)
except Exception as e:
fail_count += 1
print('[ERROR] Exception Occurred [' + datetime.now().strftime("%H:%M:%S") + '] ->', str(e), fail_count)
print('[ERROR] Traceback [' + datetime.now().strftime("%H:%M:%S") + '] ->', traceback.format_exc())
else:
print("[ERROR] Failed!")
_cancel_tasks('failure of more than half tasks({0}/{1})'.format(fail_count, max_fail))
break
print("[INFO] Done.")
if __name__ == '__main__':
time_start = time.time()
chain_tasks()
wait_for_comp()
print('Finish time %s', time.time() - time_start)
这种方法的一个问题是,它等待一系列(for-loop(任务,这些任务不一定需要在工作线程端维护,因为工作线程执行是基于rabbit-mq条目的。所以它涉及大量的等待。
有没有其他方法可以使等待更有效率?
如果我没有错过一些重要的东西,那么简单的解决方案是将leaf_id = mychain.apply_async()
更改为以下内容:
result_as = mychain.apply_async()
result = result_as.get() # will block until the task is done
注意:不要在任务中调用 get((。