401在尝试运行DAG时来自MWAA气流环境



描述

当试图对托管Amazon Airflow实例进行调用时,尽管能够从aws mwaa create-cli-token生成CLI令牌,但我无法对气流环境进行API调用。为什么我在这里得到一个被禁止的错误?

控制台和代码

代码

@pytest.fixture(scope="function")
def run_dag(
environment_name: str,
airflow_conn_info: AirflowConnectionInfo,
dag_to_run: str,
target_date: datetime) -> Callable:
headers = {
'accept': 'application/json',
'Content-Type': 'application/json',
'Authorization': airflow_conn_info.cli_token,
}
trigger_body = {
"conf": {},
"execution_date": target_date.isoformat(),
}
if not dag_to_run:
dag_to_run = f"{carrier_name}_dag"
base_url = f"https://{airflow_conn_info.hostname}/api/v1/dags/{dag_to_run}"
trigger_dag_url = f"{base_url}/dagRuns"
# TODO: Add some sort of check to ensure a DAG isn't disabled OR add a call
#       to enable the DAG. For now we're just going to assume it's enabled.
trigger_response = requests.post(
trigger_dag_url,
headers=headers,
data=json.dumps(trigger_body))
if trigger_response.status_code != 200:
raise ValueError(f"{trigger_response}")
dag_run_status_url = f"{base_url}/{trigger_response.json()['dag_id']}"
status_body = {}
task_instances_status_url = f"{base_url}/dagRuns/{trigger_response.json()['dag_id']/taskInstances}"
task_instances_body = {}
status_response = requests.get(
dag_run_status_url,
headers=headers,
data=json.dumps(status_body))
if status_response.status_code != 200:
raise ValueError(f"{trigger_response}")
# Derived from
# https://github.com/apache/airflow/blob/main/airflow/utils/state.py
terminal_states: List[str] = ["success", "failed"]
# TODO: Add a timeout to this while loop.
while (trigger_response.status_code == 200
and trigger_response.json()["state"] not in terminal_states):
# TODO: Add some sort of console output to show what task instance we're
#       on and the state of that current task instance.
status_response = requests.get(
dag_run_status_url,
headers=headers,
data=json.dumps(status_body))
task_instances_response = requests.get(
task_instances_status_url,
headers=headers,
data=json.dumps(task_instances_body))
breakpoint()

失败运行PDB

(Pdb) base_url
'https://{a_string}.c46.us-east-1.airflow.amazonaws.com/api/v1/dags/fedex_dag'
(Pdb) trigger_response.json()
{'detail': None, 'status': 401, 'title': 'Unauthorized', 'type': 'https://airflow.apache.org/docs/2.0.2/stable-rest-api-ref.html#section/Errors/Unauthenticated'}
(Pdb) headers
{'accept': 'application/json', 'Content-Type': 'application/json', 'Authorization': '{secret}'}

TLDR:API默认关闭。这在任何地方都没有明确说明。除非要启用完整的API,否则应使用https://YOUR_HOST_NAME/aws_mwaa/cli端点。

当我阅读有关生成CLI令牌的AWS文档时,我不清楚AWS添加到MWAA的aws_mwaa/cli端点是他们希望您使用的端点。这在用户指南中有解释,但在任何网站文档中都没有,这让我很不清楚

有一个AmazonMWAAFullApiAccess,如果您有权访问带有策略的角色,但我还没有对此进行测试,那么它听起来像是授予对整个API的访问权限。

最新更新