FastAPI OAuth2 认证

FastAPI OAuth2 认证

需求:

    1. Get 方法接口任意都可以访问

    1. Post 方法接口需要认证后才可以访问。

# coding: utf8

from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='12300')



@app.get('/users')
async def get_users():
    return {"method": 'Get'}

@app.post('/users')
async def write_users(token: str = Depends(oauth2_scheme)):   # 传入任意 token 值都可以获取, 请往下看
    return {"method": 'Post', 'token': token}
# # 启动: uvicorn main:app --reload

获取当前用户

# coding: utf8

from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from typing import Optional


app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='12300')


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


def fake_decode_token(token):
    # 解密并查询或者插入用户,如果存在并返回
    return User(
        username=token, email="token@token.com", full_name="token"
    )

def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    return user

@app.get('/users')
async def get_users():
    return {"method": 'Get'}

@app.post('/users')
async def write_users(current_user: User  = Depends(get_current_user)):   # 传入任意 token 值都可以获取, 请往下看
    return {"method": 'Post', 'token': current_user}

# # 启动: uvicorn main:app --reload

用密码和 Bearer 的简单 OAuth2

# coding: utf8

from fastapi import FastAPI,Form, Depends, HTTPException,status
from fastapi.security import OAuth2PasswordRequestForm,OAuth2PasswordBearer
from typing import Optional
from pydantic import BaseModel


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()


# ---------------------------------------
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    '''加密密码单独分离'''
    hashed_password: str

def hash_password(password: str):
    '''加密密码'''
    return password


def make_token(username: str):
    '''生成 Token 返回'''
    return username + "-token"
# ---------------------------------------

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def fake_decode_token(token):
    '''解密 Token'''
    user = get_user(fake_users, token)
    return user


def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

# ---------------------------------------

# 模拟数据
fake_users = {
    "admin": {
        "username": "admin",
        "full_name": "adminshuoceshikaifa",
        "email": "admin@admin.com",
        "hashed_password": "admin",
        "disabled": False
    }
}

@app.post("/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # 先从数据库里查询是否存在这个用户, 模拟一下数据
    user_dict = fake_users.get(form_data.username)

    # 如果不存在这个用户
    if not user_dict:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='User not found')

    # 如果用户存在,判断密码和数据库 HASH 密码是否一致
    user = UserInDB(**user_dict)
    hashed_password = hash_password(form_data.password)
    if  hashed_password != user.hashed_password:
        raise HTTPException(status_code=400, detail="Password error ")

    # 生成 Token
    token = make_token(user.username)
    return {"access_token": token, "username": user.username }



@app.get("/users/info")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

代码中的:

UserInDB(**user_dict) 表示:

直接将 user_dict 的键和值作为关键字参数传递,等同于:

UserInDB(
    username = user_dict["username"],
    email = user_dict["email"],
    full_name = user_dict["full_name"],
    disabled = user_dict["disabled"],
    hashed_password = user_dict["hashed_password"],
)

加入我们的状态现在改成了True

fake_users = {
    "admin": {
        "username": "admin",
        "full_name": "adminshuoceshikaifa",
        "email": "admin@admin.com",
        "hashed_password": "admin",
        "disabled": True
    }
}



# 我们不想让disabled为True的时候不能获取。我们看下如何实现的

def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="已经删除")
    return current_user

@app.get("/users/me")
def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

Last updated