为依赖于先前任务的5个python脚本创建管道



我想使用airflow创建一个管道调度程序,它将执行存储在主目录中的5个python脚本。python脚本有:test1.py、test2.py、test3.py、test4.py和final.py。我应该如何在气流中加载脚本,有人能帮我处理代码片段吗。我是气流的新手,我尝试过教程,但我无法理解使用调度器。

请不要重复这个问题,我真的需要理解。

给定文件test1.pytest2.pytest3.py作为

# this is `test1.py`
def entry_point_1():
print("entry_point_1")

您可以创建test_dag.py

.
├── __init__.py
├── test1.py
├── test2.py
├── test3.py
└── test_dag.py

有两种直接的方法

1.使用PythonOperator

# this is `test_dag.py`
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import test1, test2, test3
dag_python: DAG = DAG(dag_id="dag_using_python_op",
start_date=datetime(year=2019, month=1, day=14),
schedule_interval=None)
python_op_1: PythonOperator = PythonOperator(dag=dag_python,
task_id="python_op_1",
python_callable=test1.entry_point_1)
python_op_2: PythonOperator = PythonOperator(dag=dag_python,
task_id="python_op_2",
python_callable=test2.entry_point_2)
python_op_3: PythonOperator = PythonOperator(dag=dag_python,
task_id="python_op_3",
python_callable=test3.entry_point_3)
python_op_1 >> python_op_2 >> python_op_3

2.使用BashOperator

# this is `test_dag.py`
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
dag_bash: DAG = DAG(dag_id="dag_using_bash_op",
start_date=datetime(year=2019, month=1, day=14),
schedule_interval=None)
bash_op_1: BashOperator = BashOperator(dag=dag_bash,
task_id="bash_op_1",
bash_command="python -c 'import test1; test1.entry_point_1()")
bash_op_2: BashOperator = BashOperator(dag=dag_bash,
task_id="bash_op_2",
bash_command="python -c 'import test2; test2.entry_point_2()'")
bash_op_3: BashOperator = BashOperator(dag=dag_bash,
task_id="bash_op_3",
bash_command="python -c 'import test3; test3.entry_point_3()'")
bash_op_1 >> bash_op_2 >> bash_op_3

注意:您必须修复PYTHONPATH才能使其工作;我做不到,但试一下(并在评论中报告你的发现)


  • 参考:Python:从命令行运行函数
  • 代码片段跟随Python 3

最新更新