如果另一个dag完成,则在dag中运行下一个任务



dag1:

start >> clean >> end

我有一个dag,用来执行一些任务。但我想对其进行修改,使得CCD_;dag2";目前没有运行。

有没有任何方法可以导入关于我的";dag2";,检查它的状态,如果它处于成功模式,我可以继续执行clean步骤类似这样的东西:

start >> wait_for_dag2 >> clean >> end

如何实现wait_for_dag2部分?

根据你想做什么,有一些不同的答案:

  • 如果您有两个具有相同计划间隔的dag,并且您想使第二个dag的运行等待第一个dag相同的运行,则可以在第一个dag的最后一个任务上使用ExternalTaskSensor

  • 如果你想运行dag2,在每次运行dag1之后,即使它是手动触发的,在这种情况下,你需要更新dag1并添加一个TriggerDagRunOperator,并将秒的调度间隔设置为None

  • 我想修改它,使得只有当另一个dag";dag2";目前没有运行。

    如果您有两个dag,并且您不想同时运行它们以避免在外部服务器/服务上发生冲突,您可以使用前两个命题之一,或者只对第一个dag的任务使用更高的优先级,并对导致冲突的任务使用相同的池(带1个插槽),但您将失去这些任务的并行性。

Hossein的方法是人们通常的做法。然而,如果你想获得任何dag运行数据的信息,你可以使用airfow功能来获得这些信息。当你不想(或不被允许)修改另一个dag时,以下方法是好的:

from airflow.models.dagrun import DagRun
from airflow.utils.state import DagRunState
dag_runs = DagRun.find(dag_id='the_dag_id_you_want_to_check')
last_run = dag_runs[-1]
if last_run.state == DagRunState.SUCCESS:
print('the dag run was successfull!')
else:
print('the dag state is -->: ', last_run.state)

最新更新