MWAA can't find the aws_default connection



I just set up AWS MWAA (Managed Workflows for Apache Airflow) and I'm running a simple bash script in a DAG. Reading through the logs for the task, I noticed that by default it looks for the aws_default connection and tries to use it, but doesn't find it.

I went to the connections pane and set up the aws_default connection, but the logs still show the same message.

Airflow Connection: aws_conn_id=aws_default

No credentials retrieved from Connection

*** Reading remote log from Cloudwatch log_group: airflow-mwaa-Task log_stream: dms-postgres-dialog-label-pg/start-replication-task/2021-11-22T13_00_00+00_00/1.log.
[2021-11-23 13:01:02,487] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,486] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2021-11-23 13:01:02,657] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,656] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2021-11-23 13:01:02,678] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,678] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=us-east-1
[2021-11-23 13:01:02,772] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,772] {{base_aws.py:157}} INFO - role_arn is None

How can I get MWAA to recognize this connection?

My DAG:

from datetime import datetime, timedelta, tzinfo

import pendulum

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

local_tz = pendulum.timezone("America/New_York")
start_date = datetime(2021, 11, 9, 8, tzinfo=local_tz)

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

with DAG(
    'dms-postgres-dialog-label-pg-test',
    default_args=default_args,
    description='',
    schedule_interval=timedelta(days=1),
    start_date=start_date,
    tags=['example'],
) as dag:
    t1 = BashOperator(
        task_id='start-replication-task',
        bash_command="""
        aws dms start-replication-task --replication-task-arn arn:aws:dms:us-east-1:blah --start-replication-task-type reload-target
        """,
    )

    t1

Edit: For now, I'm just importing a built-in function and using that to grab the credentials. Example:

from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection('aws_service_account')
...
print(conn.host)
print(conn.login)
print(conn.password)
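
For instance, a minimal sketch of wiring those values into the BashOperator so the AWS CLI can pick them up (the connection id and the env-variable approach here are my own assumptions, not something MWAA requires):

import os

from airflow.hooks.base import BaseHook
from airflow.operators.bash import BashOperator

conn = BaseHook.get_connection('aws_service_account')  # conn id assumed

# Inside the existing `with DAG(...)` block:
t1 = BashOperator(
    task_id='start-replication-task',
    bash_command='aws dms start-replication-task --replication-task-arn arn:aws:dms:us-east-1:blah --start-replication-task-type reload-target',
    # BashOperator's env replaces the subprocess environment entirely,
    # so copy os.environ and overlay the CLI's credential variables.
    env={
        **os.environ,
        'AWS_ACCESS_KEY_ID': conn.login or '',
        'AWS_SECRET_ACCESS_KEY': conn.password or '',
        'AWS_DEFAULT_REGION': 'us-east-1',
    },
)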

Updating this now that I've been in touch with AWS support.

The execution role that MWAA creates is used in place of an access key id and secret in aws_default. If you want to use a custom access key id and secret, follow what @Jonathan Porter recommends in his answer to this question:

from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection('aws_service_account')
...
print(conn.host)
print(conn.login)
print(conn.password)
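
If you go that route, the Amazon provider's hook can also build a boto3 client straight from a named connection. A minimal sketch, assuming the connection 'aws_service_account' stores the access key id as its login and the secret as its password:

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

# client_type tells the hook which boto3 client get_conn() should build;
# credentials come from the named connection rather than the execution role.
hook = AwsBaseHook(aws_conn_id='aws_service_account', client_type='dms')
dms = hook.get_conn()
print(dms.describe_replication_tasks())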

However, if you want to use the execution role that MWAA provides, that is already the default in MWAA. Confusingly, the info message says that no credentials were retrieved from the Connection, yet the execution role is still used, e.g. for something like the KubernetesPodOperator:

[2021-11-23 13:01:02,487] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,486] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2021-11-23 13:01:02,657] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,656] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2021-11-23 13:01:02,678] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,678] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=us-east-1
[2021-11-23 13:01:02,772] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,772] {{base_aws.py:157}} INFO - role_arn is None

For example, the following uses the .aws/credentials that are set automatically from the execution role in the MWAA env:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
    'owner': 'aws',
    'depends_on_past': False,
    'start_date': datetime(2019, 2, 20),
    'provide_context': True
}

dag = DAG(
    'kubernetes_pod_example', default_args=default_args, schedule_interval=None
)

# use a kube_config stored in the s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
    namespace="mwaa",
    image="ubuntu:18.04",
    cmds=["bash"],
    arguments=["-c", "ls"],
    labels={"foo": "bar"},
    name="mwaa-pod-test",
    task_id="pod-task",
    get_logs=True,
    dag=dag,
    is_delete_operator_pod=False,
    config_file=kube_config_path,
    in_cluster=False,
    cluster_context='aws',
    execution_timeout=timedelta(seconds=60)
)
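
To confirm which identity a task actually runs under, a quick sketch of my own (not from AWS support) is to call STS from a task; with no keys configured anywhere, boto3 walks the default credential chain and lands on the execution role:

from datetime import datetime

import boto3
from airflow import DAG
from airflow.operators.python import PythonOperator

def print_identity():
    # With no explicit keys, boto3 resolves credentials via the default
    # chain, which on MWAA ends at the environment's execution role.
    print(boto3.client('sts').get_caller_identity()['Arn'])

with DAG('whoami-test', start_date=datetime(2021, 11, 9),
         schedule_interval=None) as dag:
    PythonOperator(task_id='print-identity', python_callable=print_identity)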

Hope this helps anyone else who stumbles into this.
