我有一个任务task_main
,该任务调用其他任务。但是我需要他们以特定的顺序执行。
芹菜文档说不是用.delay()
和get()
。
http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronic-subtasks
使用链会运行它们吗?我在文档中找不到这一点。
@shared_task
def task_a():
pass
@shared_task
def task_b():
pass
@shared_task
def task_b():
pass
@shared_task
def task_main():
chain = task_a.s() | task_b.s() | task_c.s()
chain()
是的,如果您使用链条任务将接一个地运行。这是正确的文档:http://docs.celeryproject.org/en/latest/userguide/canvas.html#chains
也许是python数据科学ETL管道之后的更具体的示例,基本上,我们从db中提取数据,然后将数据转换为预期的方式,然后将数据加载到结果后端:
@app.task(base=TaskWithDBClient, ignore_result=True)
def extract_task(user_id):
"""Extract data from db w.r.t user."""
data = # some db operations ...
return data
@app.task()
def transform_task(data):
"""Transform input into expected form."""
data = .... # some code
# the data will be stored in result backend
# because we didn't ignore result.
return data
@app.task(ignore_result=True)
def etl(user_id):
"""Extract, transform and load."""
ch = chain(extract_task.s(user_id),
transform_task.s())()
return ch
回到您的主要应用程序,您只需要致电:
etl.delay(user_id)
任务将被顺序执行。