# FastAPI OAuth2 认证

### FastAPI OAuth2 认证

**需求：**

* 1. GET 方法的接口任何人都可以访问。
* 2. POST 方法的接口需要认证后才可以访问。

```python
# coding: utf8

from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer

# Security scheme handed to Depends() by the protected route below.
# NOTE(review): tokenUrl "12300" looks like a placeholder; this snippet defines no such route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="12300")
app = FastAPI()



@app.get("/users")
async def get_users():
    # Declares no security dependency, so the route takes no token.
    payload = {"method": "Get"}
    return payload

@app.post("/users")
async def write_users(token: str = Depends(oauth2_scheme)):
    # The token comes from the oauth2_scheme dependency and is echoed back
    # unchecked: any token value is accepted here (later sections add lookup).
    payload = {"method": "Post", "token": token}
    return payload
# 启动: uvicorn main:app --reload
```

### 获取当前用户

```python
# coding: utf8

from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from typing import Optional


# Security scheme handed to Depends() by get_current_user.
# NOTE(review): tokenUrl "12300" looks like a placeholder; this snippet defines no such route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="12300")
app = FastAPI()


class User(BaseModel):
    # User record returned by fake_decode_token.
    # Only `username` is required; every other field defaults to None.
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


def fake_decode_token(token):
    """Build a stand-in ``User`` from the raw token; nothing is actually decoded.

    The token is used verbatim as the username, while the email and full name
    are fixed values.
    """
    profile = {
        "username": token,
        "email": "token@token.com",
        "full_name": "token",
    }
    return User(**profile)

def get_current_user(token: str = Depends(oauth2_scheme)):
    """Dependency: turn the token supplied by ``oauth2_scheme`` into a ``User``."""
    return fake_decode_token(token)

@app.get("/users")
async def get_users():
    # Declares no security dependency, so the route takes no token.
    payload = {"method": "Get"}
    return payload

@app.post("/users")
async def write_users(current_user: User = Depends(get_current_user)):
    # get_current_user builds the User from the request's token; because
    # fake_decode_token accepts any token value, any token still gets through.
    payload = {"method": "Post", "token": current_user}
    return payload

# 启动: uvicorn main:app --reload
```

### 用密码和 Bearer 的简单 OAuth2

```python
# coding: utf8

from fastapi import FastAPI,Form, Depends, HTTPException,status
from fastapi.security import OAuth2PasswordRequestForm,OAuth2PasswordBearer
from typing import Optional
from pydantic import BaseModel


# tokenUrl must name the route that issues tokens. In this file that route is
# POST /login, so the relative URL is "login" (there is no "/token" route).
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
app = FastAPI()


# ---------------------------------------
class User(BaseModel):
    # Public user fields (no password data).
    # Only `username` is required; every other field defaults to None.
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    '''Stored form of a user: the hashed password is kept apart from the public User fields.'''
    hashed_password: str

def hash_password(password: str):
    """Placeholder hash: hand the password back unchanged.

    No hashing is performed, so the "hash" equals the plain-text input.
    """
    hashed = password
    return hashed


def make_token(username: str):
    """Return the token for *username*: the username followed by ``-token``."""
    suffix = "-token"
    return username + suffix
# ---------------------------------------

def get_user(db, username: str):
    """Return the ``UserInDB`` stored under *username* in *db*.

    Yields ``None`` when *db* has no entry for *username*.
    """
    if username not in db:
        return None
    return UserInDB(**db[username])

def fake_decode_token(token):
    """Resolve a token issued by ``make_token`` back to its user.

    ``make_token`` returns ``username + "-token"``, so the suffix has to be
    removed before the lookup; looking up the raw token never matches a key in
    ``fake_users``, which made every token handed out by the login route
    unusable.

    Returns:
        The ``UserInDB`` the token belongs to, or ``None`` when the token does
        not carry the expected suffix or names an unknown user.
    """
    suffix = "-token"
    # A bare username is not a token: only the format make_token emits is accepted.
    if not isinstance(token, str) or not token.endswith(suffix):
        return None
    username = token[: -len(suffix)]
    return get_user(fake_users, username)


def get_current_user(token: str = Depends(oauth2_scheme)):
    """Dependency that turns the request's token into a user.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``fake_decode_token`` yields nothing for the token.
    """
    current = fake_decode_token(token)
    if current:
        return current
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication",
        headers={"WWW-Authenticate": "Bearer"},
    )

# ---------------------------------------

# Stand-in for a user table, keyed by username.
fake_users = dict(
    admin=dict(
        username="admin",
        full_name="adminshuoceshikaifa",
        email="admin@admin.com",
        hashed_password="admin",
        disabled=False,
    ),
)

@app.post("/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # Exchange a username/password form for a bearer token.
    # Responds with 400 when the username is unknown or the password does not match.

    # Look the account up in fake_users, the in-memory stand-in for a user table.
    user_dict = fake_users.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='User not found')

    # Compare the hash of the submitted password with the stored hash.
    user = UserInDB(**user_dict)
    if hash_password(form_data.password) != user.hashed_password:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Password error ")

    # An OAuth2 token response must carry "token_type" next to "access_token";
    # "username" is kept so existing consumers of this response keep working.
    return {
        "access_token": make_token(user.username),
        "token_type": "bearer",
        "username": user.username,
    }



@app.get("/users/info")
async def read_users_me(current_user: User = Depends(get_current_user)):
    # get_current_user has already resolved the caller or raised 401;
    # the resolved user is returned as the response.
    me = current_user
    return me
```

**代码中的：**

UserInDB(\*\*user\_dict) 表示：

直接将 user\_dict 的键和值作为关键字参数传递，等同于：

```python
# Each key of user_dict is passed as the keyword argument of the same name.
UserInDB(
    username=user_dict["username"],
    email=user_dict["email"],
    full_name=user_dict["full_name"],
    disabled=user_dict["disabled"],
    hashed_password=user_dict["hashed_password"],
)
```

假如我们把用户的 disabled 状态改成了 True：

```python
# Same stand-in user table, with the admin account now flagged as disabled.
fake_users = dict(
    admin=dict(
        username="admin",
        full_name="adminshuoceshikaifa",
        email="admin@admin.com",
        hashed_password="admin",
        disabled=True,
    ),
)



# 我们希望 disabled 为 True 的用户不能获取信息，下面看看如何实现

def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Dependency that only lets through users whose ``disabled`` flag is falsy.

    Raises:
        HTTPException: 400 when ``current_user.disabled`` is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="已经删除")

@app.get("/users/me")
def read_users_me(current_user: User = Depends(get_current_active_user)):
    # get_current_active_user raises 400 for disabled users, so only an
    # enabled user reaches this point and is returned.
    me = current_user
    return me
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://close.gitbook.io/yun-wei-bi-ji/python/fastapi/fastapi-oauth2-ren-zheng.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
