创建与psycopg3的异步连接



我正在尝试使用psycopg3创建异步连接。我使用psycopg2没有异步,需要移动到异步数据库函数。文档没有给出太多信息。

这就是我在psycopg2中使用的。

con = psycopg2.connect(host="HOSTNAME", port="PORT", database=("DATABASE", user="USER", password="PASSWORD")
cursor = con.cursor()

然后当我需要运行查询时,我只需使用

cursor.execute(query, params)
cursor.fetchall() # or con.commit() depending on insert or select statement.

现在,我正在移动到异步函数,我已经尝试了这个

con = await psycopg.AsyncConnection.connect(host="HOSTNAME", port="PORT", database="DATABASE", user="USER", password="PASSWORD")
cursor = await con.cursor()

但是我得到一个错误,我不能在函数外使用await。

文档告诉我这样做

async with await psycopg.AsyncConnection.connect() as aconn:
async with aconn.cursor() as cur:
await cur.execute(...)

所以我需要在我想要读取或写入记录的每个函数中都写这个吗?

目前在我的代码中使用psycopg2的几个例子

async def check_guild(guild_id):
cursor.execute("SELECT guild_id, guild_name, su_id FROM guild WHERE guild_id = %s", [guild_id])
guild = cursor.fetchone()
return guild
async def config_raffle(guild_id, channel_id, channel_name, channel_cat_id, token, token_id, default_address, su_id, fee):
try:
cursor.execute("""INSERT INTO raffle_config (guild_id, channel_id, channel_name, channel_cat_id, token, default_token, default_address, su_id, fee) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (guild_id) DO UPDATE SET channel_id = EXCLUDED.channel_id, channel_name = EXCLUDED.channel_name, channel_cat_id = EXCLUDED.channel_cat_id, token = EXCLUDED.token,
default_token = EXCLUDED.default_token, default_address = EXCLUDED.default_address, su_id = EXCLUDED.su_id, fee = EXCLUDED.fee""",
(guild_id, channel_id, channel_name, channel_cat_id, token, token_id, default_address, su_id, fee))
con.commit()
except:
logging.exception("Exception", exc_info=True)
con.rollback()
print("Error: 25")
return True

所以我想也许我更好的选择是使用AsyncConnectionPool。我有一个db.py文件设置如下:

import psycopg_pool
import os
import dotenv
dotenv.load_dotenv()
conninfo = f'host={os.getenv("HOSTNAME")} port={os.getenv("PORT")} dbname={os.getenv("DATABASE")} user={os.getenv("USER")} password={os.getenv("PASSWORD")}'
pool = psycopg_pool.AsyncConnectionPool(conninfo=conninfo, open=False)

async def open_pool():
await pool.open()

在程序运行on_ready函数时打开池。我以这种方式创建了新表,但是当我试图检索记录时,我得到了这个错误。

discord.ext.commands.errors.CommandInvokeError: Command raised an exception: AttributeError: 'AsyncConnection' object has no attribute 'fetchone'

最后是这样排序的:

import psycopg_pool
import os
import dotenv
dotenv.load_dotenv()
conninfo = f'host={os.getenv("HOSTNAME")} port={os.getenv("PORT")} dbname={os.getenv("DATABASE")} user={os.getenv("USER")} password={os.getenv("PASSWORD")}'
pool = psycopg_pool.AsyncConnectionPool(conninfo=conninfo, open=False)

async def open_pool():
await pool.open()
await pool.wait()
print("Connection Pool Opened")
async def select_fetchall(query, args):
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute(query, args)
results = await cursor.fetchall()
return results

async def write(query, args):
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute(query, args)
if 'RETURNING' in query:
results = await cursor.fetchone()
return results
else:
return

然后,当我需要读取或写入数据库并传递查询和参数时,我只调用函数。

最新更新