Snowflake-在Airflow中执行查询的输出



我有一个创建外键的查询(foreign_keys.sql(,输出是用于添加FK的ALTER语句行,但我如何执行这些语句?

示例行:

ALTER TABLE "EIV"."RTR"."LINEITEMS" 
ADD FOREIGN KEY (ITEM_ID) REFERENCES "EIV"."RTR"."ID_LINEITEMS" (ID);

下面是我将如何在Airflow中运行此语句,但我该如何执行这些语句?

snp_create_foreign_keys = SnowflakeQueryOperator(
task_id='create_foreign_keys',
sql='queries/foreign_keys.sql',
params={
'schema': 'qtr'
},
retries=0)

这就是我们的SnowflakeQueryOperator的样子:

class SnowflakeQueryOperator(BaseOperator):
template_fields = ['sql', 'params']
template_ext = ['.sql']
@apply_defaults
def __init__(self,
sql,
params=None,
warehouse=Variable.get('default_snowflake_warehouse'),
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.sql = sql
self.params = params
self.warehouse = warehouse
def execute(self, context):
sf_hook = SnowflakeHook(warehouse=self.warehouse)
self.log.info(f'Running query:')
sf_hook.execute_query(self.sql)

据我所知,Airflow中的SnowflakeOperator不会返回选择查询的结果,它应该只用于在Snowflake上执行查询(就像大多数数据库运算符一样(,并且要么失败,要么成功。

要做到这一点,您需要编写自己的运算符。

最新更新