MWAA - Airflow simple Python operator using local imports to organize code across multiple files



This is my first time using Amazon Managed Workflows for Apache Airflow (MWAA), so I may be missing some basics.

Suppose I have a Python application that I have organized/split across two files/scripts, so that it looks like this:

my_script_1.py

import my_script_2
print('This is script 1')
my_script_2.print_me()

my_script_2.py

def print_me():
    print('I am script 2')

When I run it locally, I get:

% python my_script_1.py
This is script 1
I am script 2

If I want to run my_script_1.py and have it call/invoke my_script_2.py, how would I deploy/organize this for MWAA?

If I turn my_script_1.py into a DAG and upload the DAG to the DAG folder in my S3 bucket, how do I make my_script_2.py available to MWAA?

my_script_2.py:

  1. is not a Python library in a package repository like pypi.org
  2. is not a Python wheel (.whl)
  3. is not hosted on a private PyPI/PEP-503 compliant repo within your environment

I took this list from the Python dependencies overview.

Is the solution simply to upload my_script_2.py to the DAG folder in my S3 bucket and expect MWAA to copy my_script_2.py to a location/path that my_script_1.py can access at runtime?
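In other words, would a layout like the following be all that is needed (the bucket and DAG file names here are just placeholders)?

s3://my-mwaa-bucket/
    dags/
        my_dag.py          <- the DAG that imports my_script_1
        my_script_1.py
        my_script_2.py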

Does this link from airflow.apache.org about the typical structure of packages also apply to an MWAA environment?

Python scripts that are called by other Python scripts can be copied to the S3 bucket's DAG folder and will be accessible there. Here is a working example:

test_import.py

from datetime import datetime, timedelta
import my_script_1
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator

def invoke_my_python_scripts():
    print(f"This is printed from the dag script: {__file__}")
    my_script_1.print_me()

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "dag_test_import",
    default_args=default_args,
    description="A simple tutorial DAG",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    my_python_task = PythonOperator(
        task_id="python_task_id_1",
        dag=dag,
        python_callable=invoke_my_python_scripts,
    )
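One small note on the import in test_import.py: if your MWAA environment runs Airflow 2.x, airflow.operators.python_operator still works but is the deprecated path; the current import there would be:

from airflow.operators.python import PythonOperator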

my_script_1.py

import my_script_2
def print_me():
    print("This is script 1")
    my_script_2.print_me()

my_script_2.py

def print_me():
    print("I am script 2")

I copied these to the dags folder of my AWS S3 bucket:

% aws s3 cp test_import.py  s3://mwaa/dags/test_import.py
upload: ./test_import.py to s3://mwaa/dags/test_import.py
% aws s3 cp my_script_1.py  s3://mwaa/dags/my_script_1.py
upload: ./my_script_1.py to s3://mwaa/dags/my_script_1.py
% aws s3 cp my_script_2.py  s3://mwaa/dags/my_script_2.py
upload: ./my_script_2.py to s3://mwaa/dags/my_script_2.py
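If the DAG does not pick the modules up right away, it is worth confirming that all three files actually landed under the dags prefix (same bucket name as in the copies above):

% aws s3 ls s3://mwaa/dags/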

The MWAA logs show the output from all three scripts, so it works:

[2022-05-12, 10:52:58 UTC] {{logging_mixin.py:109}} INFO - This is printed from the dag script: /usr/local/airflow/dags/test_import.py
[2022-05-12, 10:52:58 UTC] {{logging_mixin.py:109}} INFO - This is script 1
[2022-05-12, 10:52:58 UTC] {{logging_mixin.py:109}} INFO - I am script 2
[2022-05-12, 10:52:58 UTC] {{python.py:152}} INFO - Done. Returned value was: None
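The plain import my_script_1 works because Airflow puts the dags folder (on MWAA typically /usr/local/airflow/dags, as the log line above shows) on the interpreter's module search path. If you want to confirm that on your own environment, a minimal sketch is a task callable that just prints sys.path (the function name below is arbitrary):

import sys

def show_module_search_path():
    # The MWAA dags directory should appear in this list, which is why
    # "import my_script_1" and "import my_script_2" resolve at runtime.
    for entry in sys.path:
        print(entry)

Wiring this into a PythonOperator, the same way invoke_my_python_scripts is wired above, prints the search path into the task log.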
