从BashOperator向外部bash脚本传递参数时出现问题。当我运行本地命令时,参数被正确替换:
log_cleanup = """ echo "{{ params.BASE_LOG_FOLDER }}" """
log_cleanup_task = BashOperator(
task_id='log_cleanup_task',
provide_context=True,
bash_command = log_cleanup,
params = {'BASE_LOG_FOLDER': "/var/opt"},
dag=dagInstance,
)
prints: "/var/opt" (without the double quotes)
但是如果我调用一个外部bash脚本,params不会在.中替代
log_cleanup_task = BashOperator(
task_id='log_cleanup_task',
provide_context=True,
bash_command= str(DAGS_FOLDER)+"/scripts/log_cleanup.sh ",
params = {'BASE_LOG_FOLDER': "/var/opt" },
dag=dagInstance,
)
#log_cleanup.sh:
#! /usr/bin/bash
echo "{{ params.BASE_LOG_FOLDER }}"
prints: "{{ params.BASE_LOG_FOLDER }}" (without the double quotes)
在外部bash脚本中,我无法像在DAG.py脚本中存储语句时那样获得要替换的参数。
我是否必须将params作为命令行参数传递?jinja模板只在.py文件中工作吗?
删除bash_command
中"log_cleanup.sh "
后面的空格
所以你的任务应该变成:
log_cleanup_task = BashOperator(
task_id='log_cleanup_task',
provide_context=True,
bash_command= "scripts/log_cleanup.sh",
params = {'BASE_LOG_FOLDER': "/var/opt" },
dag=dagInstance,
)
注意脚本文件夹应位于包含DAG文件的文件夹内,并且应包含脚本的相对路径(相对于包含此DAG的文件夹(
出现TemplateNotFound
错误的主要原因是Jinja(Airflow使用的模板引擎(无法识别bash_command
中提到的路径。Jinja只识别DAG.template_searchpath中传递的路径默认路径是包含DAG的文件夹,因此如果DAG直接位于$AIRFLOW_HOME/dags
中,则可以将脚本文件夹直接放置在DAG文件夹下。或者,您可以将路径传递到DAG.template_searchpath中的文件夹,如下所示:
dag = DAG("example_dag", template_searchpath="/var/opt/scripts")
# And then just pass "filename" to bash_command
log_cleanup_task = BashOperator(
task_id='log_cleanup_task',
provide_context=True,
bash_command= "log_cleanup.sh ",
params = {'BASE_LOG_FOLDER': "/var/opt" },
dag=dag,
)