Psycopg2:如何使用Psycopg2和python插入和更新冲突



我正在使用psycopg2向postgres数据库插入命令,当发生冲突时,我只想更新其他列的值。

以下是查询:

insert_sql = '''
INSERT INTO tablename (col1, col2, col3,col4)
VALUES (%s, %s, %s, %s) (val1,val2,val3,val4)
ON CONFLICT (col1)
DO UPDATE SET
(col2, col3, col4)
= (val2, val3, val4) ; '''
cur.excecute(insert_sql)

我想找出我做错了什么?我使用的变量val1,val2,val3不是实际值。

引用psycopg2的文档:

警告从不,从不,从不使用Python字符串串联(+(或字符串参数插值(%(将变量传递到SQL查询字符串。即使在枪口下也不行

现在,对于追加销售操作,您可以这样做:

insert_sql = '''
INSERT INTO tablename (col1, col2, col3, col4)
VALUES (%s, %s, %s, %s)
ON CONFLICT (col1) DO UPDATE SET
(col2, col3, col4) = (EXCLUDED.col2, EXCLUDED.col3, EXCLUDED.col4);
'''
cur.execute(insert_sql, (val1, val2, val3, val4))

请注意,查询的参数作为元组传递给execute语句(这确保psycopg2将负责将它们调整为SQL,同时保护您免受注入攻击(。

EXCLUDED位允许您重用这些值,而无需在数据参数中指定两次。

使用:

INSERT INTO members (member_id, customer_id, subscribed, customer_member_id, phone, cust_atts) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (customer_member_id) DO UPDATE SET (phone) = (EXCLUDED.phone);

我收到以下错误:

psycopg2.errors.FeatureNotSupported: source for a multiple-column UPDATE item must be a sub-SELECT or ROW() expression
LINE 1: ...ICT (customer_member_id) DO UPDATE SET (phone) = (EXCLUDED.p...

更改为:

INSERT INTO members (member_id, customer_id, subscribed, customer_member_id, phone, cust_atts) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (customer_member_id) DO UPDATE SET (phone) = ROW(EXCLUDED.phone);

解决了问题。

尝试:

INSERT INTO tablename (col1, col2, col3,col4)
VALUES (val1,val2,val3,val4)
ON CONFLICT (col1)
DO UPDATE SET
(col2, col3, col4)
= (val2, val3, val4) ; '''

我还没有看到任何人对此发表评论,但您可以使用psycopg2.extras.execute_values一次插入/更新多行数据,我认为这是许多插入/更新的预期解决方案。

YouTube上有一些教程说明了这一点,其中一个是如何使用psycopg2+Python连接到PSQL数据库

在视频中,他们使用pandas加载数据帧,并将CSV源的数据插入多个模式/表中。视频中的代码片段示例是

from psycopg2.extras import execute_values

sql_insert = """    
INSERT INTO {state}.weather_county(fips_code, county_name, temperature)
VALUES %s
ON CONFLICT (fips_code) DO UPDATE
SET
temperature=excluded.temperature,
updated_at=NOW()
;
"""

grouped = new_weather_data.groupby(by='state') ## new_weather_data is a dataframe
conn = create_rw_conn(secrets=secrets)

for state, df in grouped:
# select only the neccessary columns
df = df[['fips_code', 'county_name', 'temperature']]
print("[{}] upsert...".format(state))
# convert dataframe into list of lists for `execute_values`
data = [tuple(x) for x in df.values.tolist()]
cur = conn.cursor()
execute_values(cur, sql_insert.format(state=state), data)
conn.commit() # <- We MUST commit to reflect the inserted data
print("[{}] changes were commited...".format(state))
cur.close()

Jupyter笔记本是psycopg2 python教程/new-schemas-tables-insert.ipynb

这是一个函数,它接受df、表的schemaname、表的名称、要在冲突名称中用作冲突的列,以及sqlalchemy的create_engine创建的引擎。它会根据冲突列更新表。这是@Ionut-Ticus解的扩展解。不要同时使用pandas.to_sql((。pandas.to_sql((会破坏主键设置。在这种情况下,需要通过ALTER查询设置主键,这是下面函数的建议。主键不一定会被panda破坏,可能没有设置它。在这种情况下会出现错误:引用表的给定键没有唯一的约束匹配吗?函数将建议您执行以下操作。

engine.execute('ALTER TABLE {schemaname}.{tablename} ADD PRIMARY KEY ({conflictcolumn});

功能:

def update_query(df,schemaname,tablename,conflictcolumn,engine ):
"""
This function takes dataframe as df, name of schema as schemaname,name of the table to append/add/insert as tablename, 
and column name that only  other elements of rows will be changed if it's existed as conflictname,
database engine as engine.
Example to engine : engine_portfolio_pg = create_engine('postgresql://pythonuser:vmqJRZ#dPW24d@145.239.121.143/cetrm_portfolio')
Example to schemaname,tablename : weatherofcities.sanfrancisco , schemaname = weatherofcities, tablename = sanfrancisco.
"""

excluded = ""
columns = df.columns.tolist()
deleteprimary = columns.copy()
deleteprimary.remove(conflictcolumn)
excluded = ""
replacestring = '%s,'*len(df.columns.tolist())
replacestring = replacestring[:-1]
for column in deleteprimary:
excluded += "EXCLUDED.{}".format(column)+","
excluded = excluded[:-1]
columns = ','.join(columns)
deleteprimary  = ','.join(deleteprimary)
insert_sql = """ INSERT INTO {schemaname}.{tablename} ({allcolumns})
VALUES ({replacestring})
ON CONFLICT ({conflictcolumn}) DO UPDATE SET
({deleteprimary}) = ({excluded})""".format( tablename = tablename, schemaname=schemaname,allcolumns = columns, replacestring= replacestring,
conflictcolumn= conflictcolumn,deleteprimary = deleteprimary,  excluded=excluded  )

conn = engine.raw_connection()
conn.autocommit = True
#conn = engine.connect()
cursor = conn.cursor()
i = 0
print("------------------------"*5)
print("If below error happens:")
print("there is no unique constraint matching given keys for referenced table?")    
print("Primary key is not set,you can execute:")
print("engine.execute('ALTER TABLE {}.{} ADD PRIMARY KEY ({});')".format(schemaname,tablename,conflictcolumn))
print("------------------------"*5)
for index, row in df.iterrows():
cursor.execute(insert_sql, tuple(row.values))
conn.commit() 
if i == 0:
print("Order of Columns in Operated SQL Query for Rows")
columns = df.columns.tolist()
print(insert_sql%tuple(columns))
print("----")
print("Example of Operated SQL Query for Rows")
print(insert_sql%tuple(row.values))
print("---")

i += 1 
conn.close()

最新更新