Airflow exception not raised when the value is None/False



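I am trying to pass data from one PythonOperator, _etl_lasic, to another PythonOperator, _download_s3_data. That works fine, but I want to raise an exception when the passed value is None, which should mark the task as failed.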

import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowFailException

def _etl_lasic(**context):
    path_s3 = None
    context["task_instance"].xcom_push(
        key="path_s3",
        value=path_s3,
    )

def _download_s3_data(templates_dict, **context):
    path_s3 = templates_dict["path_s3"]
    if not path_s3:
        raise AirflowFailException("Path to S3 was not passed!")
    else:
        print(f"Path to S3: {path_s3}")

with DAG(
    dag_id="02_lasic_retraining_without_etl",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@once",
) as dag:
    etl_lasic = PythonOperator(
        task_id="etl_lasic",
        python_callable=_etl_lasic,
    )
    download_s3_data = PythonOperator(
        task_id="download_s3_data",
        python_callable=_download_s3_data,
        templates_dict={
            "path_s3": "{{task_instance.xcom_pull(task_ids='etl_lasic',key='path_s3')}}"
        },
    )
    etl_lasic >> download_s3_data

Logs:

[2021-08-17 04:04:41,128] {logging_mixin.py:103} INFO - Path to S3: None
[2021-08-17 04:04:41,128] {python.py:118} INFO - Done. Returned value was: None
[2021-08-17 04:04:41,143] {taskinstance.py:1135} INFO - Marking task as SUCCESS. dag_id=02_lasic_retraining_without_etl, task_id=download_s3_data, execution_date=20210817T040439, start_date=20210817T040440, end_date=20210817T040441
[2021-08-17 04:04:41,189] {taskinstance.py:1195} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-17 04:04:41,212] {local_task_job.py:118} INFO - Task exited with return code 0

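Jinja template values are rendered as strings by default. In your case, even though you pushed an XCom value of None, when the value is pulled via "{{task_instance.xcom_pull(task_ids='etl_lasic',key='path_s3')}}" it is actually rendered as the string "None". A non-empty string is truthy, so the `if not path_s3` check is skipped and the current logic does not raise the exception.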
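The effect can be reproduced without Airflow. The sketch below is only an illustration: `render_as_string` is a hypothetical helper that uses an f-string as a stand-in for Jinja's default string rendering; it is not part of Airflow or Jinja.

```python
def render_as_string(value):
    """Hypothetical stand-in for default Jinja rendering: always returns a str."""
    return f"{value}"


# None is what _etl_lasic pushed to XCom in the question.
rendered = render_as_string(None)

print(repr(rendered))          # the four-character string 'None', not the None object
print(bool(rendered))          # a non-empty string is truthy, so `if not path_s3` is skipped
print(bool(None), bool(""))    # both of these are falsy and would trigger the check
```

An empty string survives string rendering as an empty string, which is why it still trips the `if not path_s3` check.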

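There are two options to solve this:

  1. In the "_etl_lasic" function, set path_s3 to an empty string instead of None. An empty string is still rendered as an empty string, which is falsy, so the existing check raises the exception.
  2. If you are using Airflow 2.1+, there is a parameter, render_template_as_native_obj, that can be set at the DAG level. It renders Jinja template values as native Python types (list, dict, etc.). Setting this parameter to True achieves the goal without changing how path_s3 is set in the function.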
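As a rough sketch of the second option, assuming Airflow 2.1 or later, the DAG from the question could be adapted as follows. The dag_id and the fixed start_date are placeholders chosen for illustration; the only functional change is the render_template_as_native_obj argument.

```python
from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator


def _etl_lasic(**context):
    # Push None, exactly as in the question.
    context["task_instance"].xcom_push(key="path_s3", value=None)


def _download_s3_data(templates_dict, **context):
    path_s3 = templates_dict["path_s3"]
    # With native rendering, path_s3 is the None object here, not the string "None".
    if not path_s3:
        raise AirflowFailException("Path to S3 was not passed!")
    print(f"Path to S3: {path_s3}")


with DAG(
    dag_id="02_lasic_retraining_native_obj",  # placeholder name, not from the question
    start_date=datetime(2021, 8, 14),         # placeholder date
    schedule_interval="@once",
    render_template_as_native_obj=True,       # render templates as native Python objects
) as dag:
    etl_lasic = PythonOperator(
        task_id="etl_lasic",
        python_callable=_etl_lasic,
    )
    download_s3_data = PythonOperator(
        task_id="download_s3_data",
        python_callable=_download_s3_data,
        templates_dict={
            "path_s3": "{{ task_instance.xcom_pull(task_ids='etl_lasic', key='path_s3') }}"
        },
    )
    etl_lasic >> download_s3_data
```

Note that this setting applies to every templated field in the DAG, so other templates that previously produced strings may start producing lists, dicts, or numbers.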
