如何使用可调用python中的postgres连接id连接到postgres



我正在使用Airflow的python运算符来调用python函数。ERROR出现在try/except块中。

def python_callable_new():
print("Inside python callable ...")
import psycopg2
try:
print("attempting database connection from python method.. ")
conn = psycopg2.connect('postgres_defined_connection')
print("success. ")
except Exception as error:
print("failed: ")
print (error)
return 'End of callable. '

with dag:
start_task  = DummyOperator(  task_id= "start" )
stop_task   = DummyOperator(  task_id= "stop"  )

do_python_task = PythonOperator(
task_id = 'do-py-operation',
python_callable= python_callable_new,
)
extract_source_data = PostgresOperator(
task_id='extract-cb-source-data',
postgres_conn_id='postgres_defined_connection',
sql='./sql_scripts/extract_csv_data.sql'
)
# csv_to_postgres
start_task >> do_python_task >> extract_source_data >> stop_task

基本上,我的问题是

  • 如何使用'postgres_defined_connection'连接到python函数内部的postgres
  • 当我使用PostgresOperator时,它连接得很好,这可以在extract_source_data任务中看到,但我需要在可调用函数中使用它
  • 出现的错误是无效dsn:missing"在";postgres_defined_connection";在连接信息字符串中

(仅供参考,我将postgres_defined_connection存储在一个单独的connections.py中,该connections.py使用sqlalchemy引擎和PostgresHook(

psycopg2.connect需要连接参数。如果将连接参数格式化为用空格分隔的键/值对,则可以向它们传递单个字符串。这就是为什么它会给你错误消息missing"=&";。

有关详细信息,请参阅psycopg文档。


要连接到Airflow中的Postgres数据库,只要创建了连接,就可以利用PostgresHook。

from airflow.hooks.postgres_hook import PostgresHook

def execute_query_with_conn_obj(query):
hook = PostgresHook(postgres_conn_id='my_connection')
conn = hook.get_conn()
cur = conn.cursor()
cur.execute(query)
def execute_query_with_hook(query):
hook = PostgresHook(postgres_conn_id='my_connection')
hook.run(sql=query)

您也可以使用纯Python代码来完成此操作。

def execute_query_with_psycopg(query):
conn_args = dict(
host='myhost';,
user='admin',
password='password',
dbname='my_schema',
port=5432)
conn = psycopg2.connect(**conn_args)
cur = conn.cursor()
cur.execute(query)
def execute_query_with_psycopg_string(query):
conn = psycopg2.connect("dbname=test user=postgres password=secret")
cur = conn.cursor()
cur.execute(query)

相关内容

  • 没有找到相关文章

最新更新