我怎么能等到所有芹菜任务都完成才继续我的代码



我有一个大的csv文件,我将其拆分为一个由每个块组成的列表,每个块包含100000行,将每个块传递给函数进行复杂的计算,并将结果附加到global_list中。当最后一个块完成后,我拿起global_list并做一些统计。我如何要求芹菜并行处理所有块,但等到最后一个任务/最后一个块完成后再执行global_list上的函数complex_calc?

谢谢你的帮助

for chunk in global_chunk_list:
   def func_calc.delay(chunk) #<<<<< use celery tasks
complex_calc(global_list) #<<<<< should only start when processing last chunk is finished 
@celery.task(name='func_calc')     
def func_calc(chunk):
  ...
  #save chunk in a global list
  global_list.append(result)
def complex_calc(global_list):
 ...

适当的方法是使用 Group s 和 join 方法等待一组并行任务完成执行。

task_group = group([func_calc.s(chunk) for chunk in global_chunk_list])
result_group = task_group.apply_async()
results = result_group.join()  # wait for all results

另请参阅文档中的示例。(一个区别是使用 join 而不是等待任务完成的 get(另请参阅此答案。

>>> from celery import group
>>> from tasks import add
>>> job = group([
...             add.s(2, 2),
...             add.s(4, 4),
...             add.s(8, 8),
...             add.s(16, 16),
...             add.s(32, 32),
... ])
>>> result = job.apply_async()
>>> result.ready()  # have all subtasks completed?
True
>>> result.successful() # were all subtasks successful?
True
>>> result.get()
[4, 8, 16, 32, 64]

若要有效地执行此操作,需要配置结果后端。

相关内容

  • 没有找到相关文章

最新更新