JWT多方式登录及自定义验证

RBAC(基于用户权限访问控制的认证)   (自己了解:基于auth的认证规则)    Django框架采用的是RBAC认证规则, RBAC认证规则通常会分为 三表规则、五表规则, Django采用的是六表规则    三表:用户表、角色表、权限表    五表:用户表、角色表、权限表、用户角色关系表、角色权限关系表    六表:用户表、角色表、权限表、用户角色关系表、角色权限关系表、用户权限关系表

JWT 多方面登录

用户可以以用户名、邮箱、手机号登录

  • 手机号+密码

  • 用户名+密码

  • 邮箱名+密码

流程分析(post请求):

  • 路由 -- 自动生成

  • 视图类: -- ViewSet(ViewSetMixin, views.APIView)

  • 序列化类: -- 重写validate方法,在这里面对用户名和密码进行校验

之前写的逻辑处理一般都是在视图类中,现在把逻辑拿到了序列化类的validate全局钩子中,在视图函数中 就无需写逻辑了,执行到.is_valid就会执行validate方法

  • 重点:视图类和序列化类之间交互数据的桥梁:context={}(是1个字典)

  • 自定义用户表,登录手动签发token,自定义的认证类

  • 认证通过,正常情况下查询得到当前登录用户

效果

用户名登录

pip install djangorestframework-jwt djangorestframework

代码

  • settings

INSTALLED_APPS = [
....
    'app01.apps.App01Config',
    'rest_framework'
]

AUTH_USER_MODEL = 'app01.UserInfo'  # 扩写auth的user表,必须配置

import datetime
REST_FRAMEWORK = {
    'EXCEPTION_HANDLER': 'app01.utils.common_exception', # 配置全局异常
    'JWT_EXPIRATION_DELTA': datetime.timedelta(weeks=1), # 过期时间1周
}
  • urls

from django.contrib import admin
from django.urls import path
from app01.views import LoginView
from app01 import views

urlpatterns = [
    path('admin/', admin.site.urls),
    path('login/',views.LoginView.as_view({'post': 'post'}))
]
  • views

from rest_framework.viewsets import ViewSet
from app01.serializer import LoginSerializer
from app01.utils import APIResponse


class LoginView(ViewSet):
    def post(self, request, *args, **kwargs):
        # 实例化得到一个序列化类的对象
        ser = LoginSerializer(data=request.data)
        # 序列化类的对象的校验方法
        ser.is_valid(raise_exception=True)
        token = ser.context.get('token')
        # 如果通过,表示登录成功,返回手动签发的token
        # 如果失败,抛异常,就不用管了
        username = ser.context.get('username')
        return APIResponse(token=token, username=username)
  • models

from django.db import models
from django.contrib.auth.models import AbstractUser

# 扩展 django 自带的 User, 为的添加一个字段
class UserInfo(AbstractUser):
    phone = models.CharField(max_length=32, unique=True)
  • utils

from rest_framework.response import Response
from rest_framework.views import exception_handler

# 自己封装的response
class APIResponse(Response):
    def __init__(self, code=200, msg='成功', data=None, status=None,
                 headers=None, content_type=None, **kwargs):
        dic = {'code': code, 'msg': msg}
        if data:
            dic['data'] = data
        dic.update(kwargs)
        super().__init__(data=dic, status=status,
                         headers=headers, content_type=content_type)


# 自己封装的全局异常处理
def common_exception(exc, context):
    # 先调用REST framework默认的异常处理方法获得标准错误响应对象
    response = exception_handler(exc, context)
    # 在此处补充自定义的异常处理
    if response is None:
        response = Response(data={'code': 999, 'msg': str(exc)})

    return response
  • serializer

from rest_framework import serializers

from app01.models import UserInfo
import re
from rest_framework.exceptions import ValidationError
from rest_framework_jwt.utils import jwt_encode_handler, jwt_payload_handler


class LoginSerializer(serializers.ModelSerializer):  # 只要跟表有关系,就继承modelSerializer
    username = serializers.CharField()    # 重写 username , 否则会它会认为你想存数据
    class Meta:
        model = UserInfo
        fields = ['username', 'password']

    def validate(self, attrs):
        # username phone email  都可能是登录账户
        username = attrs.get('username')
        password = attrs.get('password')
        if re.match('^1[0-9]\d{9}$', username):  # 手机号正则
            user = UserInfo.objects.filter(phone=username).first()
            
        elif re.match('^.+@.+$', username):  # 邮箱登录正则
            user = UserInfo.objects.filter(email=username).first()
            
        else:  # 用户名登录
            user = UserInfo.objects.filter(username=username).first()
            
        if user and user.check_password(password):  # 如果登录成功,生成token
            payload = jwt_payload_handler(user)  # 通过user拿到payload
            token = jwt_encode_handler(payload)  # 通过payload拿到token
            #  token是要在视图类种使用,现在我们在序列化类中
            # 视图类和序列化类之间通过context这个字典来传递数据
            self.context['token'] = token
            self.context['username'] = user.username
            return attrs

        else:
            raise ValidationError('账号或密码错误')

JWT 自定义验证

  • authentications.py (创建认证文件)

from rest_framework_jwt.utils import jwt_decode_handler
import jwt
from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
from rest_framework.exceptions import AuthenticationFailed


# 自定义的认证类(验证请求数据需要的认证)
class JwtAuthentication(BaseJSONWebTokenAuthentication):
    # 自定义认证类,重写authenticate方法
    def authenticate(self, request):
        '''
         认证通过,返回user,auth
         认证失败,返回None
        '''
        token = request.META.get('HTTP_Authorization'.upper())
        if not token:
            return None

        try:
            payload = jwt_decode_handler(token)
        except jwt.ExpiredSignature:
            raise AuthenticationFailed('过期了')
        except jwt.DecodeError:
            raise AuthenticationFailed('解码错误')
        except jwt.InvalidTokenError:
            raise AuthenticationFailed('不合法的token')
        user = self.authenticate_credentials(payload)
        return (user, token)
  • settings.py

# rest_framework 设置
REST_FRAMEWORK = {
    # '我们自定义认证函数的路径',  (app.py文件.类)
    'DEFAULT_AUTHENTICATION_CLASSES': [
        "test_jwt.authentications.JwtAuthentication",
    ],
    # 全局权限配置:一站式网站(所有操作都需要登录后才能访问)
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.IsAuthenticated',
    ],

    # 配置全局异常
    'EXCEPTION_HANDLER': 'test_jwt.utils.common_exception',

}

JWT_AUTH = {
    'JWT_EXPIRATION_DELTA': datetime.timedelta(seconds=30000),   # token有效期
    # 'JWT_AUTH_HEADER_PREFIX': 'TOKEN',  # token前缀,默认是 jwt
}

由于设置了所有操作都需要登录后才能访问,那登录接口无法访问得到token数据。解决如下:

from rest_framework.permissions import AllowAny

class LoginView(ViewSet):
    permission_classes = [AllowAny]

    def post(self, request, *args, **kwargs):
        '''略'''

携带 token 访问另一个接口:

Last updated