Airflow branch operator inside a TaskGroup returns an invalid task_id


from typing import List, Optional

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator


def skip_update_job_pod_name(dag):
    """
    :param dag: Airflow DAG
    :return: Dummy operator to skip update pod name
    """
    return DummyOperator(task_id="skip_update_job_pod_name", dag=dag)


def update_pod_name_branch_operator(dag: DAG, job_id: str):
    """branch operator to update pod name."""
    return BranchPythonOperator(
        dag=dag,
        trigger_rule="all_done",
        task_id="update_pod_name",
        python_callable=update_pod_name_func,
        op_kwargs={"job_id": job_id},
    )


def update_pod_name_func(job_id: Optional[str]) -> List[str]:
    """function for update pod name."""
    # Returned ids must match downstream task_ids exactly; the skip task
    # is registered above as "skip_update_job_pod_name".
    return ["update_job_pod_name"] if job_id else ["skip_update_job_pod_name"]


def update_job_pod_name(dag: DAG, job_id: str, process_name: str) -> MySqlOperator:
    """
    :param dag: Airflow DAG
    :param job_id: Airflow Job ID
    :param process_name: name of the current running process
    :return: MySqlOperator to update Airflow job ID
    """
    return MySqlOperator(
        task_id="update_job_pod_name",
        mysql_conn_id="semantic-search-airflow-sdk",
        autocommit=True,
        sql=[
            # Inside an f-string, "{{{{ ... }}}}" renders as the Jinja
            # template "{{ ... }}", which Airflow fills in from XCom at run time.
            f"""
            INSERT INTO airflow.Pod (job_id, pod_name, task_name)
            SELECT * FROM (SELECT '{job_id}', '{{{{ ti.xcom_pull(key="pod_name") }}}}', '{process_name}') AS temp
            WHERE NOT EXISTS (
                SELECT pod_name FROM airflow.Pod WHERE pod_name = '{{{{ ti.xcom_pull(key="pod_name") }}}}'
            ) LIMIT 1;
            """
        ],
        task_concurrency=1,
        dag=dag,
        trigger_rule="all_done",
    )

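I put the branch operator inside a task group, but I get this error: "Branch callable must return valid task_ids. Invalid tasks found: {invalid_task_ids}". The full message is: airflow.exceptions.AirflowException: Branch callable must return valid task_ids. Invalid tasks found: {'update_job_pod_name'}. What is causing this?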
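To see where the message comes from, here is a simplified, hypothetical model of the check (not Airflow's actual source): every id the branch callable returns is compared against the DAG's task ids, and anything not found is reported as invalid. The helper name find_invalid_branch_ids and the group name "tg1" are assumptions for illustration.

```python
from typing import Iterable, Set, Union


def find_invalid_branch_ids(returned: Union[str, Iterable[str]],
                            dag_task_ids: Iterable[str]) -> Set[str]:
    """Simplified model of the branch check: return the ids the callable
    produced that are not task ids registered in the DAG."""
    # A branch callable may return a single id or a list of ids.
    branch_ids = {returned} if isinstance(returned, str) else set(returned)
    return branch_ids - set(dag_task_ids)


# Task ids as the DAG registers them when the tasks live in a TaskGroup
# named "tg1" (assumed group name).
dag_task_ids = [
    "tg1.update_pod_name",
    "tg1.update_job_pod_name",
    "tg1.skip_update_job_pod_name",
]

print(find_invalid_branch_ids(["update_job_pod_name"], dag_task_ids))
# {'update_job_pod_name'}
```

The bare id "update_job_pod_name" is reported because the DAG only knows the prefixed "tg1.update_job_pod_name".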

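The cause is that tasks inside a TaskGroup get their task_id according to the TaskGroup naming convention: by default (prefix_group_id=True) the group_id is prepended to the task_id you pass in.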

For example, if we name the group "tg1" and the task has task_id = "update_pod_name", the task's final id is "tg1.update_pod_name".
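The naming rule can be sketched as a tiny pure-Python helper. qualify_task_id is hypothetical (it is not an Airflow function); it only mirrors the convention described above. For nested groups, pass the inner group's full id (e.g. "outer.inner"), since the inner group_id is itself prefixed.

```python
from typing import Optional


def qualify_task_id(task_id: str,
                    group_id: Optional[str] = None,
                    prefix_group_id: bool = True) -> str:
    """Hypothetical helper mirroring the TaskGroup naming convention.

    Tasks outside any group (group_id=None), or in a group created with
    prefix_group_id=False, keep their bare task_id; otherwise the id
    becomes "<group_id>.<task_id>".
    """
    if group_id and prefix_group_id:
        return f"{group_id}.{task_id}"
    return task_id


print(qualify_task_id("update_pod_name", "tg1"))  # tg1.update_pod_name
print(qualify_task_id("update_pod_name"))         # update_pod_name
```

Hard-coding the prefix this way works, but it breaks silently if the group is renamed, which is why reading the id from the operator itself is more robust.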

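The best fix is to use the variable the operator was assigned to and return its task_id, which already includes the group prefix: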

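def update_pod_name_func(job_id: Optional[str]) -> str:
    """function for update pod name."""
    # update_job_pod_name and skip_update_pod_name are the operator instances
    # created inside the TaskGroup (not the factory functions); their .task_id
    # already carries the group prefix, e.g. "tg1.update_job_pod_name".
    return update_job_pod_name.task_id if job_id else skip_update_pod_name.task_id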
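A variation on the same idea, sketched under the assumption that you would rather not rely on module-level operator variables: pass the downstream ids into the callable through op_kwargs. The parameter names update_task_id and skip_task_id are hypothetical.

```python
from typing import Optional


def update_pod_name_func(job_id: Optional[str],
                         update_task_id: str,
                         skip_task_id: str) -> str:
    """Choose the downstream branch.

    The ids are supplied by the caller (for example via op_kwargs) rather
    than hard-coded, so they can already include any TaskGroup prefix.
    """
    return update_task_id if job_id else skip_task_id


print(update_pod_name_func("job-42",
                           "tg1.update_job_pod_name",
                           "tg1.skip_update_job_pod_name"))
# tg1.update_job_pod_name
```

In the DAG you would create the two target operators first inside the group and then build the branch operator with op_kwargs={"job_id": job_id, "update_task_id": update_op.task_id, "skip_task_id": skip_op.task_id}, where update_op and skip_op are illustrative names for those operator instances.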
