This is my first time using Amazon Managed Workflows for Apache Airflow (MWAA), so I may be missing some basics.
Suppose I have a Python application that I have organized/split across two files/scripts, so it looks like this:
my_script_1.py
import my_script_2
print('This is script 1')
my_script_2.print_me()
my_script_2.py
def print_me():
    print('I am script 2')
When I run it locally, I get:
% python my_script_1.py
This is script 1
I am script 2
How would I deploy/organize this for MWAA if I want to run my_script_1.py and have it call/invoke my_script_2.py?
If I convert my_script_1.py into a DAG and upload it to the DAG folder in my S3 bucket, how do I make my_script_2.py available to MWAA?
my_script_2.py:
- is not a Python library in a repository such as pypi.org
- is not a Python wheel (.whl)
- is not hosted on a private PyPI/PEP-503 compliant repo in your environment
I got this list from the Python dependencies overview.
Is the solution simply to upload my_script_2.py to the DAG folder in my S3 bucket and expect MWAA to copy my_script_2.py to a location/path that my_script_1.py can access at runtime?
Does this link from airflow.apache.org about the typical package structure apply to an MWAA environment?
Python scripts that are called by other Python scripts can be copied to the S3 bucket's DAG folder, where they will be accessible. Below is a working example:
test_import.py
from datetime import datetime, timedelta

import my_script_1

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator


def invoke_my_python_scripts():
    print(f"This is printed from the dag script: {__file__}")
    my_script_1.print_me()


# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "dag_test_import",
    default_args=default_args,
    description="A simple tutorial DAG",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    my_python_task = PythonOperator(
        task_id="python_task_id_1",
        dag=dag,
        python_callable=invoke_my_python_scripts,
    )
my_script_1.py
import my_script_2


def print_me():
    print("This is script 1")
    my_script_2.print_me()

my_script_2.py
def print_me():
    print("I am script 2")
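Why this works: MWAA syncs the contents of the S3 dags/ prefix to each worker's local DAG folder (/usr/local/airflow/dags, as the log line below shows), and Airflow puts that folder on sys.path, so sibling modules resolve with a plain import. Here is a minimal sketch of that mechanism — the temporary directory stands in for the DAG folder, and the sys.path line is my illustration of what Airflow does, not MWAA code:

```python
import sys
import tempfile
from pathlib import Path

# A temporary directory standing in for /usr/local/airflow/dags.
dag_folder = Path(tempfile.mkdtemp())

# Drop a sibling module into the "DAG folder", like uploading
# my_script_2.py to s3://<bucket>/dags/.
(dag_folder / "my_script_2.py").write_text(
    'def print_me():\n    print("I am script 2")\n'
)

# Airflow adds the DAG folder to sys.path; we do the same by hand.
sys.path.insert(0, str(dag_folder))

import my_script_2  # now resolvable, just like on an MWAA worker

my_script_2.print_me()  # prints: I am script 2
```

The key point is that no packaging (wheel, PyPI, plugins.zip) is needed for this case: once the file sits next to the DAG in the DAG folder, a plain import finds it.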
I copied these to the DAG folder of my AWS S3 bucket:
% aws s3 cp test_import.py s3://mwaa/dags/test_import.py
upload: ./test_import.py to s3://mwaa/dags/test_import.py
% aws s3 cp my_script_1.py s3://mwaa/dags/my_script_1.py
upload: ./my_script_1.py to s3://mwaa/dags/my_script_1.py
% aws s3 cp my_script_2.py s3://mwaa/dags/my_script_2.py
upload: ./my_script_2.py to s3://mwaa/dags/my_script_2.py
The MWAA logs show the output of all 3 scripts, so it works:
[2022-05-12, 10:52:58 UTC] {{logging_mixin.py:109}} INFO - This is printed from the dag script: /usr/local/airflow/dags/test_import.py
[2022-05-12, 10:52:58 UTC] {{logging_mixin.py:109}} INFO - This is script 1
[2022-05-12, 10:52:58 UTC] {{logging_mixin.py:109}} INFO - I am script 2
[2022-05-12, 10:52:58 UTC] {{python.py:152}} INFO - Done. Returned value was: None