从BranchPythonOperator调用具有动态子任务id的任务组



我想从BranchPythonOperator调用一个具有动态子任务id的TaskGroup。

这是我的DAG流:

branch_dag

我的情况是,我想检查一个表在BigQuery中是否存在。

  • 如果存在:不执行任何操作并结束DAG

  • 如果不存在:从Postgres摄取数据到Google Cloud Storage

我知道从BranchPythonOperator调用任务组是通过调用任务id,格式如下:
group_task_id.task_id

问题是,我的任务组的子任务id是动态的,取决于我循环TaskGroup的次数。所以sub_task将是:

parent_task_id.sub_task_1
parent_task_id.sub_task_2
parent_task_id.sub_task_3
...
parent_task_id.sub_task_x

这是我拥有的DAG的以下代码:

import airflow
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.utils.task_group import TaskGroup
from google.cloud.exceptions import NotFound
from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from google.cloud import bigquery
default_args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
with DAG(dag_id='branch_dag', default_args=default_args, schedule_interval=None) as dag:
def create_task_group(worker=1):
var  = dict()
with TaskGroup(group_id='parent_task_id') as tg1:
for i in range(worker):
var[f'sub_task_{i}'] = PostgresToGCSOperator(
task_id = f'sub_task_{i}',
postgres_conn_id = 'some_postgres_conn_id',
sql = 'test.sql',
bucket = 'test_bucket',
filename = 'test_file.json',
export_format = 'json',
gzip = True,
params = {
'worker': worker
}
)
return tg1

def is_exists_table():
client = bigquery.Client()
try:
table_name = client.get_table('dataset_id.some_table')
if table_name:
return 'task_end'
except NotFound as error:       
return 'parent_task_id'
task_start = DummyOperator(
task_id = 'start'
)
task_branch_table = BranchPythonOperator(
task_id ='check_table_exists_in_bigquery',
python_callable = is_exists_table
)
task_pg_to_gcs_init = create_task_group(worker=3)
task_end = DummyOperator(
task_id = 'end',
trigger_rule = 'all_done'
)    
task_start >> task_branch_table >> task_end
task_start >> task_branch_table >> task_pg_to_gcs_init >> task_end

当我运行dag时,它返回

**airflow.exceptions.TaskNotFound: Task parent_task_id not found**

但这是预期的,我不知道的是如何在is_exists_table函数上迭代parent_task_id.sub_task_x。或者有什么变通办法吗?

如果需要的话,这是test.sql文件


SELECT 
id,
name,
country
FROM some_table
WHERE 1=1
AND ABS(MOD(hashtext(id::TEXT), 3)) = {{params.worker}};
-- returns 1M+ rows

我已经把这个问题看作参考问题,但我认为我的情况更具体。

在设计数据管道时,您可能会遇到需要比"任务A"更复杂的任务流的用例;任务B>任务C !"例如,您可能有一个用例,您需要根据上游任务的结果在执行多个任务之间做出决定。或者您可能遇到这样的情况,即管道的一部分只能在某些外部条件下运行。幸运的是,气流有多种选择来构建条件逻辑和/或分支到您的dag。

我找到了一个肮脏的方法。

我所做的是使用DummyOperator创建一个名为task_pass的额外任务。

task_pass = DummyOperator(
task_id = 'pass_to_task_group'
)

所以DAG流现在看起来像这样:

task_start >> task_branch_table >> task_end
task_start >> task_branch_table >> task_pass >> task_pg_to_gcs_init >> task_end

还有一个我在上面的代码中犯的错误,注意我设置的参数是worker。这是错误的,因为worker是常量,而我需要迭代的是I变量。所以我把它改成:

params: worker

:

params: i

最新更新