如何暂停/取消暂停气流中的多个dag



我们有100个dag,其前缀为";dag_EDW_HC_*"。我们有以下命令暂停dag

Command: airflow pause dag_id

我们有没有办法暂停所有的100达格";dag_ EDW_HC_;一气呵成。。(以python编程或任何其他方式(。。?

我能想到的最简单(也可能是最快的(的方法是更新数据库:

UPDATE  dag
SET  is_paused = false
WHERE  dag_id LIKE 'dag_EDW_HC%';

如果您想定期执行此操作,您可以使用相应的PythonOperator创建专门用于此目的的DAG,并在触发DAG时指定参数。从一个正在运行的任务实例(在我们传递给PythonOperatorpython_callable函数中,或在自定义运算符的execute方法中(,您可以访问DagBag对象,该对象包含加载到Airflow环境中的所有dag的dag ID,您可以使用该对象来获取DagModel,您可以循环并暂停所有dag:

def python_callable():
dag_bag = DagBag(read_dags_from_db=False)
for dag_id_ in dag_bag.dag_ids:
dag_model = airflow.models.dag.DagModel.get_dagmodel(dag_id_)
dag_model.set_is_paused(True)

当前代码适用于2.0.1版本,不同版本的Airflow可能会有所不同。如果此调用不适用于您,则应检查Airflow服务器版本的文档。

自2022年2月以来,谷歌提供了一个composer_dags.py脚本,可以使用以下命令在给定环境中暂停所有DAG:

python3 composer_dags.py --environment COMPOSER_1_ENV 
--project PROJECT_ID 
--location COMPOSER_1_LOCATION 
--operation pause

来源:https://cloud.google.com/composer/docs/migrate-composer-2-snapshots-af-1#step_pause_dags_in_your_environment

最新更新