Airflow XCOM从BashOperator到PythonOperator的通信



我是Apache Airflow的新手,正在尝试编写我的第一个Dag,它有一个基于另一个任务的任务(使用ti.xcom_pull(

附言:我使用VScode在WSL Ubuntu 20.04中运行Airflow。

我创建了一个任务1(task_id="get_datetime">(;日期";bash命令(有效(

然后我创建了另一个任务(task_id='process_datetime'(,它采用第一个任务的日期时间并对其进行处理,然后我设置了python_callable,一切都很好。。

问题是dt=ti。xcom_pull在我运行">气流任务测试first_ariflow_dag process_datetime 2022-11-1";在终端中,但当我在Airflow UI中看到日志时,我发现它工作正常。有人能给我一个解决方案吗?

`

from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def process_datetime(ti):
dt = ti.xcom_pull(task_ids=['get_datetime'])
if not dt :
raise Exception('No datetime value')

dt = str(dt[0]).split()
return{
'year':int(dt[-1]),
'month':dt[1],
'day':int(dt[2]),
'time':dt[3],
'day_of_week':dt[0]
}
with DAG(
dag_id='first_ariflow_dag',
schedule_interval='* * * * *',
start_date=datetime(year=2022, month=11, day=1),
catchup=False
) as dag:
# 1. Get the current datetime
task_get_datetime= BashOperator(
task_id = 'get_datetime',
bash_command='date'
)
# 2. Process the datetime
task_process_datetime= PythonOperator(
task_id = 'process_datetime',
python_callable=process_datetime
)

`

我得到这个错误:

[2022-11-02 00:51:45,420] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/mnt/c/Users/Salim/Desktop/A-Learning/Airflow_Conda/airflow_env/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/mnt/c/Users/Salim/Desktop/A-Learning/Airflow_Conda/airflow_env/lib/python3.8/site-packages/airflow/operators/python.py", line 193, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/salim/airflow/dags/first_dag.py", line 12, in process_datetime
raise Exception('No datetime value')
Exception: No datetime value

根据文档,要将数据上传到xcom,需要设置变量do_xcom_push(气流2(或xcom_push((气流1(。

如果BaseOperator.do_xcom_push为True,则写入stdout的最后一行当bash命令完成时,也将被推送到XCom

BashOperator应该是这样的:

task_get_datetime= BashOperator(
task_id = 'get_datetime',
bash_command='date',
do_xcom_push=True
)

相关内容

  • 没有找到相关文章

最新更新