I'm trying to pass data from one PythonOperator, _etl_lasic, to another PythonOperator, _download_s3_data. This works fine, but I want to raise an exception when the passed value is None, which should mark the task as failed.
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowFailException

def _etl_lasic(**context):
    path_s3 = None
    context["task_instance"].xcom_push(
        key="path_s3",
        value=path_s3,
    )

def _download_s3_data(templates_dict, **context):
    path_s3 = templates_dict["path_s3"]
    if not path_s3:
        raise AirflowFailException("Path to S3 was not passed!")
    else:
        print(f"Path to S3: {path_s3}")

with DAG(
    dag_id="02_lasic_retraining_without_etl",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@once",
) as dag:
    etl_lasic = PythonOperator(
        task_id="etl_lasic",
        python_callable=_etl_lasic,
    )

    download_s3_data = PythonOperator(
        task_id="download_s3_data",
        python_callable=_download_s3_data,
        templates_dict={
            "path_s3": "{{task_instance.xcom_pull(task_ids='etl_lasic',key='path_s3')}}"
        },
    )

    etl_lasic >> download_s3_data
Logs:
[2021-08-17 04:04:41,128] {logging_mixin.py:103} INFO - Path to S3: None
[2021-08-17 04:04:41,128] {python.py:118} INFO - Done. Returned value was: None
[2021-08-17 04:04:41,143] {taskinstance.py:1135} INFO - Marking task as SUCCESS. dag_id=02_lasic_retraining_without_etl, task_id=download_s3_data, execution_date=20210817T040439, start_date=20210817T040440, end_date=20210817T040441
[2021-08-17 04:04:41,189] {taskinstance.py:1195} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-17 04:04:41,212] {local_task_job.py:118} INFO - Task exited with return code 0
Jinja-templated values are rendered as strings by default. In your case, even though you push an XCom value of None, when the value is pulled via "{{task_instance.xcom_pull(task_ids='etl_lasic',key='path_s3')}}" it is actually rendered as the string "None", so your current logic never raises the exception.
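To see why the guard never fires, here is a minimal sketch in plain Python (no Airflow needed) of what the downstream task actually receives under the default string rendering:

```python
# By default, Jinja stringifies the rendered result, so a pushed None
# reaches the downstream task as the literal string "None".
rendered = str(None)

assert rendered == "None"
assert bool(rendered)  # a non-empty string is truthy...

# ...so a check like `if not path_s3:` never triggers:
path_s3 = rendered
if not path_s3:
    raise ValueError("unreachable for the string 'None'")
print(f"Path to S3: {path_s3}")  # prints: Path to S3: None
```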
There are two options to solve this:

- Instead of setting path_s3 to None in the _etl_lasic function, set it to an empty string.
- If you are using Airflow 2.1+, there is a parameter, render_template_as_native_obj, that can be set at the DAG level and will render Jinja-templated values as native Python types (list, dict, etc.). Setting that parameter to True will do the trick without changing how path_s3 is set in the function. Here is a conceptual example.
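A minimal sketch of the second option, assuming Airflow 2.1+; only the DAG declaration changes, while _etl_lasic and _download_s3_data stay exactly as in the question:

```python
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="02_lasic_retraining_without_etl",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@once",
    # Render Jinja templates to native Python types instead of strings,
    # so the pushed None arrives downstream as a real None (Airflow 2.1+).
    render_template_as_native_obj=True,
) as dag:
    etl_lasic = PythonOperator(
        task_id="etl_lasic",
        python_callable=_etl_lasic,  # unchanged from the question
    )

    download_s3_data = PythonOperator(
        task_id="download_s3_data",
        python_callable=_download_s3_data,  # unchanged from the question
        templates_dict={
            "path_s3": "{{task_instance.xcom_pull(task_ids='etl_lasic',key='path_s3')}}"
        },
    )

    etl_lasic >> download_s3_data
```

With this in place, templates_dict["path_s3"] is a real None, `if not path_s3:` fires, and AirflowFailException marks the task as failed.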