定义气流DAG任务排序的正确方式



我在DAG中有一组相当长的任务,每个任务都有一个相当长的task_id,细节都是相关的,命名不能缩短
目前我写的是:

a_very_long_long_named_task_1 >> a_very_long_long_named_task_2 >> a_very_long_long_named_task_3 >> a_very_long_long_named_task_4 >> a_very_long_long_named_task_5

在其他DAG中,我看到它被拆分为多行,尽管有重复:

a_very_long_long_named_task_1 >> a_very_long_long_named_task_2
a_very_long_long_named_task_2 >> a_very_long_long_named_task_3
a_very_long_long_named_task_3 >> a_very_long_long_named_task_4
a_very_long_long_named_task_4 >> a_very_long_long_named_task_5

推荐哪一种?是否有最佳实践,或者可能有其他更好的方法来定义任务排序?

  • 在创建(实例化(任务时,您可以继续将任务添加到pythonlist(或dict/类似的东西(
  • 最后,您可以以编程方式将它们连接起来

注意,该片段未经测试

from typing import List
from airflow.models.baseoperator import BaseOperator
my_tasks: List[BaseOperator] = [
a_very_long_long_named_task_1,
a_very_long_long_named_task_2,
a_very_long_long_named_task_3, 
a_very_long_long_named_task_4, 
a_very_long_long_named_task_5
]
..
# define a utility method to set dependencies b/w tasks
def wire_tasks(my_tasks: List[BaseOperator]) -> None:
"""
A utility method that accepts a list of tasks and links them up
:param my_tasks: List of tasks (operator instances)
:type my_tasks: List[BaseOperator]
:return None
"""
for i in range(1, len(my_tasks)):
# this is equivalent to my_tasks[i - 1].set_upstream(my_tasks[i])
my_tasks[i - 1] >> my_tasks[i]
# call the utility method to wire the tasks
wire_tasks(my_tasks=my_tasks)

最新更新