GCP Apache Airflow-如何从专用存储库安装Python包并在DAG上导入



我有一个私人存储库。这个存储库具有我的DAG的常见功能。(例如:日期时间验证器、响应编码器函数(我想在我的DAG文件中导入这个存储库的函数,我使用了这个链接来完成

我创建了pip.conf文件。这个文件的位置是:my-bucket-name/config/pip/pip.conf,我在这个文件中添加了我的私有github存储库,如下所示:

[global]
extra-index-url=https://<token>@github.com/my-private-github-repo.git

之后,我想在我的dag文件上导入这个存储库的函数(例如:从公共repo导入*(,但我在我的dag上遇到了"找不到模块">错误。(不幸的是,在cloudcomposer日志中,我看不到任何显示私有github repo已安装的日志。(

我找了很多,但找不到如何做到这一点。

您可以将私有回购添加到PythonVirtualenvOperator中的需求中,如下所示:

from airflow import DAG
from airflow.decorators import task
@task.virtualenv(
task_id="virtualenv_python",
requirements=["https://<token>@github.com/my-private-github-repo.git"],
system_site_packages=False
)
def callable_from_virtualenv():
import your_private_module
..etc...

virtualenv_task = callable_from_virtualenv()

(示例取自Airflow python操作员示例(

为了避免在源代码中对令牌/凭证进行硬编码,您可以使用Airflow变量,如下所示:

from airflow.models import Variable
@task.virtualenv(
task_id="virtualenv_python",
requirements=[Variable.get("private_github_repo")],
system_site_packages=False
)

我已经将GitHub注册为连接,我不想将令牌复制为variable

所以我选择了另一种方法,使用f-string。我理解可读性更强,但我不知道这是否会降低解决方案的安全性。

PS:GITHUB是连接的名称。。。

from airflow import DAG
from airflow.decorators import task
from airflow.hooks.base import BaseHook
# Get connection
conn = BaseHook.get_connection('GITHUB')
@task.virtualenv(
task_id="virtualenv_python",
requirements=[
f'git+https://{conn.password}@github.com/my-org/my-private-github-repo.git'
],
system_site_packages=False
)
def callable_from_virtualenv():
import your_private_module
..etc...

virtualenv_task = callable_from_virtualenv()

作为一个优势,不需要添加几个变量(每个私有包一个!(。

我也尝试了jinja(这将避免包含BaseHook(,但我没有成功。。。

git+https://{{ conn.GITHUB.password }}@github.com/my-org/my-private-github-repo.git

最新更新