我有一个简单的FastAPI Web应用程序,我希望能够在启动时检查数据库连接(如果失败,则重试连接(
我有以下代码,但感觉不对
# main.py
import uvicorn
from backend.app import app
if __name__ == "__main__":
uvicorn.run(app, port=8001)
# app.py
# ... omitted for brevity
from backend.database import notes, tags
# ... omitted for brevity
# database.py
from motor.motor_asyncio import AsyncIOMotorClient
from asyncio import get_event_loop
client = AsyncIOMotorClient("localhost", 27027)
loop = get_event_loop()
data = loop.run_until_complete(client.server_info())
db = client.notes_db
notes = db.notes
tags = db.tags
如果没有get_event_loop()
和随后的loop.run_until_complete()
调用,它将不会测试数据库连接,直到您真正尝试访问/写入它
我的目标是能够停止启动过程,直到它能够成功连接到数据库,有什么干净的方法可以用Python和motor.io做到这一点吗(https://motor.readthedocs.io/,抱歉没有标签(?
我想FastAPI中的startup
事件就是这里的交易。我补充说,这个存储库是一个很好的例子,这个线程甚至可以为您提供更多信息。您可以在startup
事件中执行测试。这意味着在成功执行startup
事件之前,应用程序不会启动。