我如何从气流DAG内部对气流元数据库运行SQL查询?



我正在寻找一种方法来提取execution_date, start_date, end_date等最后一个成功运行实例的任务在DAG,然后决定提出一个错误,如果一些分支没有被触发,让我们说一个星期。

是否有一种方法,我们可以运行SQL查询上的气流元数据库寻找一个任务的最后成功运行实例,并提取必要的信息出的入口?我已经看了文档,但没有什么有用的。

您可以创建一个PythonOperator任务,从metastore查询任务实例状态。Airflow通过SQLAlchemy(一个Python ORM框架)在内部查询数据库。您也可以使用它从您的任务中查询数据库。例如:

import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import State
with DAG(dag_id="so_72230617", schedule_interval="@daily", start_date=datetime.datetime(2022, 5, 1)) as dag:
first = EmptyOperator(task_id="first")
@provide_session
def _fetch_last_successful_ti(session=None):
ti_exists_in_last_week = session.query(
session.query(TaskInstance)
.filter(
TaskInstance.dag_id == "so_72230617",
TaskInstance.task_id == "first",
TaskInstance.state == State.SUCCESS,
TaskInstance.execution_date >= timezone.utcnow() - datetime.timedelta(weeks=1),
)
.exists()
).scalar()
if not ti_exists_in_last_week:
raise Exception("No successful 'first' task instance found with execution_date in the last week.")
fetch_last_successful_ti = PythonOperator(
task_id="fetch_last_successful_ti", python_callable=_fetch_last_successful_ti
)
first >> fetch_last_successful_ti

这里的技巧是@provide_session,它初始化一个数据库会话对象,您可以使用它来使用气流对象和SQLAlchemy查询数据库。

最新更新