我们已经广泛使用了[ExternalTaskSensor][1],以至于跨dag依赖关系的数量变得难以跟踪。因此,我们想要一种提取使用该传感器的所有任务以及传递给这些任务(如external_dag_id
和external_task_id
(的参数的方法。提取这些信息可以让我们创建一个依赖项列表(如果我们想要的话,还可以创建一个图(。
方法:到目前为止,我们已经能够使用list_dags
cli选项来获取所有dag的列表。然后,对于每个dag,我们运行带有-t
参数的list_tasks
选项,以获得所使用的任务和运算符的列表。下一步是检索传递给这些任务的参数,这就是我们遇到的问题。是否有任何官方或非官方的方法来抓取这些数据?
信息:我们正在运行Airflow 1.10.9和Composer 1.11.0。到目前为止,我们的脚本是用Python编写的。[1] :https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html
您可以这样做:
dag_models = session.query(DagModel).filter(DagModel.is_active.is_(True)).all()
for dag_model in dag_models:
dag = dag_model.get_dag()
for task in dag.task_dict.values():
if isinstance(task, ExternalTaskSensor):
do_smth(task.external_dag_id, task.external_task_id)
您可以利用Airflow的metadb进行此操作。
-
直接查询
SELECT operator FROM task_instance WHERE dag_id = 'my_dag' AND task_id = 'my_task';```
-
或使用
SQLAlchemy
from airflow.utils.session import provide_session from airflow.models import TaskInstance @provide_session def get_operator_name(my_dag_id: str, my_task_id: str, session=None) -> str: """Fetch TaskInstance from the database using pickling""" task_instance: TaskInstance = session.query(TaskInstance).filter(TaskInstance.dag_id == my_dag_id).filter(TaskInstance.task_id == my_task_id).first() return task_instance.operator
这种方法的缺点是,在task
至少运行一次(并且它的条目已在TaskInstance
表中创建(之前,它不会工作
参考
cli.py