Airflow conf parameter throws an error in PostgresHook



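The script takes its input parameters from the Airflow conf {"flag":"NA","metric_name":"RED"}. I am trying to use one of the parameter values in the WHERE clause of a SQL statement. Could someone check whether there is something wrong with the way I am passing it?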

ERROR: "{">

def get_metrics(**kwargs):
    varteam_flag = kwargs['dag_run'].conf['flag']
    print("flag :", varteam_flag)
    params = {"param1": varteam_flag}
    print(" params :", params['param1'])
    conn_id = kwargs.get('conn_id')
    pg_hook = PostgresHook(conn_id)
    sql = "select count(1) as cnt FROM odw.metrics where  flag = {{ params.param1 }} "
    print(sql)
    records = pg_hook.get_records(sql)
    print("Records count is  :", str(records))
    return records

getData_metrics = PythonOperator(task_id='getData_metrics', python_callable=get_metrics, op_kwargs={'conn_id': 'veas'}, provide_context=True, dag=dag)

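You can't use Jinja inside a Python callable. Jinja is evaluated when the operator's templated fields are rendered, before the callable runs, so in your case the Jinja engine is never applied to the SQL string built inside the function.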
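To see why Postgres complains about "{", it may help to look at what the string actually contains at the point where it is sent. A small sketch in plain Python, with no Airflow involved (the function name `build_sql` is made up for illustration):

```python
def build_sql() -> str:
    # Inside a python_callable this is an ordinary str literal.
    # Nothing renders the Jinja expression, so the braces stay in the text.
    return "select count(1) as cnt FROM odw.metrics where flag = {{ params.param1 }}"


sql = build_sql()
print(sql)  # the braces are still present; this is the text Postgres would receive
```

The database therefore parses the literal braces as SQL, which is consistent with an error pointing at "{".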

All you need to do is use a plain assignment with an f-string:

sql=f"select count(1) as cnt FROM odw.metrics where  flag = {params['param1']} "

I don't quite see why you need so many assignments, though. I think

sql=f"select count(1) as cnt FROM odw.metrics where  flag = {varteam_flag} "

should give the same SQL statement.
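Both f-string versions paste the value straight into the SQL text, so a string value such as "NA" would have to be quoted by hand, and an untrusted value could change the query. An alternative, sketched below, is to let the driver bind the value. This assumes the `get_records(sql, parameters=...)` method that `PostgresHook` inherits from Airflow's `DbApiHook`, and psycopg2's `%(name)s` placeholder style. `FakeHook` is a hypothetical stand-in so the snippet runs without Airflow or a database.

```python
from typing import Any, Dict, List, Optional, Tuple

COUNT_SQL = "select count(1) as cnt FROM odw.metrics where flag = %(flag)s"


def get_metrics(hook: Any, conf: Dict[str, Any]) -> List[Tuple]:
    """Run the count query, passing the flag as a bound parameter."""
    # The value travels separately from the SQL text; the driver quotes it.
    return hook.get_records(COUNT_SQL, parameters={"flag": conf["flag"]})


class FakeHook:
    """Hypothetical stand-in for PostgresHook; it only records the call."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Optional[Dict[str, Any]]]] = []

    def get_records(self, sql: str, parameters: Optional[Dict[str, Any]] = None):
        self.calls.append((sql, parameters))
        return [(0,)]  # canned result, not real data


hook = FakeHook()
records = get_metrics(hook, {"flag": "NA", "metric_name": "RED"})
print(hook.calls[0])
```

In the real task, `hook` would be `PostgresHook(conn_id)` and `conf` would be `kwargs['dag_run'].conf`.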

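How to pass command-line arguments or conf parameters from the UI and use them in PostgresHook / PostgresOperator (with the hook, plain Python variables are used through an f-string or .format()):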

1. If you are using PostgresHook and calling the function through PythonOperator:

def get_metrics(**kwargs):
    varteam_flag = kwargs['dag_run'].conf['flag']
    conn_id = kwargs.get('conn_id')
    pg_hook = PostgresHook(conn_id)
    connection = pg_hook.get_conn()
    # Plain Python formatting; Jinja is not rendered inside the callable
    sql = f"select count(1) as cnt FROM odw.metrics where  flag = {varteam_flag} "
    print(sql)
    cursor = connection.cursor()
    cursor.execute(sql)
    rows = cursor.fetchall()
    # Column names are the first element of each cursor.description entry
    cols = [col[0] for col in cursor.description]

2. If you are using PostgresOperator, you can use a Jinja template:

gerics = PostgresOperator(
    task_id='gerics',
    postgres_conn_id='veritas',
    params={'param1': 10},
    sql=[
        'truncate table test.dept1',
        'insert into test.dept1 (select * from test.dept where rating={{ params.param1 }} )',
    ],
)
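The hook snippet in option 1 stops right after collecting the column names. Presumably they are meant to be paired with the fetched rows; one way to do that is sketched below. It relies only on the DB-API `cursor.description` attribute, so it is shown with the standard-library `sqlite3` module as a stand-in for the Postgres connection. The helper name `rows_as_dicts` and the sample table are made up for the example.

```python
import sqlite3
from typing import Any, Dict, List


def rows_as_dicts(cursor) -> List[Dict[str, Any]]:
    """Pair each fetched row with the column names from cursor.description."""
    cols = [col[0] for col in cursor.description]
    return [dict(zip(cols, row)) for row in cursor.fetchall()]


# Stand-in database: an in-memory SQLite table with invented sample rows.
connection = sqlite3.connect(":memory:")
connection.execute("create table metrics (flag text, metric_name text)")
connection.executemany(
    "insert into metrics values (?, ?)",
    [("NA", "RED"), ("NA", "BLUE"), ("EU", "RED")],
)

cursor = connection.cursor()
# sqlite3 uses ? placeholders; psycopg2 would use %s instead.
cursor.execute("select count(1) as cnt from metrics where flag = ?", ("NA",))
result = rows_as_dicts(cursor)
print(result)
```

With the cursor returned by `pg_hook.get_conn().cursor()`, the same helper should apply unchanged, since psycopg2 cursors expose `description` in the same way.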

Please let me know if there is another way to write this.
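One other option, sketched here under assumptions: since `sql` is a templated field of `PostgresOperator`, the template can read the trigger configuration directly through `dag_run.conf` instead of going through `params`. The conf key `rating` and the helper `dept_operator_kwargs` are hypothetical; the helper only builds the keyword arguments, so the snippet runs without Airflow installed.

```python
from typing import Any, Dict


def dept_operator_kwargs() -> Dict[str, Any]:
    """Keyword arguments for a PostgresOperator whose SQL reads dag_run.conf."""
    return {
        "task_id": "gerics",
        "postgres_conn_id": "veritas",
        "sql": [
            "truncate table test.dept1",
            # Rendered by Airflow at run time; 'rating' is an assumed conf key.
            "insert into test.dept1 "
            "(select * from test.dept where rating = {{ dag_run.conf['rating'] }})",
        ],
    }


kwargs = dept_operator_kwargs()
print(kwargs["sql"][1])
```

With Airflow available, this could be used as `PostgresOperator(**dept_operator_kwargs(), dag=dag)` and the DAG triggered with a conf such as {"rating": 10}. Because the rendered value is pasted into the SQL text, it should only come from trusted users.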
