Airflow Composer: config file stored in GCP storage cannot be accessed when using PythonOperator



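I'm using the Airflow PythonOperator, which calls an API to fetch data from an external system, parses the data and puts it into MongoDB (I think I could also use SimpleHttpOperator).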

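In the called method I need to use a config file (director_api.cfg) that holds the credentials for the external system as well as for the Mongo instance. The config file is stored in the GCP storage bucket.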

Here is the DAG code:

from airflow.operators.python import PythonOperator
from airflow import models
from UpdateDirectorDataInMongo import main
# UpdateDirectorDataInMongo.py has the main method which is called in the PythonOperator task. This is stored in the same bucket as the dag file

with models.DAG(
    'Versa-directorinfo',
    # Continue to run DAG twice per day
    default_args=default_dag_args,
    schedule_interval=None,
    catchup=False,
) as dag:

    update_director_info = PythonOperator(
        task_id="update_director_info",
        python_callable=main
    )
    update_director_info

## UpdateDirectorDataInMongo.py - code where I try to access the director_api.cfg file
import configparser
self.api_username = parser.get("director-api", "user")
self.api_passwd = parser.get("director-api", "passwd")
self.mongoConnUri = parser.get('mongo', 'mongoConnUri') + "?retryWrites=true&w=majority"

# director-api.cfg (located in the same storage bucket as the dag file)
[director-api]
user=<user>
passwd=<passwd>
[mongo]
mongoConnUri=mongodb+srv://<user>:<passwd>@cluster0.w9yss.mongodb.net/<project>
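For reference, this is how `configparser` maps a file like the one above to sections and keys. It is a minimal sketch: the file contents are inlined with `read_string` and all values are placeholders (not real credentials), so nothing here depends on where the file lives.

```python
import configparser

# Placeholder contents mirroring director-api.cfg above (values are made up)
CFG_TEXT = """
[director-api]
user=alice
passwd=secret
[mongo]
mongoConnUri=mongodb+srv://alice:secret@example.mongodb.net/project
"""

parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)

# Each [header] becomes a section; key=value lines become options of that section
api_username = parser.get("director-api", "user")
api_passwd = parser.get("director-api", "passwd")
mongo_conn_uri = parser.get("mongo", "mongoConnUri") + "?retryWrites=true&w=majority"

print(parser.sections())  # ['director-api', 'mongo']
print(mongo_conn_uri)
```

Option names are case-insensitive by default (`ConfigParser` lowercases them on both read and lookup), so `mongoConnUri` and `mongoconnuri` refer to the same key.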

With the PythonOperator I am unable to access the config file; the error is shown below:

-04, 04:19:13 UTC] {taskinstance.py:1776} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 174, in execute
    return_value = self.execute_callable()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 188, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/UpdateDirectorDataInMongo.py", line 86, in main
    customers = getCustomers()
  File "/home/airflow/gcs/dags/UpdateDirectorDataInMongo.py", line 72, in getCustomers
    mongoConnUri = parser.get('mongo', 'mongoConnUri') + "?retryWrites=true&w=majority"
  File "/opt/python3.8/lib/python3.8/configparser.py", line 781, in get
    d = self._unify_values(section, vars)
  File "/opt/python3.8/lib/python3.8/configparser.py", line 1149, in _unify_values
    raise NoSectionError(section) from None
configparser.NoSectionError: No section: 'mongo'

What needs to be done to fix this? TIA!
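The `NoSectionError` is consistent with the file never having been loaded: `ConfigParser.read()` silently skips any path it cannot open and returns the list of files it actually parsed, so a wrong path only surfaces later, when `get()` finds no `mongo` section. A small sketch reproducing that behaviour (the path is deliberately nonexistent):

```python
import configparser

parser = configparser.ConfigParser()

# read() does not raise for a missing file; it returns the list of files it parsed
loaded = parser.read(["/nonexistent/director-api.cfg"])

error_message = None
try:
    parser.get("mongo", "mongoConnUri")
except configparser.NoSectionError as exc:
    # Same exception as in the task log, because no section was ever loaded
    error_message = str(exc)

print(loaded)         # []
print(error_message)  # No section: 'mongo'
```

Checking the return value of `read()` is therefore a cheap way to turn a wrong path into an immediate, explicit error.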

If your director-api.cfg was added to the root of the DAGs folder in the Cloud Composer bucket (next to your DAG files), you can access it from UpdateDirectorDataInMongo.py like this:

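import configparser
import os
# DAGs folder of the Cloud Composer bucket (mounted at /home/airflow/gcs/dags)
DAGS_FOLDER = os.getenv("DAGS_FOLDER")
parser = configparser.ConfigParser()
parser.read([f'{DAGS_FOLDER}/director-api.cfg'])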
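A slightly more defensive variant of the `DAGS_FOLDER` approach, as a sketch: it refuses to build a path when the variable is unset (otherwise the f-string would silently produce `None/director-api.cfg`) and raises as soon as the file cannot be read, instead of deferring the failure to `parser.get()`. The helper name `load_director_config` is hypothetical.

```python
import configparser
import os


def load_director_config(filename="director-api.cfg"):
    """Read `filename` from the Composer DAGs folder (hypothetical helper)."""
    dags_folder = os.getenv("DAGS_FOLDER")
    if not dags_folder:
        # Without this check the path would silently become "None/director-api.cfg"
        raise RuntimeError("DAGS_FOLDER environment variable is not set")

    path = os.path.join(dags_folder, filename)
    parser = configparser.ConfigParser()
    # read() returns the list of files it parsed; an empty list means it was not found
    if not parser.read([path]):
        raise FileNotFoundError(f"Config file not found or unreadable: {path}")
    return parser
```

In `getCustomers()` this would replace the module-level parser, e.g. `parser = load_director_config()`, so a misplaced file produces a clear `FileNotFoundError` in the task log.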

You could also consider another, more secure solution:

  • Add your secret variables to Secret Manager.
  • In your PythonOperator callable, use the Google Cloud Python client to access the secrets in Secret Manager:
from google.cloud import secretmanager

def _get_secret(project, secret_name, version='1'):
    client = secretmanager.SecretManagerServiceClient()
    secret_path = client.secret_version_path(project, secret_name, version)
    # Pass the resource name as the `name` keyword argument
    secret = client.access_secret_version(name=secret_path)
    return secret.payload.data.decode('UTF-8')

# your_project and your_key are placeholders for your project ID and secret name
your_secret_value = _get_secret(your_project, your_key)
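If you would rather keep the existing `director-api.cfg` format, one option (an assumption about how you might organise the secrets, not the only way) is to store the whole file as a single secret and parse its payload with `read_string`. The secret name `director-api-cfg` and both function names are hypothetical, and the `google-cloud-secret-manager` package is assumed to be installed; it is imported inside the function so the parsing part can be tried without it.

```python
import configparser


def parse_director_config(cfg_text):
    """Parse the contents of director-api.cfg supplied as a string."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return parser


def load_director_config_from_secret(project, secret_name="director-api-cfg", version="latest"):
    """Fetch the whole cfg file from Secret Manager and parse it (hypothetical secret name)."""
    # Imported lazily; requires the google-cloud-secret-manager package
    from google.cloud import secretmanager

    client = secretmanager.SecretManagerServiceClient()
    name = client.secret_version_path(project, secret_name, version)
    response = client.access_secret_version(name=name)
    return parse_director_config(response.payload.data.decode("UTF-8"))
```

Inside the task, `load_director_config_from_secret("your-project-id")` would replace the file read. The Composer environment's service account also needs permission to read the secret (for example the Secret Manager Secret Accessor role).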

For this to work, the director-api.cfg file needs to be in the same bucket/folder as the UpdateDirectorDataInMongo.py file. The DAG file doesn't matter in this case, because the PythonOperator runs main from UpdateDirectorDataInMongo.py.

Your parser call should then be: parser.read(['director-api.cfg'])
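One caveat about the relative path: `parser.read(['director-api.cfg'])` is resolved against the worker process's current working directory, not against the folder containing `UpdateDirectorDataInMongo.py`, so it only works when those happen to coincide. A sketch that anchors the path to the module's own location instead (the helper names `cfg_path_next_to` and `read_config_next_to` are hypothetical):

```python
import configparser
import os


def cfg_path_next_to(module_file, filename="director-api.cfg"):
    """Return the absolute path of `filename` in the same directory as `module_file`."""
    module_dir = os.path.dirname(os.path.abspath(module_file))
    return os.path.join(module_dir, filename)


def read_config_next_to(module_file, filename="director-api.cfg"):
    """Read the cfg that sits next to the given module, independent of the cwd."""
    path = cfg_path_next_to(module_file, filename)
    parser = configparser.ConfigParser()
    # An empty result from read() means the file was not found at that path
    if not parser.read([path]):
        raise FileNotFoundError(path)
    return parser
```

Inside UpdateDirectorDataInMongo.py you would pass the module's own path, e.g. `parser = read_config_next_to(__file__)`; in Composer that resolves to `/home/airflow/gcs/dags/director-api.cfg` when both files sit in the `dags` folder.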
