气流-有条件的任务执行



我在Ubuntu 20.04中运行Airflow 2.1.0,我在dag中检测到了优化的可能性,我需要帮助找出如何改进它。提前谢谢。

我正在运行dag来执行两种任务:

  1. 原始层=查询S3存储桶中的数据库和记录文件
  2. 可信层=从S3原始存储桶中读取,并记录在S3可信层存储桶中

我需要允许每个任务组的受信任任务在原始任务结束后立即执行,但当这种情况发生时,我还需要触发下一个任务组的原始任务。因为由于受信任层不会使用数据库,我可以通过允许它已经启动下一个Raw任务来优化dag执行时间,始终保持数据库繁忙我也不能同时运行查询,所以一次只能运行一个Raw任务。

实际流量:(未优化,原始任务空闲,但它们可能已经启动(

  1. 任务1=原始启动
  2. 任务1=原始结束
  3. 任务2=受信任的启动
  4. 任务2=受信任的结束

预期流量:(已优化,确保原始任务始终可用(

  1. 任务1=原始启动
  2. 任务1=原始结束
  3. (任务1=受信任的启动(+(任务2=原始启动(
  4. 任务1=受信任的结束
  5. 任务2=原始结束
  6. (任务2=受信任的启动(+(任务3=原始启动(

这是下面的dag代码:

from airflow import DAG
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from airflow_pentaho.operators.KitchenOperator import KitchenOperator
from airflow_pentaho.operators.PanOperator import PanOperator
from airflow_pentaho.operators.CarteJobOperator import CarteJobOperator
from airflow_pentaho.operators.CarteTransOperator import CarteTransOperator
from airflow.utils.task_group import TaskGroup
local_tz = pendulum.timezone('America/Sao_Paulo')
DAG_NAME = "Example_Dag"
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': pendulum.datetime(2022, 4, 13, tz=local_tz),
'email': 'example@gmail.com',
'retries': 3,
'retry_delay': timedelta(minutes=1)
}
with DAG(dag_id=DAG_NAME,
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=1),
catchup=False,
concurrency=2,
max_active_runs=1,
schedule_interval='*/10 * * * *') as dag:
with TaskGroup(group_id='table1') as table1:
job1 = CarteJobOperator(
dag=dag,
task_id="tsk_raw1",
job="Example/raw_layer")
job2 = CarteJobOperator(
dag=dag,
task_id="tsk_trusted1",
job="Example/trusted_layer")
job1 >> job2
with TaskGroup(group_id='table2') as table2:
job3 = CarteJobOperator(
dag=dag,
task_id="tsk_raw2",
job="Example/raw_layer")
job4 = CarteJobOperator(
dag=dag,
task_id="tsk_trusted2",
job="Example/trusted_layer")
job3 >> job4
with TaskGroup(group_id='table3') as table3:
job5 = CarteJobOperator(
dag=dag,
task_id="tsk_raw3",
job="Example/raw_layer")
job6 = CarteJobOperator(
dag=dag,
task_id="tsk_trusted3",
job="Example/trusted_layer")
job5 >> job6
with TaskGroup(group_id='table4') as table4:
job7 = CarteJobOperator(
dag=dag,
task_id="tsk_raw4",
job="Example/raw_layer")
job8 = CarteJobOperator(
dag=dag,
task_id="tsk_trusted4",
job="Example/trusted_layer")
job7 >> job8
with TaskGroup(group_id='table5') as table5:
job9 = CarteJobOperator(
dag=dag,
task_id="tsk_raw5",
job="Example/raw_layer")
job10 = CarteJobOperator(
dag=dag,
task_id="tsk_trusted5",
job="Example/trusted_layer")
job9 >> job10

table1 >> table2 >> table3 >> table4 >> table5

您的任务组正在阻碍您的最佳流程。我建议您单独列出所有任务,然后只使用>>运算符来显示实际的依赖关系。如果在生成的图上,有一组任务作为任务组是有意义的,那么只有这样,才应该将其正式添加为任务组。

最新更新