气流:如何设置对多个任务运行的依赖关系



我正在设置一个DAG,它由三个主要任务组成:

Extraction(today)     >> Fitting(today)
Extraction(yesterday) >> Fitting(today)
Fitting(yesterday)    >> Eval(today)
Extraction(today)     >> Eval(today)

因此,我想用今天和昨天的数据提取来喂养今天的模型,并希望根据今天的数据提取来评估昨天的模型。

我可以将其中的每一个都写为不同的任务,但如果昨天已经完成,我想避免运行昨天的数据提取。模型拟合也是如此,因此我想在 DAG 中引用与当前 DAG 具有不同execution_date的任务。将execution_date作为参数传递似乎不是解决方案,实际上会导致

Broken DAG: [/var/lib/airflow/dags/test_dag.py] Dependency <Task(DummyOperator): fitting>, extraction already registered

因为 airflow 认为我将相同的extraction任务作为fit任务的依赖项分配两次。

这是我的代码:

fit = DummyOperator(task_id='fitting',
depends_on_past=True,
dag=dag)
fit >> dag
fit.set_upstream([DummyOperator(task_id='extraction',
depends_on_past=False,
dag=dag,
execution_date=datetime(2018, 5, 18)
),
DummyOperator(task_id='extraction',
depends_on_past=False,
dag=dag,
execution_date=datetime(2018, 5, 17)
)
])

知道如何实现这一目标吗?

您遇到的错误是因为task_id应该是唯一的,并且您已经使用了两次相同的task_id='extraction'

您可以拥有如下代码:

fit = DummyOperator(task_id='fitting',
dag=dag)
extract_yday = DummyOperator(task_id='extract_yday',
dag=dag)
extract_today = DummyOperator(task_id='extract_today',
dag=dag)
extract_yday >> fit
extract_today >> fit

此外,您还可以使用气流宏来表示今天和昨天的日期:{{ ds }}{{ yesterday_ds }}将其传递给操作员。

最新更新