68、rest_framework认证、权限、频率组件

Django框架 / 2021-04-01

rest_framework为我们提供了登录认证、权限校验、访问频率校验三大组件,下面来看看它们的使用。

一、登录认证

1.1、认证源码分析

1、在APIView里面的dispatch方法中,有这么一条self.initial(request, *args, **kwargs),Ctrl+左键跳转到initial方法源码中,可以看到有以下三行即为认证、权限、频率三大组件:

# Ensure that the incoming request is permitted
self.perform_authentication(request)  # 认证
self.check_permissions(request)  # 权限
self.check_throttles(request)  # 频率

2、perform_authentication即登录认证,里面只有一行代码request.user,既然是request的user属性,我们就得去到request看有无此属性!

def perform_authentication(self, request):
    """
    Perform authentication on the incoming request.

    Note that if you override this and simply 'pass', then authentication
    will instead be performed lazily, the first time either
    `request.user` or `request.auth` is accessed.
    """
    request.user

3、request源码,在from rest_framework.request import Request的request类中,我们找到了user方法,用了property伪装成属性

    @property
    def user(self):
        """
        Returns the user associated with the current request, as authenticated
        by the authentication classes provided to the request.
        """
        if not hasattr(self, '_user'):  # 第一次访问没登录肯定没有
            with wrap_attributeerrors():
                self._authenticate()  # 
        return self._user

4、user方法又调用了_authenticate方法,它里面是一个for循环。

    def _authenticate(self):
        """
        Attempt to authenticate the request using each authentication instance
        in turn.
        """
        for authenticator in self.authenticators:  # 从认证类对象列表中循环取值
            try:
                user_auth_tuple = authenticator.authenticate(self)  # 返回元祖,里面是登录的用户与认证信息
            except exceptions.APIException:
                self._not_authenticated()  # 认证失败就报错
                raise

            if user_auth_tuple is not None:  # 如有值代表认证通过
                self._authenticator = authenticator
                self.user, self.auth = user_auth_tuple
                return

        self._not_authenticated()  # 为空则代表游客

5、而这个authenticators是在request的init初始化时传入的,那它什么时候触发的?是APIView的dispatch方法包装时触发

    def __init__(self, request, parsers=None, authenticators=None,
                 negotiator=None, parser_context=None):
        assert isinstance(request, HttpRequest), (
            'The `request` argument must be an instance of '
            '`django.http.HttpRequest`, not `{}.{}`.'
            .format(request.__class__.__module__, request.__class__.__name__)
        )

6、就是在dispatch这行代码里,里面调用了initialize_request方法

request = self.initialize_request(request, *args, **kwargs)

7、initialize_request里面定义了authenticators调用get_authenticators方法

    def initialize_request(self, request, *args, **kwargs):
        """
        Returns the initial request object.
        """
        parser_context = self.get_parser_context(request)

        return Request(
            request,
            parsers=self.get_parsers(),
            authenticators=self.get_authenticators(),  # 这行
            negotiator=self.get_content_negotiator(),
            parser_context=parser_context
        )

8、get_authenticators里面是个列表生成式,从authentication_classes里面循环取值,取一个出来就加括号执行

    def get_authenticators(self):
        """
        Instantiates and returns the list of authenticators that this view can use.
        """
        return [auth() for auth in self.authentication_classes]

9、而这个self.authentication_classes就是APIView类最开始时定义的

authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES

这个authentication_classes就是我们需要在自定义的视图类中定义的属性了,列表里套一个个自定义认证类产生的对象,所以我们需要写一个自定义的认证类!

1.2、自定义认证类

写一个类,继承BaseAuthentication,必须重写authenticate(为上面源码分析的第四步中user_auth_tuple = authenticator.authenticate(self)),认证的逻辑要写在里面;认证通过,要返回两个值(self.user, self.auth = user_auth_tuple),一个值最终给了request.user;不通过,报错AuthtenticationFaiiled.

1.2.1、创建超级用户
python manage.py createsuperuser
1.2.2、APP注册models表

如果想在Django自带的admin后台管理表,就需要在APP下的admin.py中注册对应的表。

from django.contrib import admin
from study import models

# Register your models here.


