# coding: utf8
from datetime import datetime, timedelta
from fastapi import FastAPI,Depends,HTTPException,status
from fastapi.security import OAuth2PasswordBearer,OAuth2PasswordRequestForm
from typing import Optional
from jose import JWTError,jwt
from pydantic import BaseModel
from passlib.context import CryptContext
users = {
"admin": {
"username": "admin",
"full_name": "adminshuoceshikaifa",
"email": "admin@admin.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False
}
}
app = FastAPI()
# 令牌加密私钥
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# 令牌算法
ALGORITHM = "HS256"
# 令牌过期时间
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 认证方式
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
# 验证和解密密码
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
# 获取用户
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
# 验证用户
def auth_user(db, username: str, password: str):
user = get_user(db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
# 生成 Token 返回
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encode_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encode_jwt
# 解密 Token, 返回用户
def get_current_user(token: str = Depends(oauth2_scheme)):
c_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Auth Error", headers={"WWW-Authenticate": "Bearer"})
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise c_exception
token_data = TokenData(username=username)
except JWTError:
raise c_exception
user = get_user(users, username=token_data.username)
if user is None:
raise c_exception
return user
# 判断用户是否已删除
def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Already delete")
return current_user
# 登录接口
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = auth_user(users, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
# 必须是如下格式
return {"access_token": access_token, "token_type": "bearer"}
# 个人接口
@app.get("/users/me",)
def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user