如何在FastAPI或用于MySQL的ARQ中正确使用SQLalchemy



在我的FastAPI项目中,我使用SQlalchemy将数据库连接到MySQL服务器,该项目由实际的API和使用arq编写的后端工作人员组成。两者共享相同的代码库、模型,因此也共享相同的数据库代码。

我基本上是这样创建数据库连接的:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import config as c
SQLALCHEMY_DATABASE_URL = "%s://%s:%s@%s:%d/%s" % (c.settings.db_type, c.settings.db_username, c.settings.db_password, c.settings.db_host, c.settings.db_port, c.settings.db_database)
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

这在大多数情况下都有效。然而,偶尔请求会失败,并显示错误消息

sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2006, "MySQL server has gone away (BrokenPipeError(32, 'Broken pipe'))")

然而,这种情况只发生在worker部分,还没有在FastAPI容器中看到,尽管两者都使用相同的代码。在arq任务中,每当我需要数据库连接时,我基本上都在做:

from core.database import get_db
# ...
db = get_db().__next__()

有什么想法可以绕过这个问题吗?谢谢

在我的案例中,我在某些情况下忘记关闭DB连接,因此没有将连接返回到默认池。日志实际上告诉了我这件事。我不确定为什么会导致上面的BrokenPipe错误,但修复两个受影响的工人方法解决了问题。

最新更新