我有一个SQL文件,如下所示。
select * from table where data > {{ params.maxdt }}
我正在调用python运算符中的一个函数。所以我需要在调用python操作符时传递maxdt值。
class SQLTemplatedPythonOperator(PythonOperator):
template_ext = ('.sql',)
table_list = [public.t1]
for table_name in table_list:
pull = SQLTemplatedPythonOperator(
task_id='export_{}'.format(table_name),dag=dag,
templates_dict={'query': 'public.t1.sql'},
params = {'table_name': table_name, 'maxdt': {{ti.xcom_pull(task_ids='push_result_{}')}}.format(table_name)},
op_kwargs={'tablename':table_name},
python_callable=pgexport,
provide_context=True,
)
有一个名为push_result_{}.format(table_name)
的任务进行一些处理并将数据值推送到xcom。现在,我需要得到这个值,并将其传递到我的pull任务中。因此,我的SQL模板查询将获得该值并将其传递给pgexport
函数。
pgexport-它将使用postgrestogcs
运算符来推送模板化SQL查询的结果。
不幸的是,我使用的语法不起作用。有人能帮我修一下吗?
看起来有几个语法问题。
第一个是字符串周围缺少引号,所以{{ti.xcom_pull(task_ids='push_result_{}')}}.format(table_name)
应该变成"{{ti.xcom_pull(task_ids='push_result_{}')}}".format(table_name)
。
现在,因为您使用的是python格式函数,所以您必须在格式化程序使用花括号时对其进行转义,所以您需要将其更改为:
"{{{{ ti.xcom_pull(task_ids='push_result_{}') }}}}".format(table_name)
正如您所看到的,花括号正在对其自身进行转义,即表"格式化后的结果;TABLENAME";将是以下字符串:{{ ti.xcom_pull(task_ids='push_result_TABLENAME') }}
,它现在是一个可以在执行时替换的气流宏。
仅供参考-您还可以决定使用旧的python格式来避免方括号转义,比如:"{{ ti.xcom_pull(task_ids='push_result_%s') }}" % "TABLENAME"
。