克隆Postgres schema,不带pg_dump



我需要一种方法来系统地克隆Postgres模式到另一个数据库,没有数据,通过SQL查询。我不能用pg_dump(因为它很复杂)。解决方案类似于:

  1. 连接数据库进行克隆。
  2. <
  3. 得到模式/gh>
  4. 连接到新数据库(我们要克隆到的地方)
  5. 执行模式脚本

是否有可能通过sql查询或类似的方式获得pg_dump用于转储模式的查询?例如运行pg_dump通过sql编辑器?

更新:

克隆模式是指特定的schema(例如public)及其内容。

这段Python 3代码将使用psycopg包将源数据库的模式和表复制到目标数据库。

如果希望复制可空项、键或索引,可能需要进一步细化由这段代码生成的CREATE TABLE查询。您可以轻松地将这些CREATE TABLE脚本保存到文件中,以便单独使用或执行。

底部还有一个小块,如果您希望这样做,它将复制表的数据。

import psycopg

source = {
'host': 'source_host', 
'port': '5432', 
'dbname': 'source_database',
'user': 'postgres', 
'password': 'postgres'
}
target = {
'host': 'target_host', 
'port': '5432', 
'dbname': 'target_database',
'user': 'postgres', 
'password': 'postgres'
}
sourcedb = psycopg.connect(**source)
targetdb = psycopg.connect(**target)
with sourcedb:
with targetdb:
targetdb.autocommit = False
with sourcedb.cursor() as sourceCursor:
with targetdb.cursor() as targetCursor:
# get list of schemas in database
query = f"""SELECT      schema_name 
FROM        information_schema.schemata
WHERE           schema_name not in ('pg_catalog',
'pg_toast', 
'information_schema')
ORDER BY    schema_name ASC;"""
sourceCursor.execute(query)
schemas = [tupleRow[0] for tupleRow in sourceCursor.fetchall()]
for schema in schemas:
# get list of tables in schema
query = f"""SELECT      table_name 
FROM        information_schema.tables 
WHERE           table_schema = '{schema}' 
AND table_type = 'BASE TABLE'
ORDER BY    table_name ASC;"""
sourceCursor.execute(query)
schemaTables = [tupleRow[0] for tupleRow in sourceCursor.fetchall()]
# create schema on target_database if not exists
query = f'CREATE SCHEMA IF NOT EXISTS "{schema}";'
targetCursor.execute(query)
for table in schemaTables:
# generate a 'CREATE TABLE' query from source 
# table's column names and data types
query = f"""SELECT       column_name
,data_type
FROM        INFORMATION_SCHEMA.COLUMNS 
WHERE           table_name = '{table}' 
AND table_schema = '{schema}'
ORDER BY    ordinal_position ASC;"""
sourceCursor.execute(query)
columns = ['"{}" {}n'.format(i[0], i[1]) for i in sourceCursor.fetchall()]
columnsString = ' ,'.join(columns)
createTableQuery = f'CREATE TABLE IF NOT EXISTS "{schema}"."{table}"n( {columnsString});nn'
# run the CREATE TABLE on the target_database
targetCursor.execute(createTableQuery)
# uncomment this block to copy the data inside the tables
# from source_database to target_database

# with sourceCursor.copy(f"COPY {schema}.{table} TO STDOUT;") as sourceCopy:
#     with targetCursor.copy(f"COPY {schema}.{table} FROM STDIN;") as targetWrite:
#         for data in sourceCopy:
#             targetWrite.write(data)
targetdb.commit()

最新更新