# coding: utf8
from fastapi import FastAPI,Form, Depends, HTTPException,status
from fastapi.security import OAuth2PasswordRequestForm,OAuth2PasswordBearer
from typing import Optional
from pydantic import BaseModel
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
# ---------------------------------------
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
'''加密密码单独分离'''
hashed_password: str
def hash_password(password: str):
'''加密密码'''
return password
def make_token(username: str):
'''生成 Token 返回'''
return username + "-token"
# ---------------------------------------
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
'''解密 Token'''
user = get_user(fake_users, token)
return user
def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication",
headers={"WWW-Authenticate": "Bearer"},
)
return user
# ---------------------------------------
# 模拟数据
fake_users = {
"admin": {
"username": "admin",
"full_name": "adminshuoceshikaifa",
"email": "admin@admin.com",
"hashed_password": "admin",
"disabled": False
}
}
@app.post("/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
# 先从数据库里查询是否存在这个用户, 模拟一下数据
user_dict = fake_users.get(form_data.username)
# 如果不存在这个用户
if not user_dict:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='User not found')
# 如果用户存在,判断密码和数据库 HASH 密码是否一致
user = UserInDB(**user_dict)
hashed_password = hash_password(form_data.password)
if hashed_password != user.hashed_password:
raise HTTPException(status_code=400, detail="Password error ")
# 生成 Token
token = make_token(user.username)
return {"access_token": token, "username": user.username }
@app.get("/users/info")
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user