如何让Airflow用户手动触发DAG



在气流UI中,"浏览器>日志"下可用的日志事件之一是事件"触发器",以及DAG ID和负责触发此事件的所有者/用户。这些信息很容易通过程序获得吗?

用例是,我有一个DAG,它允许用户的子集手动触发执行。根据触发执行该DAG的用户,从该DAG执行代码的行为将有所不同。

提前谢谢。

您可以直接从气流元数据数据库中的Log表中获取它,如下所示:

from airflow.models.log import Log
from airflow.utils.db import create_session
with create_session() as session:
results = session.query(Log.dttm, Log.dag_id, Log.execution_date, Log.owner, Log.extra).filter(Log.dag_id == 'example_trigger_target_dag', Log.event == 'trigger').all()
# Get top 2 records
results[2]

输出

(datetime.datetime(2020, 3, 30, 23, 16, 52, 487095, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
'example_trigger_target_dag',
None,
'admin',
'[('dag_id', 'example_trigger_target_dag'), ('origin', '/tree?dag_id=example_trigger_target_dag'), ('csrf_token', 'IjhmYzQ4MGU2NGFjMzg2ZWI3ZjgyMTA1MWM3N2RhYmZiOThkOTFhMTYi.XoJ92A.5q35ClFnQjKRiWwata8dNlVs-98'), ('conf', '{"message": "kaxil"}')]')

我会稍微更正前面的答案:

with create_session() as session:
results = session.query(Log.dttm, Log.dag_id, Log.execution_date, 
Log.owner, Log.extra)
.filter(Log.dag_id == 'dag_id', Log.event == 
'trigger').order_by(Log.dttm.desc()).all()

最新更新