FastAPI Jwt 认证
JWT
# 产生和校验JWT
pip install python-jose
# 处理哈希密码
pip install passlib bcrypt
1、创建函数,哈希来自用户的密码
2、创建函数,校验接收的密码是否与存储的哈希值匹配
3、创建函数,用于认证并返回用户
4、设定 JWT 令牌签名算法,值设置 HS256
5、创建设置令牌过期时间变量
6、定义令牌端点中用于响应的 Pydantic 模型
7、创建函数,生成新访问令牌的函数。
8、令牌解码,对其校验,返回当前用户
9、令牌无效,立即返回一个 HTTP 错误
10、使用令牌的过期时间创建一个 timedelta 对象。
11、创建一个真实的 JWT 访问令牌并返回它。
1.调用登录接口,产生token
2.调用依赖登录的接口,在请求头中携带token
3.依赖登录的接口,接受到请求,判断是否在headers中携带token
4.携带token,校验是否过期
5.解析token,获取username
6.根据username从redis获取,
7.查询到username的token且token相等,我们任务用户登录
8.调用接口返回数据
9.如果有其中一项校验不通过,返回对应的失败的信息
实战
# coding: utf8
from datetime import datetime, timedelta
from fastapi import FastAPI,Depends,HTTPException,status
from fastapi.security import OAuth2PasswordBearer,OAuth2PasswordRequestForm
from typing import Optional
from jose import JWTError,jwt
from pydantic import BaseModel
from passlib.context import CryptContext
users = {
"admin": {
"username": "admin",
"full_name": "adminshuoceshikaifa",
"email": "admin@admin.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False
}
}
app = FastAPI()
# 令牌加密私钥
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# 令牌算法
ALGORITHM = "HS256"
# 令牌过期时间
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 认证方式
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
# 验证和解密密码
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
# 获取用户
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
# 验证用户
def auth_user(db, username: str, password: str):
user = get_user(db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
# 生成 Token 返回
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encode_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encode_jwt
# 解密 Token, 返回用户
def get_current_user(token: str = Depends(oauth2_scheme)):
c_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Auth Error", headers={"WWW-Authenticate": "Bearer"})
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise c_exception
token_data = TokenData(username=username)
except JWTError:
raise c_exception
user = get_user(users, username=token_data.username)
if user is None:
raise c_exception
return user
# 判断用户是否已删除
def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Already delete")
return current_user
# 登录接口
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = auth_user(users, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
# 必须是如下格式
return {"access_token": access_token, "token_type": "bearer"}
# 个人接口
@app.get("/users/me",)
def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
Last updated