在FastAPI中进行身份验证后,无法识别令牌



我正在使用FastAPI创建一个基本的身份验证注册/登录。但是,在用户成功注册并登录后,令牌将不会被识别。使用"/docs";通过Swagger UI,但不是从主应用程序。

这是我的代码:main.py

import uvicorn
from fastapi import Depends, HTTPException
from auth import AuthHandler
from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
auth_handler = AuthHandler()
users = []

@app.get('/', response_class=HTMLResponse)
def get_register_form(request: Request):
return templates.TemplateResponse("register.html", {"request": request})

@app.post('/', response_class=HTMLResponse)
def register(request: Request, username: str = Form(...), password: str = Form(...)):
if len(users) != 0:
for x in users:
if x['username'] == username:
print('Username is taken!')
raise HTTPException(status_code=400, detail='Username is taken!')
hashed_password = auth_handler.get_password_hash(password)
users.append({
'username': username,
'password': hashed_password
})
print('User:', username, 'registered!')
return templates.TemplateResponse("success.html", {"request": request})

@app.get('/login', response_class=HTMLResponse)
def get_login_form(request: Request):
return templates.TemplateResponse("login.html", {"request": request})

@app.post('/login')
def login(request: Request, username: str = Form(...), password: str = Form(...)):
user = None
for x in users:
if x['username'] == username:
user = x
break
if (user is None) or (not auth_handler.verify_password(password, user['password'])):
print('Invalid username and/or password!')
raise HTTPException(status_code=401, detail='Invalid username and/or password!')
token = auth_handler.encode_token(user['username'])
return {'token': token}

@app.get('/protected')
def protected(username=Depends(auth_handler.auth_wrapper)):
return {'name': username}

if __name__ == '__main__':
uvicorn.run(app)

这是我的代码:auth.py

import jwt
from fastapi import HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from passlib.context import CryptContext
from datetime import datetime, timedelta

class AuthHandler():
security = HTTPBearer()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
secret = 'SECRET'
def get_password_hash(self, password):
return self.pwd_context.hash(password)
def verify_password(self, plain_password, hashed_password):
return self.pwd_context.verify(plain_password, hashed_password)
def encode_token(self, user_id):
payload = {
'exp': datetime.utcnow() + timedelta(days=0, minutes=5),
'iat': datetime.utcnow(),
'sub': user_id
}
return jwt.encode(
payload,
self.secret,
algorithm='HS256'
)
def decode_token(self, token):
try:
payload = jwt.decode(token, self.secret, algorithms=['HS256'])
return payload['sub']
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail='Signature has expired')
except jwt.InvalidTokenError as e:
raise HTTPException(status_code=401, detail='Invalid token')
def auth_wrapper(self, auth: HTTPAuthorizationCredentials = Security(security)):
return self.decode_token(auth.credentials)

这是我的表单。html:register.html和login.html是相同的。

<!doctype html>
<html lang="en">
<head>
<link rel="stylesheet" href="../static/styles.css">
<title>Document</title>
</head>
<body>
<div id="form">
<form method="post">
<h3>Login</h3>
<label for="username">Username:</label><br>
<input type="text" name="username" id="username"><br>
<label for="password">Password:</label><br>
<input type="text" name="password" id="password"><br><br>
<input type="submit" value="Submit" id="sub">
</form>
</div>
</body>
</html>

当转到127.0.0.1/受保护时,我得到的错误是:

{"detail":"Not authenticated"}

我该如何解决这个问题,让它像在文档中一样识别用户的令牌?

我在@MatsLindh的建议中找到了一个解决方案,我还简化了很多代码、导入等。很明显,您仍然需要具有所需表单和字段的html文件。就我而言,我只是添加了电子邮件和密码。

请注意,这不是";最佳实践";尤其是在数据库的同一个表中插入密码,即使它是散列的。

import uvicorn
import sqlite3
import jwt
from datetime import datetime, timedelta
from fastapi import FastAPI, Form, HTTPException, Cookie, Request, Depends
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from passlib.context import CryptContext

