运行 Google 的 Cloud Compose 时,dags 无法使用的气流 dag 依赖项



Airflow 允许您将 DAG 依赖的依赖项(DAG 代码的外部 Python 代码)放在 DAG 文件夹中。 这意味着这些外部 Python 代码中的任何组件/成员或类都可以在 DAG 代码中使用。

但是,执行此操作(在云撰写环境的 GCS dag 文件夹中)时,依赖项的组件对 dag 不可用。 气流 Web UI 中显示类似于以下内容的错误: 损坏的 DAG: [/home/airflow/gcs/dags/....py] 没有名为测试器的模块。其中 tester 是 DAGS 文件夹中的一个单独的 Python 文件。

当使用Google的SDK(运行实际的Airflow命令)测试这些任务时,任务运行良好,但似乎在Kubernettes中创建这些容器作业的某个地方,它似乎也没有接管依赖项。

我意识到Cloud Compose处于测试阶段,但我想知道我是否做错了什么。

您应该将模块放在包含__init__.py文件的单独文件夹中(Airflow 不喜欢在其顶级 DAG 目录中__init__.py文件)。

例如,如果您有以下目录结构:

dags/
my_dag.py
my_deps/
__init__.py
dep_a.py
dep_b.py

你可以用my_dag.pyfrom my_deps import dep_a, dep_b

您是否正在寻找如何安装Python依赖项? https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies

此外,GCS 存储桶中的 DAG 文件夹(gcloud beta composer environments describe [environment]获取此存储桶;gs://{composer-bucket}/dags)应映射到 Pod 中的/home/airflow/gcs/dags。您是否尝试过SSH进入节点来找到这个?

我遇到了同样的问题,并在邮件列表中得到了帮助。有关参考,请参阅此处的线程:https://groups.google.com/forum/#!topic/cloud-composer-discuss/wTI7Pbwc6ZY。有一个指向方便的Github Gist的链接,其中包含一些评论。

若要将自己的依赖项写入 DAG 并将其导入到 DAG 中,需要压缩 dag 及其依赖项,如下所述:https://airflow.apache.org/concepts.html?highlight=zip#packaged-dags。

您可以将该 zip 文件直接上传到您的 Cloud Composer GCS 存储桶,Airflow 会选取它。

确保依赖项是位于dags目录顶层的包,而不是模块。

from foo_dep.foo_dep import my_utility_function将在这里工作:

foo_dag.py
foo_dep/__init__.py
foo_dep/foo_dep.py

from foo_dep import my_utility_function似乎它应该与以下 dags 目录结构一起使用(并且可以在本地工作),但它在 Airflow 中不起作用

foo_dag.py
foo_dep.py

来自有关配置 Airflow 的官方文档:

第一次运行 Airflow 时,它会在 $AIRFLOW_HOME 目录中创建一个名为 airflow.cfg 的文件(默认情况下为 ~/airflow)。此文件包含Airflow的配置,您可以对其进行编辑以更改任何设置

在第一个设置中设置的此文件中

[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /home/airflow/gcs/dags

气流的基本路径。

最新更新