气流BashOperator:将参数传递给外部bash脚本



从BashOperator向外部bash脚本传递参数时出现问题。当我运行本地命令时,参数被正确替换:

log_cleanup = """ echo "{{ params.BASE_LOG_FOLDER }}" """
log_cleanup_task = BashOperator(
task_id='log_cleanup_task',
provide_context=True,
bash_command = log_cleanup,
params = {'BASE_LOG_FOLDER': "/var/opt"},
dag=dagInstance,
)
prints:  "/var/opt"   (without the double quotes)

但是如果我调用一个外部bash脚本,params不会在.中替代

log_cleanup_task = BashOperator(
task_id='log_cleanup_task',
provide_context=True,
bash_command= str(DAGS_FOLDER)+"/scripts/log_cleanup.sh ",
params = {'BASE_LOG_FOLDER': "/var/opt" },
dag=dagInstance,
)
#log_cleanup.sh:
#! /usr/bin/bash
echo "{{ params.BASE_LOG_FOLDER }}"

prints: "{{ params.BASE_LOG_FOLDER }}"    (without the double quotes)

在外部bash脚本中,我无法像在DAG.py脚本中存储语句时那样获得要替换的参数。

我是否必须将params作为命令行参数传递?jinja模板只在.py文件中工作吗?

删除bash_command"log_cleanup.sh "后面的空格

所以你的任务应该变成:

log_cleanup_task = BashOperator(
task_id='log_cleanup_task',
provide_context=True,
bash_command= "scripts/log_cleanup.sh",
params = {'BASE_LOG_FOLDER': "/var/opt" },
dag=dagInstance,
)

注意脚本文件夹应位于包含DAG文件的文件夹内,并且应包含脚本的相对路径(相对于包含此DAG的文件夹(

出现TemplateNotFound错误的主要原因是Jinja(Airflow使用的模板引擎(无法识别bash_command中提到的路径。Jinja只识别DAG.template_searchpath中传递的路径默认路径是包含DAG的文件夹,因此如果DAG直接位于$AIRFLOW_HOME/dags中,则可以将脚本文件夹直接放置在DAG文件夹下。或者,您可以将路径传递到DAG.template_searchpath中的文件夹,如下所示:

dag = DAG("example_dag", template_searchpath="/var/opt/scripts")
# And then just pass "filename" to bash_command
log_cleanup_task = BashOperator(
task_id='log_cleanup_task',
provide_context=True,
bash_command= "log_cleanup.sh ",
params = {'BASE_LOG_FOLDER': "/var/opt" },
dag=dag,
)

最新更新