connection = sqlite3.connect("users.db", check_same_thread=False)
cursor = connection.cursor()
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

class AuthHandler:
secret = 'SECRET'  # I can put any key I want, it's going to be used to encrypt and decrypt
bcrypt_obj = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_password_hash(self, password):
return self.bcrypt_obj.hash(password)
def verify_password(self, plain_password, hashed_password):
return self.bcrypt_obj.verify(plain_password, hashed_password)
def encode_token(self, user_id):
payload = {
'exp': datetime.utcnow() + timedelta(days=0, minutes=5),
'iat': datetime.utcnow(),
'sub': user_id
}
return jwt.encode(
payload,
self.secret,
algorithm='HS256'
)
def decode_token(self, token):
try:
payload = jwt.decode(token, self.secret, algorithms=['HS256'])
return payload['sub']
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail='Signature has expired')
except jwt.InvalidTokenError as e:
raise HTTPException(status_code=401, detail='Invalid token')
def grant_access(self, token, users_db):
for x in users_db:
if x['email'] == self.decode_token(token):
return True
else:
return False

auth_handler = AuthHandler()

@app.get('/', response_class=HTMLResponse)
def get_register_form(request: Request):
return templates.TemplateResponse("register.html", {"request": request})

@app.post('/', response_class=HTMLResponse)
def register(email: str = Form(...), password: str = Form(...)):
cursor.execute("select * from users where email=:e", {"e": email})
user_with_same_email_list = cursor.fetchall()
if len(user_with_same_email_list) != 0:
print(user_with_same_email_list)
print('Email already registered!')
raise HTTPException(status_code=400, detail='Email already registered!')
else:
hashed_password = auth_handler.get_password_hash(password)
sqlite_insert_query = "INSERT INTO users (user_id, username, email, hashed_password, eoa) VALUES " 
"(1,'','" + email + "','" + hashed_password + "','')"
cursor.execute(sqlite_insert_query)
connection.commit()
print('User with email:', email, 'registered!')
response = RedirectResponse(url="/login")
response.status_code = 302
return response

@app.get('/login', response_class=HTMLResponse)
def get_login_form(request: Request):
return templates.TemplateResponse("login.html", {"request": request})

@app.post('/login')
def login(email: str = Form(...), password: str = Form(...)):
cursor.execute("select * from users where email=:e", {"e": email})
user_with_same_email_list = cursor.fetchall()
if len(user_with_same_email_list) == 0:
print('No user with this email!')
raise HTTPException(status_code=400, detail='No user with this email!')
elif (email is None) or (not auth_handler.verify_password(password, user_with_same_email_list[0][3])):
print('Invalid email and/or password!')
raise HTTPException(status_code=401, detail='Invalid email and/or password!')
else:
token = auth_handler.encode_token(email)
response = RedirectResponse(url="/check_cookie")
response.status_code = 302
response.set_cookie(key="Authorization", value=token, secure=True, httponly=True)
return response

@app.get("/check_cookie")
async def check_cookie(Authorization: str | None = Cookie(None)):
if Authorization:
email = auth_handler.decode_token(Authorization)
cursor.execute("select * from users where email=:e", {"e": email})
user_with_same_email_list = cursor.fetchall()
if len(user_with_same_email_list) == 0:
print('Invalid token')
raise HTTPException(status_code=401, detail='Invalid token')
else:
print('Access granted!')
response = RedirectResponse(url="/protected")
response.status_code = 302
return response
else:
print("No token found")
raise HTTPException(status_code=401, detail='No token found')

@app.get('/protected', response_class=HTMLResponse)
async def protected(request: Request, email=Depends(check_cookie)):
return templates.TemplateResponse("logged_in.html", {"request": request})

@app.get("/logged_out", response_class=HTMLResponse)
async def logged_out(request: Request):
return templates.TemplateResponse("logged_out.html", {"request": request})

@app.get("/logout")
async def route_logout_and_remove_cookie():
response = RedirectResponse(url="/logged_out")
response.delete_cookie("Authorization", domain="127.0.0.1")
return response

if __name__ == '__main__':
uvicorn.run(app)

最新更新