在python中执行定义为DAG的任务的简单方法?



我正在以一种复杂的方式运行一系列相互依赖的任务。我想将这些依赖描述为DAG(有向无环图),并在需要时执行该图。

我一直在看气流,并写了一个虚拟脚本:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def cloud_runner():
# my typical usage here would be a http call to a service (e.g. gcp cloudrun)
pass

with DAG(dag_id="my_id", schedule_interval=None, start_date=datetime.max) as dag:
first_task = PythonOperator(task_id="1", python_callable=cloud_runner)
second_task = PythonOperator(task_id="2", python_callable=cloud_runner)
second_task_bis = PythonOperator(task_id="2bis", python_callable=cloud_runner)
third_task = PythonOperator(task_id="3", python_callable=cloud_runner)
first_task >> [second_task, second_task_bis] >> third_task

执行以下命令:

airflow dags backfill my_id --start-date 2020-01-02

问题:

我的使用将不会涉及任何日程安排/开始日期/结束日期。此外,我的DAG将从python Flask服务器执行。

问题:

有没有办法在没有气流的情况下达到同样的效果?或者在一个独立的python脚本中使用仅触发模式的气流(没有所有的调度部分,airflow.db等)?

感谢

气流既是库又是应用程序。dag不必以预定的方式运行。您可以使用API/CLI按需触发它们。如果气流应用程序未运行,则无法运行DAG(计划或手动触发)。气流需要调度程序和元数据库运行。

回答你的问题-不。你必须设置并运行气流才能使DAG运行。