继这个问题之后,当我尝试从具有多个分区的dask.dataframe创建一个postgresql表时,我收到以下错误:
IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "pg_type_typname_nsp_index"
DETAIL: Key (typname, typnamespace)=(test1, 2200) already exists.
[SQL: 'nCREATE TABLE test1 (nt"A" BIGINT, nt"B" BIGINT, nt"C" BIGINT, nt"D" BIGINT, nt"E" BIGINT, nt"F" BIGINT, nt"G" BIGINT, nt"H" BIGINT, nt"I" BIGINT, nt"J" BIGINT, ntidx BIGINTn)nn']
您可以使用以下代码重新创建错误:
import numpy as np
import dask.dataframe as dd
import dask
import pandas as pd
import sqlalchemy_utils as sqla_utils
import sqlalchemy as sqla
DATABASE_CONFIG = {
'driver': '',
'host': '',
'user': '',
'password': '',
'port': 5432,
}
DBNAME = 'dask'
url = '{driver}://{user}:{password}@{host}:{port}/'.format(
**DATABASE_CONFIG)
db_url = url.rstrip('/') + '/' + DBNAME
# create db if non-existent
if not sqla_utils.database_exists(db_url):
print('Creating database '{}''.format(DBNAME))
sqla_utils.create_database(db_url)
conn = sqla.create_engine(db_url)
# create pandas df with random numbers
df = pd.DataFrame(np.random.randint(0,40,size=(100, 10)), columns=list('ABCDEFGHIJ'))
# add index so that it can be used as primary key later on
df['idx'] = df.index
# create dask df
ddf = dd.from_pandas(df, npartitions=4)
# Write to psql
dto_sql = dask.delayed(pd.DataFrame.to_sql)
out = [dto_sql(d, 'test', db_url, if_exists='append', index=False, index_label='idx')
for d in ddf.to_delayed()]
dask.compute(*out)
如果 npartitions 设置为 1,则代码不会产生错误。所以我猜这与 postgres 无法处理写入同一 sql 表的并行请求有关......?我该如何解决这个问题?
我正在阅读这篇文章。当您使用并行处理创建/更新同一表时,此错误似乎会出现。我知道这取决于这个(正如谷歌小组讨论中所解释的那样)。
所以我认为这取决于PostgreSQL
本身,而不是连接驱动程序或用于多处理的模块。
好吧,实际上,我发现解决这个问题的唯一方法是创建足够大的块,以返回比计算本身慢的写入过程。对于较大的块,此错误不会上升。
在PostgreSQL中,这对我有帮助。
set enable_parallel_hash=off;
打开后
set enable_parallel_hash=on;
我在 Heroku 的 PostgreSQL 上使用 ponyORM 时遇到了同样的错误。我通过锁定线程直到它执行数据库操作来解决它。就我而言:
lock = threading.Lock()
with lock:
PonyOrmEntity(name='my_name', description='description')
PonyOrmEntity.get(lambda u: u.name == 'another_name')