导入自定义插件在气流2[云作曲家]



我有一个目录结构:

airflow_dags
├── dags
│   └── hk  
│       └── hk_dag.py  
├── plugins
│   └── cse   
│       └── operators.py   
│           └── cse_to_bq.py   
└── test
└── dags   
└── dag_test.py  

在Cloud Composer创建的GCS桶中,有一个插件文件夹,我在这里上传cse文件夹。

现在在我的hk_dag.py文件中如果我像这样导入插件:

from plugins.cse.operators.cse_to_bq import CSEToBQOperator

并运行我的单元测试,它通过了,但在cloud composer中,我得到一个ModuleNotFoundError: No module named 'plugins'错误消息。

如果我在hk_dag.py中像这样导入插件:

from cse.operators.cse_to_bq import CSEToBQOperator

我的单元测试与ModuleNotFoundError: No module named 'cse'失败,但它在云作曲家工作得很好。

如何解决?

在Airflow 2.0中导入插件,您只需要直接从operators模块中导入即可。

在你的例子中,必须是这样的:

from operators.cse_to_bq import CSEToBQOperator

但在此之前,你必须将文件夹结构更改为:

airflow_dags
├── dags
│   └── hk  
│       └── hk_dag.py  
├── plugins
│   └── operators   
│       └── cse   
│           └── cse_to_bq.py 
└── test
└── dags   
└── dag_test.py 

最新更新