气流2.3 -使用操作符的动态任务映射



我已经有了一些代码的当前实现,它工作得很好,但每天只执行一次检查,因为我不能将多个结果提供给下游任务。新的气流2.3.0动态任务映射似乎允许一组任务/操作符与前一个任务的输出列表或字典一起运行- https://airflow.apache.org/docs/apache-airflow/stable/concepts/dynamic-task-mapping.html。

我尝试了多种不同的方法来利用这一点,但没有成功,下面是我目前的工作实现。注意,它获取1个结果:

default_args = {
"owner": "test",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
}
with DAG(
"test",
default_args=default_args,
description="Test",
template_searchpath=["/airflow_home/dags/"],
start_date=datetime(2022, 5, 3),
catchup=False,
tags=["test"],
render_template_as_native_obj=True,
) as dag:
operator_a = CustomMySqlFetchDictionaryOperator(
task_id="operator_a",
mysql_conn_id="default_mysql",
database_schema="test",
sql="abc.sql",
fetch_size=1,
params={"param_1_value":"param_1_key",},
)
operator_b = CustomMySQLIntervalCheckOperator(
task_id="operator_b",
mysql_conn_id="default_mysql",
database_schema="test",
sql="abc.sql",
days_back=1,
date_filter_column="date",
ratio_formula="max_over_min",
metrics_thresholds={"AVG(total_size)": 1.05},
ignore_zero=False,
check_date="{{ ti.xcom_pull(task_ids='operator_a')[0]['date'] }}",
)
operator_a >> operator_b

第一个操作符返回的示例如下

[{'remote_path': 'data.txt', 
'date': datetime.date(2022, 5, 2), 
'last_modified': datetime.datetime(2022, 5, 3, 2, 47), 
'total_size': 3291050}]

第二个操作符中的check_date参数使用列表中的值0,因为它只能返回1个结果(由于fetch_size=1)。当dag参数render_template_as_native_obj被设置为True时,字典将为第二个操作符正确呈现,并且它们都成功。

然而,当调整它以使用如下所示的新的动态任务映射时,当fetch_size增加到>1并且列表现在包含多个字典时,我无法让第二个操作符使用这些值,我希望将这些字典映射到同一个操作符的多个任务。我尝试的例子:

operator_b = CustomMySQLIntervalCheckOperator.partial(
task_id="operator_b",
mysql_conn_id="default_mysql",
database_schema="test",
sql="abc.sql",
days_back=1,
date_filter_column="date",
ratio_formula="max_over_min",
metrics_thresholds={"AVG(total_size)": 1.05},
ignore_zero=False,
).expand(
check_date="{{ ti.xcom_pull(task_ids='operator_a')['date'] }}",
)

我也试过通过operator_a.output使用XComArg在check_date参数中使用符号(operator_a.output['date']),但是在尝试此操作时得到错误;XComArg只支持str查找,接收到

感谢任何帮助或指导我可以在这里!

谢谢。

expand()接收到一个list来操作每个元素的映射,但是当您调用expend(check_date = " ")时,您传递的是一个string,对吗?

最新更新