如何在DAG python代码中使用Airflow模板引用



我是Airflow世界的新手,正在努力理解一件事。例如,我有一个DAG,它包含2个任务。第一个任务是提交spark作业,第二个任务是在s3中等待文件的Sensor。

RUN_DATE_ARG = datetime.utcnow().strftime(DATE_FORMAT_PY)
DATE = datetime.strptime(RUN_DATE_ARG, DATE_FORMAT_PY) - timedelta(hours=1)
with DAG() as dag:

submit_spark_job = EmrContainerOperator(
task_id="start_job",
virtual_cluster_id=VIRTUAL_CLUSTER_ID,
execution_role_arn=JOB_ROLE_ARN,
release_label="emr-6.3.0-latest",
job_driver=JOB_DRIVER_ARG,
configuration_overrides=CONFIGURATION_OVERRIDES_ARG,
name=f"spark-{RUN_DATE_ARG}",
retries=3
)
validate_s3_success_file = S3KeySensor(
task_id='check_for_success_file',
bucket_name="bucket-name",
bucket_key=f"blabla/date={DATE.strftime('%Y-%m-%d')}/hour={DATE.strftime('%H')}/_SUCCESS",
poke_interval=10,
timeout=60,
verify=False,
)

我有一个RUN_DATE_ARG,默认情况下应该取自datetime.utcnow(),这是我应该为我的工作提供的sparks java参数之一。我想添加一个功能,提交工作与自定义日期参数(通过气流用户界面(。当我试图将其检索为'{{ dag_run.conf["date"] | None}}'时,它会替换为任务配置(bucket_key=f"blabla/date={DATE.strftime('%Y-%m-%d')}/hour={DATE.strftime('%H')}/_SUCCESS",(中的值,但如果我执行以下操作,则不会用于DAG的python代码:

date='{{ dag_run.conf["date"] | None}}'
if date is None:
RUN_DATE_ARG = datetime.utcnow().strftime(DATE_FORMAT_PY)
else: 
RUN_DATE_ARG = date

我有什么方法可以将这个值用作代码变量吗?

不能在运算符范围之外使用模板。

您应该在运算符模板化参数中使用Jinja-if语句。以下只是一个总体想法:

submit_spark_job = EmrContainerOperator(
task_id="start_job",
...
name="spark-{{ dag_run.conf["date"] if dag_run.conf["date"] is not None else jinja_utc_now }}",
)

您需要将jinja_utc_now替换为检索时间戳的代码,可能与此答案中显示的内容类似。

您也可以使用:

{% if something %}
code
{% else %}
another code
{% endif %}

从Airflow的角度来看,它接受参数并通过Jinja引擎进行模板化,因此这里的关键问题只是使用正确的Jinja语法。

最新更新