15、Celery异步任务集成

环境:

django-redis = "*"
django-celery-results = "==1.1.2"
django-celery-beat = "==1.5.0"
celery = "==4.3.0"

配置 setting_base.py

我这里开发环境和生成环境区分, 配置文件修改为: setting_base.py, 你的环境如果是默认环境,就修改 settings.py

# 添加  APP
INSTALLED_APPS = [
    # ...
    'app',                    # 测试的 app 名称
    'django_celery_results',  # 查看 celery 执行结果
    'django-celery-beat',     # 后台管理任务
]


# django 缓存
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://:123456@10.10.181.18:6379/1",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
        }
    }
}


#############################
# celery 配置信息 start
#############################
# celery 定时任务
# 注意,celery4 版本后,CELERY_BROKER_URL 改为 BROKER_URL
CELERY_BROKER_URL = 'redis://:123456@10.10.181.18:6379/0'
# CELERY_RESULT_BACKEND = 'redis://:123456@10.10.181.18:6379/1'    # celery 结果返回,可用于跟踪结果
CELERY_RESULT_BACKEND = 'django-db'    # 使用 database 作为结果存储
CELERY_CACHE_BACKEND = 'django-cache'  # celery 后端缓存
CELERYD_MAX_TASKS_PER_CHILD = 3        # 每个 worker 最多执行3个任务就会被销毁,可防止内存泄露
# CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
# CELERY_TASK_ALWAYS_EAGER = True

# celery 内容等消息的格式设置
if os.name != "nt":
    # Mac and Centos
    # worker 启动命令:celery -A joyoo worker -l info
    CELERY_ACCEPT_CONTENT = ['application/json', ]
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
else:
    # windows
    # pip install eventlet
    # worker 启动命令:celery -A joyoo worker -l info -P eventlet  --pool=solo
    CELERY_ACCEPT_CONTENT = ['pickle', ]
    CELERY_TASK_SERIALIZER = 'pickle'
    CELERY_RESULT_SERIALIZER = 'pickle'

#############################
# celery 配置信息 end
#############################

项目根目录添加 celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

# 获取当前文件夹名,即为该 Django 的项目名
project_name = os.path.split(os.path.abspath('.'))[-1]
# project_settings = '%s.settings' % project_name


# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.setting_base')

# 实例化
app = Celery()

# namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置
# 但所有Celery配置项必须以CELERY开头,防止冲突
app.config_from_object('django.conf:settings', namespace='CELERY')

# Celery 加载所有注册的应用
# app.autodiscover_tasks()
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

项目根目录 __init__.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

# 获取当前文件夹名,即为该 Django 的项目名
project_name = os.path.split(os.path.abspath('.'))[-1]
# project_settings = '%s.settings' % project_name


# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.setting_base')

# 实例化
app = Celery()

# namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置
# 但所有Celery配置项必须以CELERY开头,防止冲突
app.config_from_object('django.conf:settings', namespace='CELERY')

# Celery 加载所有注册的应用
# app.autodiscover_tasks()
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

在提前创建好的 App 下测试

app/task.py

from __future__ import absolute_import, unicode_literals
from celery import task


@task
def demo_task():
    print('test')

app/views.py

from django.http import JsonResponse
from . import tasks


def demo_task(request):
    res = tasks.demo_task.delay()
    return JsonResponse({'status': 'successful', 'task_id': res.task_id})

app/urls.py

from django.urls import path
from . import views

urlpatterns = [
    path('demo_task/', views.demo_task),
]

project/urls.py,根项目的urls.py

urlpatterns = [
    ''''''
    # 测试 celery
    path('domain/', include('app.urls')),
]

生成数据库表

python manage.py makemigrations
python manage.py migration

启动 celery worker

# 项目根目录终端执行(project 项目名称)
# celery -A project worker -l info
celery -A project worker -l info --pool=solo

# 守护进程
/root/.virtualenvs/blog/bin/celery multi start w1 -A project -l info --logfile=./celerylog.log

启动 celery beat

# 项目根目录终端执行(project 项目名称)

celery -A project beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

启动项目

python manage.py runserver 0.0.0.0:8000

参考: http://xieboke.net/article/203/#_label0

参考: http://xieboke.net/article/200/#_label11

二次开发

  django-celery-beat插件本质上是对数据库表变化检查,一旦有数据库表改变,调度器重新读取任务进行调度,所以如果想自己定制的任务页面,只需要操作beat插件的四张表就可以了。当然你还可以自己定义调度器,django-celery-beat插件已经内置了model,只需要进行导入便可进行orm操作,以下我用django reset api进行示例:

settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app01.apps.App01Config',
    'django_celery_results',
    'django_celery_beat',
    'rest_framework',
]

urls.py

urlpatterns = [
    re_path(r'^admin/', admin.site.urls),
    re_path(r'^index$', views.index),
    re_path(r'^res$', views.get_res),
    re_path(r'^tasks$', views.TaskView.as_view({'get':'list'})),
]

views.py

from django_celery_beat.models import PeriodicTask  #倒入插件model
from rest_framework import serializers
from rest_framework import pagination
from rest_framework.viewsets import ModelViewSet
class Userserializer(serializers.ModelSerializer):
    class Meta:
        model = PeriodicTask
        fields = '__all__'

class Mypagination(pagination.PageNumberPagination):
    """自定义分页"""
    page_size=2
    page_query_param = 'p'
    page_size_query_param='size'
    max_page_size=4

class TaskView(ModelViewSet):
    queryset = PeriodicTask.objects.all()
    serializer_class = Userserializer
    permission_classes = []
    pagination_class = Mypagination

访问http://127.0.0.1:8000/tasks如下:

Last updated