airflow.exceptions.AirflowException: dag_id could not found:



我安装了气流。

airflow info产率:

Apache Airflow
version                | 2.2.0                                                
executor               | SequentialExecutor                                   
task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler  
sql_alchemy_conn       | sqlite:////home/user@local/airflow/airflow.db
dags_folder            | /home/user@local/airflow/dags                
plugins_folder         | /home/user@local/airflow/plugins             
base_log_folder        | /home/user@local/airflow/logs                
remote_base_log_folder |                                                      
        
System info
OS              | Linux                                                                                  
architecture    | x86_64                                                                                 
uname           | uname_result(system='Linux', node='ubuntuVM.local', release='5.11.0-37-generic',   
| version='#41~20.04.2-Ubuntu SMP Fri Sep 24 09:06:38 UTC 2021', machine='x86_64')       
locale          | ('en_US', 'UTF-8')                                                                     
python_version  | 3.9.7 (default, Sep 16 2021, 13:09:58)  [GCC 7.5.0]                                    
python_location | /home/user@local/anaconda3/envs/airflow/bin/python                             
                                   
Tools info
git             | git version 2.25.1                                                                     
ssh             | OpenSSH_8.2p1 Ubuntu-4ubuntu0.3, OpenSSL 1.1.1f  31 Mar 2020                           
kubectl         | NOT AVAILABLE                                                                          
gcloud          | NOT AVAILABLE                                                                          
cloud_sql_proxy | NOT AVAILABLE                                                                          
mysql           | NOT AVAILABLE                                                                          
sqlite3         | 3.36.0 2021-06-18 18:36:39                                                             
| 5c9a6c06871cb9fe42814af9c039eb6da5427a6ec28f187af7ebfb62eafa66e5                       
psql            | NOT AVAILABLE                                                                          
                                   
Paths info
airflow_home    | /home/user@local/airflow                                                       
system_path     | /home/user@local/anaconda3/envs/airflow/bin:/home/user@local/anaconda3/
| condabin:/sbin:/bin:/usr/bin:/usr/local/bin:/snap/bin                                  
python_path     | /home/user@local/anaconda3/envs/airflow/bin:/home/user@local/anaconda3/
| envs/airflow/lib/python39.zip:/home/user@local/anaconda3/envs/airflow/lib/pytho
| n3.9:/home/user@local/anaconda3/envs/airflow/lib/python3.9/lib-dynload:/home/jm
| ellone@local/anaconda3/envs/airflow/lib/python3.9/site-packages:/home/user@ocp.
| local/airflow/dags:/home/user@local/airflow/config:/home/user@local/air
| flow/plugins                                                                           
airflow_on_path | True                                                                                   
                                   
Providers info
apache-airflow-providers-celery | 2.1.0
apache-airflow-providers-ftp    | 2.0.1
apache-airflow-providers-http   | 2.0.1
apache-airflow-providers-imap   | 2.0.1
apache-airflow-providers-sqlite | 2.0.1

我将cd/home/user@local/airflow/dagstouch创建一个文件sample_dag.py

接下来,我跑了:

airflow dags backfill sample_dag

但气流抛出:

airflow.exceptions.AirflowException: dag_id could not be found: sample_dag.py. Either the dag did not exist or it failed to parse.

此外,我在localhost:8080页面中没有看到我的dag,但我确实看到了示例dag。

我没有理由认为一个空白的py文件应该工作,但我认为它应该被看到。

如何创建我的第一个DAG?从我读过的文档来看,这个应该是正确的。

抛出异常,因为没有dag_id为"sample_dag"在dags_folder位置。dag_id是在调用DAG构造函数而不是引用DAG文件名时设置的。

例如:

with DAG(
dag_id='hello_world',
schedule_interval="@daily",
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
...

一个空的DAG文件将不会被气流识别,也不会在UI中创建一个空的DAG。

要开始使用您的第一个DAG,您可以查看经典教程,TaskFlow API教程,或潜入一个示例DAG,最初通过气流UI中的代码视图加载。

相关内容