admin.site.register(models.Article)
admin.site.register(models.Site)
admin.site.register(models.User)
admin.site.register(models.Token)
1.2.3、models代码

手动给数据库加点数据进去好演示

class User(models.Model):
    username = models.CharField(max_length=32)
    password = models.CharField(max_length=32)
    user_type = models.IntegerField(choices=((1, 'superuser'), (2, 'normaluser')))

    class Meta(object):
        verbose_name_plural = '用户'


class Token(models.Model):
    user = models.ForeignKey(to=User, on_delete=models.CASCADE)
    token = models.CharField(max_length=64)


class Article(models.Model):
    name = models.CharField(max_length=16)
    create_time = models.DateField(auto_now_add=True)
    site = models.OneToOneField(to='Site', on_delete=models.CASCADE)

    class Meta(object):
        verbose_name_plural = '文章'

    def __str__(self):
        return self.name
    

class Site(models.Model):
    name = models.CharField(max_length=16)

    class Meta(object):
        verbose_name_plural = '站点'

    def __str__(self):
        return self.name
1.2.4、自定义认证类
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from study.models import Token


class LoginAuthentication(BaseAuthentication):
    def authenticate(self, request):
        token = request.GET.get('token')
        user_token = Token.objects.filter(token=token).first()

        if not user_token:
            raise AuthenticationFailed('Authentication failed')
        else:
            return user_token.user, token
1.2.5、views代码

postman提交GET请求时,必须先登录

from study.serializer import ArticleSeriallizer
from study import models
from rest_framework.generics import ListAPIView, CreateAPIView, RetrieveUpdateDestroyAPIView


# Create your views here.


class ArticlesView(ListAPIView):
    queryset = models.Article.objects.all()  # queryset对象,查询所有。
    serializer_class = ArticleSeriallizer  # 要使用的序列化类
    

class Login(APIView):
    def post(self, request):
        user = request.POST.get('username')
        password = request.POST.get('password')
        user_obj = models.User.objects.filter(username=user, password=password).first()

        if user_obj:
            token = uuid.uuid4()
            models.Token.objects.update_or_create(user=user_obj, token=token)
        return Response({'token': token})

1.2.6、urls代码

from django.contrib import admin
from django.urls import path,re_path
from study import views

urlpatterns = [
    path('admin/', admin.site.urls),
    re_path('article/(?P<pk>\d+)', views.ArticleView.as_view()),
    path('articles/', views.ArticlesView.as_view()),
    path('article/', views.ArticleView.as_view())
]
1.2.7、序列化器代码
class ArticleSeriallizer(serializers.ModelSerializer):
    class Meta:
        model = models.Article
        fields = '__all__'
1.2.8、验证效果
未登录时

浏览器输入http://127.0.0.1:8000/articles/,直接报错

{"detail":"Authentication failed"}
登录时

postman先登录,然后将登录后获取到的token放在url里

127.0.0.1:8000/articles/?token=8bfcc7ad-23b2-444b-a6f6-b2a91bb84629

得到结果

[
    {
        "id": 1,
        "name": "津门除妖记",
        "create_time": "2018-09-18",
        "site": 1
    },
    {
        "id": 2,
        "name": "今村异闻录",
        "create_time": "2018-05-15",
        "site": 2
    }
]
1.2.9、局部使用

以上为全局使用的演示,局部使用类似之前json那样配置,取消settings的配置然后在视图类里配置以下一行代码即可:

from d1.utils.study_auth import LoginAuthentication


class ArticlesView(ListAPIView):
    authentication_classes = [LoginAuthentication,]  # 把自定义认证类填进去即可,记得先导入

    queryset = models.Article.objects.all()  # queryset对象,查询所有。
    serializer_class = ArticleSeriallizer  # 要使用的序列化类

当我配置了全局认证后,如何在某个视图类内规避此认证?在局部写个空列表就行,就是上面的列表留空就行。

二、权限校验

校验用户是否拥有某些操作的权限

2.1、权限源码分析

又到了源码分析环节,( Ĭ ^ Ĭ ),别说了,开始吧!上面说过在APIView-->dispatch-->initial那定义了权限方法

self.check_permissions(request)

