FastAPI OAuth2 认证
FastAPI OAuth2 认证
需求:
Get 方法接口任意都可以访问
Post 方法接口需要认证后才可以访问。
# coding: utf8
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='12300')
@app.get('/users')
async def get_users():
return {"method": 'Get'}
@app.post('/users')
async def write_users(token: str = Depends(oauth2_scheme)): # 传入任意 token 值都可以获取, 请往下看
return {"method": 'Post', 'token': token}
# # 启动: uvicorn main:app --reload
获取当前用户
# coding: utf8
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='12300')
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
def fake_decode_token(token):
# 解密并查询或者插入用户,如果存在并返回
return User(
username=token, email="token@token.com", full_name="token"
)
def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
return user
@app.get('/users')
async def get_users():
return {"method": 'Get'}
@app.post('/users')
async def write_users(current_user: User = Depends(get_current_user)): # 传入任意 token 值都可以获取, 请往下看
return {"method": 'Post', 'token': current_user}
# # 启动: uvicorn main:app --reload
用密码和 Bearer 的简单 OAuth2
# coding: utf8
from fastapi import FastAPI,Form, Depends, HTTPException,status
from fastapi.security import OAuth2PasswordRequestForm,OAuth2PasswordBearer
from typing import Optional
from pydantic import BaseModel
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
# ---------------------------------------
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
'''加密密码单独分离'''
hashed_password: str
def hash_password(password: str):
'''加密密码'''
return password
def make_token(username: str):
'''生成 Token 返回'''
return username + "-token"
# ---------------------------------------
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
'''解密 Token'''
user = get_user(fake_users, token)
return user
def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication",
headers={"WWW-Authenticate": "Bearer"},
)
return user
# ---------------------------------------
# 模拟数据
fake_users = {
"admin": {
"username": "admin",
"full_name": "adminshuoceshikaifa",
"email": "admin@admin.com",
"hashed_password": "admin",
"disabled": False
}
}
@app.post("/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
# 先从数据库里查询是否存在这个用户, 模拟一下数据
user_dict = fake_users.get(form_data.username)
# 如果不存在这个用户
if not user_dict:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='User not found')
# 如果用户存在,判断密码和数据库 HASH 密码是否一致
user = UserInDB(**user_dict)
hashed_password = hash_password(form_data.password)
if hashed_password != user.hashed_password:
raise HTTPException(status_code=400, detail="Password error ")
# 生成 Token
token = make_token(user.username)
return {"access_token": token, "username": user.username }
@app.get("/users/info")
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
代码中的:
UserInDB(**user_dict) 表示:
直接将 user_dict 的键和值作为关键字参数传递,等同于:
UserInDB(
username = user_dict["username"],
email = user_dict["email"],
full_name = user_dict["full_name"],
disabled = user_dict["disabled"],
hashed_password = user_dict["hashed_password"],
)
加入我们的状态现在改成了True
fake_users = {
"admin": {
"username": "admin",
"full_name": "adminshuoceshikaifa",
"email": "admin@admin.com",
"hashed_password": "admin",
"disabled": True
}
}
# 我们不想让disabled为True的时候不能获取。我们看下如何实现的
def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="已经删除")
return current_user
@app.get("/users/me")
def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
Last updated