使用motor io测试MongoDB连接的正确方法是什么?



我有一个简单的FastAPI Web应用程序,我希望能够在启动时检查数据库连接(如果失败,则重试连接(

我有以下代码,但感觉不对

# main.py
import uvicorn
from backend.app import app

if __name__ == "__main__":
uvicorn.run(app, port=8001)
# app.py
# ... omitted for brevity
from backend.database import notes, tags
# ... omitted for brevity
# database.py
from motor.motor_asyncio import AsyncIOMotorClient
from asyncio import get_event_loop

client = AsyncIOMotorClient("localhost", 27027)
loop = get_event_loop()
data = loop.run_until_complete(client.server_info())
db = client.notes_db
notes = db.notes
tags = db.tags

如果没有get_event_loop()和随后的loop.run_until_complete()调用,它将不会测试数据库连接,直到您真正尝试访问/写入它

我的目标是能够停止启动过程,直到它能够成功连接到数据库,有什么干净的方法可以用Python和motor.io做到这一点吗(https://motor.readthedocs.io/,抱歉没有标签(?

我想FastAPI中的startup事件就是这里的交易。我补充说,这个存储库是一个很好的例子,这个线程甚至可以为您提供更多信息。您可以在startup事件中执行测试。这意味着在成功执行startup事件之前,应用程序不会启动。

相关内容

最新更新