FastAPI Jwt 认证

JWT

# 产生和校验JWT
pip install python-jose 


# 处理哈希密码
pip install passlib bcrypt
  • 1、创建函数,哈希来自用户的密码

  • 2、创建函数,校验接收的密码是否与存储的哈希值匹配

  • 3、创建函数,用于认证并返回用户

  • 4、设定 JWT 令牌签名算法,值设置 HS256

  • 5、创建设置令牌过期时间变量

  • 6、定义令牌端点中用于响应的 Pydantic 模型

  • 7、创建函数,生成新访问令牌的函数。

  • 8、令牌解码,对其校验,返回当前用户

  • 9、令牌无效,立即返回一个 HTTP 错误

  • 10、使用令牌的过期时间创建一个 timedelta 对象。

  • 11、创建一个真实的 JWT 访问令牌并返回它。

  • 1.调用登录接口,产生token

  • 2.调用依赖登录的接口,在请求头中携带token

  • 3.依赖登录的接口,接受到请求,判断是否在headers中携带token

  • 4.携带token,校验是否过期

  • 5.解析token,获取username

  • 6.根据username从redis获取,

  • 7.查询到username的token且token相等,我们任务用户登录

  • 8.调用接口返回数据

  • 9.如果有其中一项校验不通过,返回对应的失败的信息

实战

# coding: utf8
from datetime import datetime, timedelta
from fastapi import FastAPI,Depends,HTTPException,status
from fastapi.security import OAuth2PasswordBearer,OAuth2PasswordRequestForm
from typing import Optional
from jose import JWTError,jwt
from pydantic import BaseModel
from passlib.context import CryptContext


users = {
    "admin": {
        "username": "admin",
        "full_name": "adminshuoceshikaifa",
        "email": "admin@admin.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False
    }
}

app = FastAPI()

# 令牌加密私钥
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"

# 令牌算法
ALGORITHM = "HS256"

# 令牌过期时间
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 认证方式
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Optional[str] = None


# 验证和解密密码
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

# 获取用户
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

# 验证用户
def auth_user(db, username: str, password: str):
    user = get_user(db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


# 生成 Token 返回
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encode_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encode_jwt


# 解密 Token, 返回用户
def get_current_user(token: str = Depends(oauth2_scheme)):
    c_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Auth Error", headers={"WWW-Authenticate": "Bearer"})
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise c_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise c_exception
    user = get_user(users, username=token_data.username)
    if user is None:
        raise c_exception
    return user

# 判断用户是否已删除
def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Already delete")
    return current_user



# 登录接口
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = auth_user(users, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    # 必须是如下格式
    return {"access_token": access_token, "token_type": "bearer"}


# 个人接口
@app.get("/users/me",)
def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

Last updated