如何使用SimpleHttpsOperator读取上一条消息的XCom,然后决定在Airflow中执行任务2



如何使用SimpleHttpsOperator读取上一条消息的XCom,然后决定在Airflow中执行任务2。

假设我有 3 个 SimpleHttpsOperator 任务,所有任务都返回一条 XCom 消息,在 XCom 值中,它根据结果返回成功或失败。

所以在执行 t2 之前,我想检查 t1 是否成功。我所有的任务都使用SimpleHttpsOperator

T1>> T2>> T3

下面是我的代码片段:

t1 = SimpleHttpOperator(
                   task_id='t1',
                   http_conn_id='http_temp',
                   endpoint='update_data',
                   method='POST',
                   headers={"Content-Type":"application/json"},
                   xcom_push=True,
                   log_response=True,
                   dag=dag,
)

t2 = SimpleHttpOperator(
                                   task_id='t2',
                                   http_conn_id='http_temp',
                                   endpoint='update_data',
                                   method='POST',
                                   headers={"Content-Type":"application/json"},
                                   # response_check=lambda response: True if len(response.json()) == 0 else False,
                                   xcom_push=True,
                                   log_response=True,
                                   dag=dag,

你必须使用 BranchPythonOperator。 下面的依赖链中的check_t1_status&check_t2_status将使用BranchPythonOperator来检查使用xcom的上一个任务的输出。并根据输出,执行下一个任务,或者如果上一个任务失败,则运行一个假尾。

t1 >> check_t1_status >> t2 >> check_t2_status >> t3
check_t1_status >> t1_fail
check_t2_status >> t2_fail

相关内容

最新更新