我有一个目录结构:
airflow_dags
├── dags
│ └── hk
│ └── hk_dag.py
├── plugins
│ └── cse
│ └── operators.py
│ └── cse_to_bq.py
└── test
└── dags
└── dag_test.py
在Cloud Composer创建的GCS桶中,有一个插件文件夹,我在这里上传cse
文件夹。
现在在我的hk_dag.py
文件中如果我像这样导入插件:
from plugins.cse.operators.cse_to_bq import CSEToBQOperator
并运行我的单元测试,它通过了,但在cloud composer中,我得到一个ModuleNotFoundError: No module named 'plugins'
错误消息。
如果我在hk_dag.py
中像这样导入插件:
from cse.operators.cse_to_bq import CSEToBQOperator
我的单元测试与ModuleNotFoundError: No module named 'cse'
失败,但它在云作曲家工作得很好。
如何解决?
在Airflow 2.0中导入插件,您只需要直接从operators模块中导入即可。
在你的例子中,必须是这样的:
from operators.cse_to_bq import CSEToBQOperator
但在此之前,你必须将文件夹结构更改为:
airflow_dags
├── dags
│ └── hk
│ └── hk_dag.py
├── plugins
│ └── operators
│ └── cse
│ └── cse_to_bq.py
└── test
└── dags
└── dag_test.py