这是我用来触发python脚本的scriptlet。
task1= BashOperator(
task_id='task1_monthly_run',
bash_command="python /opt/airflow/dags/scripts/validation/runner.py arg1 arg2",
dag=dag)
我给出了在py脚本中使用的两个参数。因为我对不同的参数使用相同的脚本。
这是runner.py提供的,用于显示参数的用法:
import sys
arg_list = sys.argv[1:]
model_name = arg_list[0]
val_date_son = arg_list[1]
代码运行良好。args通过得很好。但是Airflow UI显示有关参数的错误:
损坏的DAG:[/opt/airflow/dags/scripts/validation/runner.py]回溯(最后一次调用(:文件"&";,线219,在_call_with_frames_removed文件中"opt/airflow/dags/scripts/validation/runner.py";,第10行,inval_date_son=arg_list1 IndexError:列出索引超出范围
DAG导入错误
我怎样才能摆脱这个错误?
Airflow通过查找DAG目录中的所有.py
文件来发现DAG。此外,Airflow优化为仅考虑包含单词dag
和airflow
的.py
(dag_discovery_safe_mode(。在您的情况下,runner.py
可能包含字符串dag
和airflow
,因此Airflow尝试解析文件并查找DAG,但由于这是一个脚本,Airflow遇到错误并引发Broken DAG消息。
尽管存在Broken DAG消息,但您能够成功执行DAG的原因是该消息不在DAG上,而是在runner.py
上。按照您的意思执行它(通过BashOperator(可以很好地工作,但试图将脚本解析为DAG文件会导致错误。
为了解决您的问题,您需要设置.airflowignore
,这将告诉Airflow不要在/dags/scripts/
下解析.py
文件,因为在脚本文件夹下您不应该存储dag文件。
或者,您可以替换runner.py
中出现的dag
和airflow
字符串-它不会显示Broken DAG消息,但Airflow仍将尝试解析该文件,因此首选.airflowignore
解决方案。