Airflow Python运算符-从参数的xcom pull中获取值



我有一个SQL文件,如下所示。

select * from table where data > {{ params.maxdt }}

我正在调用python运算符中的一个函数。所以我需要在调用python操作符时传递maxdt值。

class SQLTemplatedPythonOperator(PythonOperator):
    template_ext = ('.sql',)
table_list = [public.t1]
for table_name in table_list:
    pull = SQLTemplatedPythonOperator(
    task_id='export_{}'.format(table_name),dag=dag,
    templates_dict={'query': 'public.t1.sql'},
    params = {'table_name': table_name, 'maxdt': {{ti.xcom_pull(task_ids='push_result_{}')}}.format(table_name)},
    op_kwargs={'tablename':table_name},
    python_callable=pgexport,
    provide_context=True,
    )

有一个名为push_result_{}.format(table_name)的任务进行一些处理并将数据值推送到xcom。现在,我需要得到这个值,并将其传递到我的pull任务中。因此,我的SQL模板查询将获得该值并将其传递给pgexport函数。

pgexport-它将使用postgrestogcs运算符来推送模板化SQL查询的结果。

不幸的是,我使用的语法不起作用。有人能帮我修一下吗?

看起来有几个语法问题。

第一个是字符串周围缺少引号,所以{{ti.xcom_pull(task_ids='push_result_{}')}}.format(table_name)应该变成"{{ti.xcom_pull(task_ids='push_result_{}')}}".format(table_name)

现在,因为您使用的是python格式函数,所以您必须在格式化程序使用花括号时对其进行转义,所以您需要将其更改为:

"{{{{ ti.xcom_pull(task_ids='push_result_{}') }}}}".format(table_name)

正如您所看到的,花括号正在对其自身进行转义,即表"格式化后的结果;TABLENAME";将是以下字符串:{{ ti.xcom_pull(task_ids='push_result_TABLENAME') }},它现在是一个可以在执行时替换的气流宏。

仅供参考-您还可以决定使用旧的python格式来避免方括号转义,比如:"{{ ti.xcom_pull(task_ids='push_result_%s') }}" % "TABLENAME"

最新更新