复杂气流跨dag依赖性



在我的气流中有两种类型的dag:

  • 第1 DAG类型(DT1) -将数据从数据源加载到数据湖。这些dag由时间表触发。
  • 第二DAG类型(DT2) -获取数据数据湖并进行一些转换聚合等。这些dag也可以通过调度触发,但只有当所有所需的DT1都处于"成功"状态
  • 时才可以触发。

实现下一个逻辑的正确方法是什么?

每小时触发3次dt1。每天触发一次DT2(~晚上10点或11点)。但DT2只有在过去一小时内三个dt1的状态都为"成功"时才能触发。

我宁愿只实现气流功能。

ExternalTaskSensor就是为此而设计的。

from airflow.sensors.external_task import ExternalTaskSensor

文档页面在这里,它显示了它的用法(指定执行日期,成功状态等)。

您将为此使用外部任务传感器,但主要捕获将是您如何使用这些标记之间的时间计划,因为这些是主要成分,此外,如果我们想要动态跨DAG依赖关系,其中一个DAG将开始另一个是每10分钟或每小时执行一次,我们可以使用execution_delta

execution_delta的关键点1."Dag_B schedule_interval"应该早于"Dag_A"。否则它将不起作用。2.在执行timedelta对象时,我们可以定义小时和分钟(这是Dag_B和Dag_A之间的差异)计划时间间隔)3.与上一次执行的时间差为看,默认值是与当前任务相同的execution_date。对于昨天,用[肯定的!]= 1] datetime.timedelta(天)

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
import airflow.utils.dates
from datetime import  datetime,  timedelta
default_args = {
"owner": "airflow", 
"start_date": airflow.utils.dates.days_ago(1)
}
with DAG(dag_id="CrossDAG_DAG_A", default_args=default_args, schedule_interval="*/10 * * * *") as dag:
sensor = ExternalTaskSensor(
task_id= 'sensor',
external_dag_id= "CrossDAG_DAG_B",
external_task_id= 't1',
execution_delta= timedelta(minutes =5)
)
last_task = DummyOperator(task_id= "last_task")

sensor >> last_task ```

you can create the second dag according to your requiremnt but i have shared the main logic here. moreover i have shared the complete code in anohter post youc an see that for more explaination.


[answere to another post][1]

[1]: https://stackoverflow.com/questions/72597384/i-have-a-externaltasksensor-error-airflowsensortimeout/72601464?noredirect=1#comment128265090_72601464

最新更新