我需要向带有一些参数的 API 发出请求,并逐个处理请求中的数据。但是有时响应数据是分页的,这意味着我需要使用相同的参数发出额外的请求。 芹菜group
允许我们一个接一个地运行
任务,但如果任务产生子任务,子任务可以产生更多的子任务......,有没有办法等待所有子任务完成,然后再运行组中的下一个任务?或者也许芹菜给了我们更好的方法来解决我的任务?
def some_api_call(start_date=None, end_date=None, token_value=None):
pass
@celery_app.task(name='run_task', max_retries=None, bind=True)
def run_task(self):
group_items = [
task1.s('2020-01-02', '2020-01-03'),
task1.s('2020-01-05', '2020-01-01'),
task1.s('2020-01-010', '2020-01-04'),
]
group(group_items)()
@celery_app.task(name='task1', max_retries=None, bind=True)
def task1(self, start_date=None, end_date=None, token_value=None, *args, **kwargs):
res = some_api_call(start_date, end_date, token_value)
if res['token_value']:
# NEXT ELEMENT IN THE GROUP SHOULD WAIT UNTIL NESTED CHILD TASKS DONE
task1.delay(token_value=token_value)
经纪人 -Redis
. 我可能的解决方案[伪代码]:
- 等到父任务中的子任务完成
res = task1.delay(token_value=token_value)
res.get()
解决方案不好 - 我们阻止胎面。 不确定芹菜是否等待替代品。
- 用户
task.retry()
检查子任务是否已完成。
taskid = task1.delay(token_value=token_value)
if AsyncResult(taskid).state != "successes":
self.retry()
因此,我们将重试父任务,直到子任务完成并且不会阻塞线程。 但就像在第一个解决方案中一样:父任务处理了它的数据,但状态将是重试。
如果需要等待组中的任务完成,则应使用 Chord 原语。此外,如果您需要按顺序执行任务(一个接一个(,请使用 Chain 原语。和弦基本上是一个集团链,也是最后的任务......