Auto-increment sequence restarts at 1 when inserting data after copying tables with data from one Postgres database to another using SQLAlchemy



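I am using SQLAlchemy and Python to copy tables with their data from one Postgres database to another.

I looked at the following answer and wrote a script:

Copy one database to another using SQLAlchemy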

from sqlalchemy import create_engine, MetaData, event
from sqlalchemy.sql import sqltypes
import traceback

src_engine = create_engine("postgresql://user1:mypass@myip1:2025/mydb?options=-c search_path=public")
src_metadata = MetaData(bind=src_engine)

tgt_engine = create_engine("postgresql://user2:mypass@myip2:2025/newdb?options=-c search_path=public")
tgt_metadata = MetaData(bind=tgt_engine)

@event.listens_for(src_metadata, "column_reflect")
def genericize_datatypes(inspector, tablename, column_dict):
    column_dict["type"] = column_dict["type"].as_generic(allow_nulltype=True)

tgt_conn = tgt_engine.connect()
tgt_metadata.reflect()

src_metadata.reflect()

for table in src_metadata.sorted_tables:
    table.create(bind=tgt_engine)

# refresh metadata before you can copy data
tgt_metadata.clear()
tgt_metadata.reflect()
# Copy all data from src to target
for table in tgt_metadata.sorted_tables:
    src_table = src_metadata.tables[table.name]
    stmt = table.insert()
    temp_list = []
    source_table_count = src_engine.connect().execute(f"select count(*) from {table.name}").fetchall()
    current_row_count = 0
    for index, row in enumerate(src_table.select().execute()):
        temp_list.append(row._asdict())
        if len(temp_list) == 2500:
            stmt.execute(temp_list)
            current_row_count += 2500
            print(f"table = {table.name}, inserted {current_row_count} out of {source_table_count[0][0]}")
            temp_list = []
    if len(temp_list) > 0:
        stmt.execute(temp_list)
        current_row_count += len(temp_list)
        print(f"table = {table.name}, inserted {current_row_count} out of {source_table_count[0][0]}")
    print(f'source table "{table.name}": {source_table_count}')
    print(f'target table "{table.name}": {tgt_engine.connect().execute(f"select count(*) from {table.name}").fetchall()}')

The sequence that was copied from the source database to the target database has the following definition:

CREATE SEQUENCE public.my_table_request_id_seq
    INCREMENT 1
    START 1
    MINVALUE 1
    MAXVALUE 2147483647
    CACHE 1;
ALTER SEQUENCE public.my_table_request_id_seq
    OWNER TO my_user;

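Everything works: the table schemas are copied and the data is copied. However, the auto-increment column of each table starts at 1 instead of continuing from the last value that was used, so inserting new data fails with the error duplicate key value violates unique constraint.

I would like to resolve this issue.

Thanks in advance.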

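This is probably because the copy inserts explicit values directly into the target column without going through the sequence.

For example, suppose the source database has a table like the one below, where the id column is auto-incremented: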

+------+------------+
|  id  | name       |
+------+------------+
|  1   | test       |
|  2   | test2      |
+------+------------+

When it is copied to the target database, the rows are inserted directly as

INSERT INTO TBL_NAME VALUES (1, 'test');
INSERT INTO TBL_NAME VALUES (2, 'test2');

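without using the sequence, so the sequence in the target database never advances. A possible fix is to alter the sequence once the copy is finished and set it to the maximum id found in the source database.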
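
As a minimal sketch of that fix, the helper below builds the SQL that moves a sequence past the largest existing id. It assumes the sequence is owned by the column (as with SERIAL or identity columns), so that PostgreSQL's pg_get_serial_sequence can find it; the function name build_sequence_reset_sql is hypothetical, and my_table / request_id are only taken from the sequence name in the question.

```python
import re

# Only plain lowercase-style identifiers are accepted, because the names are
# interpolated into the SQL text (an assumption made to keep the sketch safe).
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_sequence_reset_sql(table_name, column_name):
    """Return SQL that makes the column's sequence continue after MAX(column).

    Hypothetical helper: setval(..., false) means the next nextval() call
    returns exactly the value given, i.e. max id + 1 (or 1 for an empty table).
    """
    for identifier in (table_name, column_name):
        if not _IDENTIFIER.match(identifier):
            raise ValueError(f"unsupported identifier: {identifier!r}")
    return (
        f"SELECT setval(pg_get_serial_sequence('{table_name}', '{column_name}'), "
        f"COALESCE((SELECT MAX({column_name}) FROM {table_name}), 0) + 1, false)"
    )


if __name__ == "__main__":
    print(build_sequence_reset_sql("my_table", "request_id"))
```

The resulting statement would be run once per auto-increment column on the target connection after the data copy, for example with tgt_conn.execute(text(sql)) using text from sqlalchemy. If pg_get_serial_sequence returns NULL because the sequence is not owned by the column, the sequence name has to be supplied some other way.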

I have made a working change for the sequence issue. Below is the complete code, which uses Python and SQLAlchemy to copy tables with data from one Postgres database to another:

# Note: MetaData(bind=...) and stmt.execute() are SQLAlchemy 1.x features
# (as_generic needs 1.4); they were removed in SQLAlchemy 2.0.
from sqlalchemy import create_engine, MetaData, event
from sqlalchemy.sql import sqltypes
import traceback

src_engine = create_engine("postgresql://user1:mypass@myip1:2025/mydb?options=-c search_path=public")
src_metadata = MetaData(bind=src_engine)

tgt_engine = create_engine("postgresql://user2:mypass@myip2:2025/newdb?options=-c search_path=public")
tgt_metadata = MetaData(bind=tgt_engine)

@event.listens_for(src_metadata, "column_reflect")
def genericize_datatypes(inspector, tablename, column_dict):
    column_dict["type"] = column_dict["type"].as_generic(allow_nulltype=True)

tgt_metadata.reflect()

src_metadata.reflect()

for table in src_metadata.sorted_tables:
    table.create(bind=tgt_engine)

# refresh metadata before you can copy data
tgt_metadata.clear()
tgt_metadata.reflect()
# Copy all data from src to target
for table in tgt_metadata.sorted_tables:
    src_table = src_metadata.tables[table.name]
    stmt = table.insert()
    temp_list = []
    with src_engine.connect() as src_conn:
        source_table_count = src_conn.execute(f"select count(*) from {table.name}").fetchall()
    current_row_count = 0
    for index, row in enumerate(src_table.select().execute()):
        temp_list.append(row._asdict())
        if len(temp_list) == 2500:
            stmt.execute(temp_list)
            current_row_count += 2500
            print(f"table = {table.name}, inserted {current_row_count} out of {source_table_count[0][0]}")
            temp_list = []
    if len(temp_list) > 0:
        stmt.execute(temp_list)
        current_row_count += len(temp_list)
        print(f"table = {table.name}, inserted {current_row_count} out of {source_table_count[0][0]}")
    ###################### code for sequence restart: start ##############################
    with tgt_engine.connect().execution_options(autocommit=True) as tgt_conn:
        # getting sequence list
        sequence_list_data = tgt_conn.execute("SELECT c.relname FROM pg_class c WHERE c.relkind = 'S';").fetchall()
        sequence_list = [sequence[0] for sequence in sequence_list_data]
        sequence_to_alter = ""
        for sequence in sequence_list:
            if sequence.startswith(table.name):
                # getting sequence name for the given table
                sequence_to_alter = sequence
                break
        if sequence_to_alter:
            # getting column name from sequence name, usually sequence name is: {tableName}_{columnName}_seq, example course_student_id_seq
            new = sequence_to_alter.replace(f"{table.name}_", "")
            sequence_column_name = new.replace("_seq", "")
            # getting last generated sequence id for the table (None when the table is empty)
            last_id = tgt_conn.execute(f"select max({sequence_column_name}) from {table.name}").fetchall()[0][0]
            # restarting the sequence id with last_id+1, so that sequence begins with last generated id + 1
            alter_sequence_query = f"ALTER SEQUENCE {sequence_to_alter} RESTART WITH {(last_id or 0) + 1};"
            tgt_conn.execute(alter_sequence_query)
            # setting the owner for the sequence to owner needed to be
            tgt_conn.execute(f"ALTER SEQUENCE {sequence_to_alter} OWNER TO my_owner_name")
    ###################### code for sequence restart: end ##############################
    print(f'source table "{table.name}": {source_table_count}')
    with tgt_engine.connect() as tgt_conn:
        print(f'target table "{table.name}": {tgt_conn.execute(f"select count(*) from {table.name}").fetchall()}')
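
One caveat about the name-based lookup in the code above: sequence.startswith(table.name) also matches sequences that belong to other tables. For a table named course, the sequence course_student_id_seq matches too, and the derived column name would be student_id. A stricter variant, sketched here under the same {tableName}_{columnName}_seq naming assumption, only accepts exact matches against the table's known column names. The function name match_sequences_by_name is hypothetical.

```python
def match_sequences_by_name(table_name, column_names, sequence_names):
    """Map each column to a sequence named exactly <table>_<column>_seq.

    Hypothetical helper: it relies purely on the default naming convention,
    so renamed sequences or names truncated by PostgreSQL are not found.
    """
    available = set(sequence_names)
    matches = {}
    for column in column_names:
        candidate = f"{table_name}_{column}_seq"
        if candidate in available:
            matches[column] = candidate
    return matches


if __name__ == "__main__":
    sequences = ["course_student_id_seq", "course_id_seq"]
    # The prefix test alone would pick the wrong sequence for table "course".
    print("course_student_id_seq".startswith("course"))
    print(match_sequences_by_name("course", ["id", "name"], sequences))
    print(match_sequences_by_name("course_student", ["id"], sequences))
```

In the copy script, the column names could come from table.columns.keys() and the sequence names from the existing pg_class query, so each matched column gets its own RESTART WITH value.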

If you have any questions or corrections, please let me know.

Hope this helps someone!