来都来了,点进去看一下吧,发现跟认证模块非常像,所以也是要在自定义类中写固定的方法:has_permission

def check_permissions(self, request):
    """
    Check if the request should be permitted.
    Raises an appropriate exception if the request is not permitted.
    """
    for permission in self.get_permissions():  # 跟认证一样,都是for循环一个个的权限类的对象
        # self是循环取出来的权限对象,request是视图类对象
        if not permission.has_permission(request, self):  # 需要重写has_permission方法
            self.permission_denied(  # 无权限返回false
                request,
                message=getattr(permission, 'message', None),
                code=getattr(permission, 'code', None)
            )

2.2、自定义权限类

权限类代码

比认证简单了好多啊!

class PermissionAuth(BasePermission):
    def has_permission(self, request, view):
        user = request.user  # 当前登录用户,因为已经通过了认证校验,所以request内就有user对象了。
        if user.user_type == 1:  # 如果是管理员,则可以访问,否则报错。
            return True
        else:
            return False
全局开启权限校验
REST_FRAMEWORK = {
    'DEFAULT_RENDERER_CLASSES': [
        'rest_framework.renderers.JSONRenderer',
        # 'rest_framework.renderers.BrowsableAPIRenderer',
    ],
    'DEFAULT_AUTHENTICATION_CLASSES': [  # 全局认证配置
        'd1.utils.study_auth.LoginAuthentication',
    ],
    'DEFAULT_PERMISSION_CLASSES': [  # 全局权限校验配置
        'd1.utils.study_auth.PermissionAuth',
    ],

}
测试结果

我用了一个普通用户登陆后,发起请求返回如下:

{
    "detail": "You do not have permission to perform this action."
}

局部配置

局部禁用的话跟认证一样只要列表留空即可。

class ArticlesView(ListAPIView):
    authentication_classes = [LoginAuthentication,]
    permission_classes = [PermissionAuth]  # 跟认证是一样的配置

    queryset = models.Article.objects.all()
    serializer_class = ArticleSeriallizer

2.3、内置权限IsAdminUser

大家可以点击rest_framework.permissions进去看源码,有很多写好的内置方法可以提供给我们用,像我们上面写的权限校验内置的也有类似的功能,它用的是auth模块的一套,来介绍其中一个吧:IsAdminUser。

顾名思义是判断当前用户是否为管理员,配置也非常简单,只需要在全局或局部配置以下一行代码即可:

permission_classes = [IsAdminUser]  # 判断标准是is_staff字段

三、频率/限流校验

可以跟上面认证、权限一样自定义类,也可以使用rest_framework已经定义好的类,先来学习内置的类的使用方法

3.1、内置频率限制

用的是auth模块的认证套路

3.1.1、全局配置
'DEFAULT_THROTTLE_CLASSES': [  # 更详细配置请看rest_framework的settings源码
    'rest_framework.throttling.AnonRateThrottle',  # 匿名用户
    'rest_framework.throttling.UserRateThrottle'  # 登录用户
],
'DEFAULT_THROTTLE_RATES': {
    'user': '5/m',  # 登录用户访问次数限制
    'anon': '3/ms',  # 匿名用户访问频率限制,m为分钟,h为小时,s为秒,d为天
},

为演示效果,我们先禁用此前配置的认证和权限校验

from rest_framework.throttling import AnonRateThrottle, UserRateThrottle  # 先导入对应模块


class ArticlesView(ListAPIView):
    authentication_classes = []
    permission_classes = []

    queryset = models.Article.objects.all()
    serializer_class = ArticleSeriallizer
3.1.2、验证效果
匿名用户验证

postman验证127.0.0.1:8000/articles/,超过三次后报错如下:

{
    "detail": "Request was throttled. Expected available in 59 seconds."
}
登录用户验证

使用登录用户去访问,超过五次后报错跟上面的一样

3.1.3、局部使用

THROTTLE_RATES频率限制只能在settings里面配置

class ArticlesView(ListAPIView):
    authentication_classes = []
    permission_classes = []
    throttle_classes = [AnonRateThrottle, UserRateThrottle]

    queryset = models.Article.objects.all()
    serializer_class = ArticleSeriallizer
世间微尘里 独爱茶酒中