抓取dags文件夹以提取ExternalTaskSensor任务和参数



我们已经广泛使用了[ExternalTaskSensor][1],以至于跨dag依赖关系的数量变得难以跟踪。因此,我们想要一种提取使用该传感器的所有任务以及传递给这些任务(如external_dag_idexternal_task_id(的参数的方法。提取这些信息可以让我们创建一个依赖项列表(如果我们想要的话,还可以创建一个图(。

方法:到目前为止,我们已经能够使用list_dags cli选项来获取所有dag的列表。然后,对于每个dag,我们运行带有-t参数的list_tasks选项,以获得所使用的任务和运算符的列表。下一步是检索传递给这些任务的参数,这就是我们遇到的问题。是否有任何官方或非官方的方法来抓取这些数据?

信息:我们正在运行Airflow 1.10.9和Composer 1.11.0。到目前为止,我们的脚本是用Python编写的。[1] :https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html

您可以这样做:

dag_models = session.query(DagModel).filter(DagModel.is_active.is_(True)).all()
for dag_model in dag_models:
     dag = dag_model.get_dag()
     for task in dag.task_dict.values():
         if isinstance(task, ExternalTaskSensor):
             do_smth(task.external_dag_id, task.external_task_id)

您可以利用Airflow的metadb进行此操作。

  • 直接查询

    SELECT operator
    FROM task_instance
    WHERE dag_id = 'my_dag'
      AND task_id = 'my_task';```
    
  • 或使用SQLAlchemy

    from airflow.utils.session import provide_session
    from airflow.models import TaskInstance
    @provide_session
    def get_operator_name(my_dag_id: str, my_task_id: str, session=None) -> str:
        """Fetch TaskInstance from the database using pickling"""
        task_instance: TaskInstance = session.query(TaskInstance).filter(TaskInstance.dag_id == my_dag_id).filter(TaskInstance.task_id == my_task_id).first()
        return task_instance.operator
    

这种方法的缺点是,在task至少运行一次(并且它的条目已在TaskInstance表中创建(之前,它不会工作


参考

  • cli.py

最新更新