如何使用自动化或命令行工具管理AWS Redshift中的存储过程?



我们已经在AWS上使用Redshift构建了许多存储过程。我们需要下载并上传这些存储过程,以便将它们保存在GitHub中,作为跟踪更改的一种手段。

这些存储过程最终需要成为CloudFormation模板的一部分,以便基础设施也能得到维护。

理想情况下,这可以使用AWS CLI完成,但似乎没有这样的命令。

如何在自动化/CI/CD环境中管理AWS Redshift存储过程?

我有一个部分可行的解决方案:

import json
import psycopg2
import os

def run_sql_commands(
    input_command,
    database="mydatabasename",
    host_port=1234,
    host_url="datawarehouse.redshift.amazonaws.com",
):
    """Execute one SQL statement and return every fetched row.

    Credentials are read from the ``db_user`` and ``db_pass`` environment
    variables. A missing variable raises ``KeyError`` before any connection
    is attempted.

    :param input_command: SQL string to execute
    :param database: name of the database to run the query against
    :param host_port: port of the cluster endpoint
    :param host_url: host name of the cluster endpoint
    :return: list of row tuples, or ``None`` when the statement produced no
        result set or a connection/query error occurred (the error is logged)
    :raises KeyError: if ``db_user`` or ``db_pass`` is not set
    """
    import logging

    # db_user and db_pass must be set as environment variables.
    db_user = os.environ["db_user"]
    db_pass = os.environ["db_pass"]

    conn = None
    try:
        # Keyword arguments instead of a hand-formatted DSN string, so that
        # passwords containing spaces or quotes cannot break the DSN.
        conn = psycopg2.connect(
            dbname=database,
            port=host_port,
            user=db_user,
            host=host_url,
            password=db_pass,
        )
        cursor = conn.cursor()
        cursor.execute(input_command)
        # ``description`` is None for statements that return no rows;
        # calling fetchall() on those would raise.
        if cursor.description is None:
            return None
        return cursor.fetchall()
    except Exception:
        # Best-effort contract: callers receive None on failure, but the
        # cause is logged instead of being silently discarded.
        logging.getLogger(__name__).exception(
            "SQL command failed on database %r at %s:%s",
            database, host_url, host_port,
        )
        return None
    finally:
        # Always release the connection, on success and on failure.
        if conn is not None:
            conn.close()

def get_arg_type(oid, database):
    """Return the type name stored in ``pg_catalog.pg_type`` for *oid*.

    :param oid: type OID, as an int or a numeric string
    :param database: database in which to run the lookup
    :return: the ``typname`` value of the matching row
    :raises ValueError: if *oid* is not an integer value
    :raises LookupError: if the query failed or returned no row
    """
    # Coerce to int before interpolating so that only a number can ever
    # reach the SQL text.
    type_oid = int(oid)
    sql = f"SELECT typname FROM pg_catalog.pg_type WHERE oid={type_oid}"
    rows = run_sql_commands(sql, database)
    # run_sql_commands returns None on failure; an unknown OID gives [].
    if not rows:
        raise LookupError(
            f"no pg_type entry found for oid {type_oid} in database {database!r}"
        )
    return rows[0][0]

def download_all_procedures(database, file_location="./local-code-store"):
    """Dump the source and metadata of every non-system routine to disk.

    For each row of ``pg_proc`` outside ``information_schema`` and
    ``pg_catalog`` two files are written under
    ``{file_location}/{database}/Schemas/{schema}/``:

    * ``{name}.sql``  - the routine body (``prosrc``) with CRLF line endings
      normalised to LF
    * ``{name}.json`` - schema, owner, argument names and argument type names

    :param database: database to read the routines from
    :param file_location: root directory of the local code store
    :return: list with the number of characters written to each JSON file
    :raises RuntimeError: if the catalog query failed
    """
    get_all_stored_procedure_sql = """
        SELECT
            n.nspname,
            b.usename,
            p.proname,
            p.proargnames,
            p.proargtypes,
            p.prosrc
        FROM
            pg_catalog.pg_namespace n
        JOIN pg_catalog.pg_proc p ON
            pronamespace = n.oid
        JOIN pg_user b ON
            b.usesysid = p.proowner
        WHERE
            nspname NOT IN ('information_schema', 'pg_catalog');
    """
    rows = run_sql_commands(get_all_stored_procedure_sql, database)
    if rows is None:
        # run_sql_commands signals failure with None; fail loudly rather
        # than with "'NoneType' object is not iterable".
        raise RuntimeError(
            f"could not list stored procedures in database {database!r}"
        )

    # Cache OID -> type name so each distinct type is looked up only once.
    type_names = {}
    counted_items = []
    for schema, author, procedure_name, arg_names, arg_type_oids, source in rows:
        argument_types = []
        for this_oid in (arg_type_oids or "").split():
            if this_oid not in type_names:
                # Resolve types in the database being exported, not a
                # hard-coded one.
                type_names[this_oid] = get_arg_type(this_oid, database)
            argument_types.append(type_names[this_oid])

        # The 'table' key actually holds the schema name; it is kept for
        # compatibility with already-exported metadata files.
        meta_data = {'table': schema,
                     'author': author,
                     'arguments': arg_names,
                     'argument_types': argument_types}

        # NOTE(review): the path depends only on schema and name, so
        # overloaded routines sharing a name overwrite each other.
        base_path = f'{file_location}/{database}/Schemas/{schema}/{procedure_name}'
        os.makedirs(os.path.dirname(base_path), exist_ok=True)

        with open(f'{base_path}.sql', 'w', encoding='utf-8') as sql_file:
            # Normalise Windows line endings so diffs in Git stay clean.
            sql_file.write(source.replace('\r\n', '\n'))

        with open(f'{base_path}.json', 'w', encoding='utf-8') as meta_file:
            counted_items.append(meta_file.write(json.dumps(meta_data)))
    return counted_items

if __name__ == "__main__":
    # Script entry point: export every routine of "mydatabase" into the
    # default local code store, with progress messages around the run.
    print("Starting...")
    written_sizes = download_all_procedures("mydatabase")
    print("... finished")

最新更新