我想从Python脚本(实际上是Jupyter笔记本(中调用Airflow DAG定义。
我确保我用于airflow
命令的环境变量与python-dotenv
一起设置(例如AIRFLOW_HOME
(,以便在Jupyter笔记本中加载.env
文件。此.env
文件包含各种环境变量,包括AIRFLOW_HOME
:
%load_ext dotenv
# the AIRFLOW_HOME env var must be an absolute file system path
# if the Notebook is relative to where the `airflow.db` Sqlite DB is.
%dotenv ../path-to-env-file/.env
我确实看到了预期的文件系统路径,所以AIRFLOW_HOME
指向Sqlite数据库airflow.db
所在的位置-Jupyter环境print
是正确的路径:
print(os.getenv("AIRFLOW_HOME"))
Sqlite DB以前是用airflow db init
(与旧的AIRFLOW_HOME
相同(等命令创建的,我还添加了一些在DAG中使用airflow connections add ...
的连接。
所有这些东西要么通过Airflow Webserver+调度器工作,要么通过以下命令工作:
airflow tasks test
--subdir ../dags
'my_dag_id' 'my_task_id' "${TODAY}"
- 所有这些配置细节都是从
airflow
命令调用进入Sqlite DB的 - 我确实在那个Sqlite数据库中看到了所有的Airflow数据库表
- 我确实看到了那些桌子上的行
- 所有命令都按预期工作
现在我想做与上面相同的事情,但在Jupyter笔记本中定义了DAG,所以回到这一点,当我在Python:中这样做时
from airflow.api.client.local_client import Client
c = Client(None, None)
c.trigger_dag(dag_id='my_dag_id', run_id='test_run_id', conf={})
然后我看到一个长的堆栈竞争,开头有这样的消息:
OperationalError Traceback (most recent call last)
底部的信息是:
OperationalError: (sqlite3.OperationalError) no such table: dag
[SQL: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root_dag_id, dag.is_paused AS dag_is_paused, dag.is_subdag AS dag_is_subdag, dag.is_active AS dag_is_active, dag.last_parsed_time AS dag_last_parsed_time, dag.last_pickled AS dag_last_pickled, dag.last_expired AS dag_last_expired, dag.scheduler_lock AS dag_scheduler_lock, dag.pickle_id AS dag_pickle_id, dag.fileloc AS dag_fileloc, dag.owners AS dag_owners, dag.description AS dag_description, dag.default_view AS dag_default_view, dag.schedule_interval AS dag_schedule_interval, dag.max_active_tasks AS dag_max_active_tasks, dag.max_active_runs AS dag_max_active_runs, dag.has_task_concurrency_limits AS dag_has_task_concurrency_limits, dag.has_import_errors AS dag_has_import_errors, dag.next_dagrun AS dag_next_dagrun, dag.next_dagrun_data_interval_start AS dag_next_dagrun_data_interval_start, dag.next_dagrun_data_interval_end AS dag_next_dagrun_data_interval_end, dag.next_dagrun_create_after AS dag_next_dagrun_create_after
FROM dag
WHERE dag.dag_id = ?
LIMIT ? OFFSET ?]
[parameters: ('my_dag_id', 1, 0)]
但是dag
表在那里,AIRFLOW_HOME
指向那个Sqlite DB,并且该表包含期望的行:
-- this shows the expected row
SELECT * FROM dag WHERE dag_id = 'my_dag_id'
如何从Python或Jupyter以编程方式运行DAG?
首先,您可以通过Airflow UI自己运行DAG吗?听起来气流数据库可能没有正确初始化。尝试重置,运行";正常的";模式,然后继续前进。
注意:初始化DB的命令是airflow initdb
,而不是airflow db init
。
其次,我建议使用Airflow API通过Python触发DAG,而不是使用本地客户端或使用CLI。