我们有100个dag,其前缀为";dag_EDW_HC_*"。我们有以下命令暂停dag
Command: airflow pause dag_id
我们有没有办法暂停所有的100达格";dag_ EDW_HC_;一气呵成。。(以python编程或任何其他方式(。。?
我能想到的最简单(也可能是最快的(的方法是更新数据库:
UPDATE dag
SET is_paused = false
WHERE dag_id LIKE 'dag_EDW_HC%';
如果您想定期执行此操作,您可以使用相应的PythonOperator
创建专门用于此目的的DAG,并在触发DAG时指定参数。从一个正在运行的任务实例(在我们传递给PythonOperator
的python_callable
函数中,或在自定义运算符的execute
方法中(,您可以访问DagBag
对象,该对象包含加载到Airflow环境中的所有dag的dag ID,您可以使用该对象来获取DagModel
,您可以循环并暂停所有dag:
def python_callable():
dag_bag = DagBag(read_dags_from_db=False)
for dag_id_ in dag_bag.dag_ids:
dag_model = airflow.models.dag.DagModel.get_dagmodel(dag_id_)
dag_model.set_is_paused(True)
当前代码适用于2.0.1
版本,不同版本的Airflow可能会有所不同。如果此调用不适用于您,则应检查Airflow服务器版本的文档。
自2022年2月以来,谷歌提供了一个composer_dags.py
脚本,可以使用以下命令在给定环境中暂停所有DAG:
python3 composer_dags.py --environment COMPOSER_1_ENV
--project PROJECT_ID
--location COMPOSER_1_LOCATION
--operation pause
来源:https://cloud.google.com/composer/docs/migrate-composer-2-snapshots-af-1#step_pause_dags_in_your_environment