设置操作系统环境变量可用的非dag文件管理的apache气流



我正试图从基于docker的气流服务转向AWS提供的托管apache气流。MWAA要求您指定一个dags文件夹,其中包含所有dags。此外,我可以在dags文件夹中创建其他文件夹和文件,并包括.airflowignore文件,以便将这些文件/文件夹视为非dag。我可以在这些文件中指定一些常见的函数,并将它们导入我的dag代码中,以便使用这些函数。到目前为止还不错。当我不得不在这些常见的非dag文件中使用某些环境变量时,问题就来了。我在这里找到了如何使用插件设置运行时os-env变量https://docs.aws.amazon.com/mwaa/latest/userguide/samples-env-variables.html

from airflow.plugins_manager import AirflowPlugin
import os
os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/lib/python3.7/site-packages" 
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.272.b10-1.amzn2.0.1.x86_64"
os.environ["My_Var"] = "hello"
class EnvVarPlugin(AirflowPlugin):                
name = 'env_var_plugin'

使用此代码,我设置了一个ENV变量My_Var。我的foder结构看起来像这个

dags
|
|-hello_dag.py
|-common
|  |
|  |-varcheck.py

在hello_dg.py中,我正在从varcheck.py 导入VAR

import os
from common.varcheck import VAR
print(os.environ["My_Var"])
print(VAR)

在varcheck.py中,我正在进行

import os
VAR = os.environ["My_Var"]

我在这个上收到一个导入错误

Broken DAG: [/usr/local/airflow/dags/hello_dag.py] Traceback (most recent call last):
File "/usr/local/airflow/dags/varcheck.py", line 2, in <module>
VAR = os.environ['MY_VAR']
File "/usr/lib64/python3.7/os.py", line 681, in __getitem__
raise KeyError(key) from None
KeyError: 'MY_VAR'

有趣的是,如果我将varcheck.py文件更改为

import os
VAR = os.environ

那么代码运行良好,print(VAR(打印所有环境变量。我想知道我们如何在MWAA中的非dag文件中使用os环境变量,因为这对我们的实现至关重要。

编辑:回购的当前结构

repo
|-common (used by all others)
|-airflow (deployed in ECS)
|-jobs_1 (deployed in batch)
|-jobs_2 (deployed in batch)
|-jobs_3 (deployed in lambda)

常见文件使用env变量。如果我们要用气流变量替换它们,我们需要为气流维护单独的公共文件,因为job_1、job_2和job_3与气流无关,它们使用公共文件。

环境变量与CASE相关。

与您的帖子相关:
在您的第一个帖子代码块中,您正在使用

os.environ["My_Var"] = "hello"

在您的堆栈跟踪中,我们看到

VAR = os.environ['MY_VAR']

但是,如果计划在多个位置使用此环境变量,但不需要它来启动气流,也可以考虑使用airflow变量。

问题出在插件文件中。

VAR = os.environ["My_Var"]

没有使用名称My_Var设置环境变量,因此它会给您一个KeyError。顺便说一句,你可能想试试气流变量。

最新更新