从 Python 源代码触发气流 DAG,找不到 Sqlite 表"dag"



我想从Python脚本(实际上是Jupyter笔记本(中调用Airflow DAG定义。

我确保我用于airflow命令的环境变量与python-dotenv一起设置(例如AIRFLOW_HOME(,以便在Jupyter笔记本中加载.env文件。此.env文件包含各种环境变量,包括AIRFLOW_HOME:

%load_ext dotenv
# the AIRFLOW_HOME env var must be an absolute file system path 
# if the Notebook is relative to where the `airflow.db` Sqlite DB is.
%dotenv ../path-to-env-file/.env

我确实看到了预期的文件系统路径,所以AIRFLOW_HOME指向Sqlite数据库airflow.db所在的位置-Jupyter环境print是正确的路径:

print(os.getenv("AIRFLOW_HOME"))

Sqlite DB以前是用airflow db init(与旧的AIRFLOW_HOME相同(等命令创建的,我还添加了一些在DAG中使用airflow connections add ...的连接。

所有这些东西要么通过Airflow Webserver+调度器工作,要么通过以下命令工作:

airflow tasks test 
--subdir ../dags 
'my_dag_id' 'my_task_id' "${TODAY}"
  • 所有这些配置细节都是从airflow命令调用进入Sqlite DB的
  • 我确实在那个Sqlite数据库中看到了所有的Airflow数据库表
  • 我确实看到了那些桌子上的行
  • 所有命令都按预期工作

现在我想做与上面相同的事情,但在Jupyter笔记本中定义了DAG,所以回到这一点,当我在Python:中这样做时

from airflow.api.client.local_client import Client
c = Client(None, None)
c.trigger_dag(dag_id='my_dag_id', run_id='test_run_id', conf={})

然后我看到一个长的堆栈竞争,开头有这样的消息:

OperationalError                          Traceback (most recent call last)

底部的信息是:

OperationalError: (sqlite3.OperationalError) no such table: dag
[SQL: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root_dag_id, dag.is_paused AS dag_is_paused, dag.is_subdag AS dag_is_subdag, dag.is_active AS dag_is_active, dag.last_parsed_time AS dag_last_parsed_time, dag.last_pickled AS dag_last_pickled, dag.last_expired AS dag_last_expired, dag.scheduler_lock AS dag_scheduler_lock, dag.pickle_id AS dag_pickle_id, dag.fileloc AS dag_fileloc, dag.owners AS dag_owners, dag.description AS dag_description, dag.default_view AS dag_default_view, dag.schedule_interval AS dag_schedule_interval, dag.max_active_tasks AS dag_max_active_tasks, dag.max_active_runs AS dag_max_active_runs, dag.has_task_concurrency_limits AS dag_has_task_concurrency_limits, dag.has_import_errors AS dag_has_import_errors, dag.next_dagrun AS dag_next_dagrun, dag.next_dagrun_data_interval_start AS dag_next_dagrun_data_interval_start, dag.next_dagrun_data_interval_end AS dag_next_dagrun_data_interval_end, dag.next_dagrun_create_after AS dag_next_dagrun_create_after 
FROM dag 
WHERE dag.dag_id = ?
LIMIT ? OFFSET ?]
[parameters: ('my_dag_id', 1, 0)]

但是dag表在那里,AIRFLOW_HOME指向那个Sqlite DB,并且该表包含期望的行:

-- this shows the expected row
SELECT * FROM dag WHERE dag_id = 'my_dag_id'

如何从Python或Jupyter以编程方式运行DAG?

首先,您可以通过Airflow UI自己运行DAG吗?听起来气流数据库可能没有正确初始化。尝试重置,运行";正常的";模式,然后继续前进。

注意:初始化DB的命令是airflow initdb,而不是airflow db init

其次,我建议使用Airflow API通过Python触发DAG,而不是使用本地客户端或使用CLI。

最新更新