15、Celery异步任务集成
环境:
django-redis = "*"
django-celery-results = "==1.1.2"
django-celery-beat = "==1.5.0"
celery = "==4.3.0"
配置 setting_base.py
我这里开发环境和生成环境区分, 配置文件修改为: setting_base.py, 你的环境如果是默认环境,就修改 settings.py
# 添加 APP
INSTALLED_APPS = [
# ...
'app', # 测试的 app 名称
'django_celery_results', # 查看 celery 执行结果
'django-celery-beat', # 后台管理任务
]
# django 缓存
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://:123456@10.10.181.18:6379/1",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
}
}
}
#############################
# celery 配置信息 start
#############################
# celery 定时任务
# 注意,celery4 版本后,CELERY_BROKER_URL 改为 BROKER_URL
CELERY_BROKER_URL = 'redis://:123456@10.10.181.18:6379/0'
# CELERY_RESULT_BACKEND = 'redis://:123456@10.10.181.18:6379/1' # celery 结果返回,可用于跟踪结果
CELERY_RESULT_BACKEND = 'django-db' # 使用 database 作为结果存储
CELERY_CACHE_BACKEND = 'django-cache' # celery 后端缓存
CELERYD_MAX_TASKS_PER_CHILD = 3 # 每个 worker 最多执行3个任务就会被销毁,可防止内存泄露
# CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
# CELERY_TASK_ALWAYS_EAGER = True
# celery 内容等消息的格式设置
if os.name != "nt":
# Mac and Centos
# worker 启动命令:celery -A joyoo worker -l info
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
else:
# windows
# pip install eventlet
# worker 启动命令:celery -A joyoo worker -l info -P eventlet --pool=solo
CELERY_ACCEPT_CONTENT = ['pickle', ]
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
#############################
# celery 配置信息 end
#############################
项目根目录添加 celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
# 获取当前文件夹名,即为该 Django 的项目名
project_name = os.path.split(os.path.abspath('.'))[-1]
# project_settings = '%s.settings' % project_name
# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.setting_base')
# 实例化
app = Celery()
# namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置
# 但所有Celery配置项必须以CELERY开头,防止冲突
app.config_from_object('django.conf:settings', namespace='CELERY')
# Celery 加载所有注册的应用
# app.autodiscover_tasks()
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
项目根目录 __init__.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
# 获取当前文件夹名,即为该 Django 的项目名
project_name = os.path.split(os.path.abspath('.'))[-1]
# project_settings = '%s.settings' % project_name
# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.setting_base')
# 实例化
app = Celery()
# namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置
# 但所有Celery配置项必须以CELERY开头,防止冲突
app.config_from_object('django.conf:settings', namespace='CELERY')
# Celery 加载所有注册的应用
# app.autodiscover_tasks()
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
在提前创建好的 App 下测试
app/task.py
from __future__ import absolute_import, unicode_literals
from celery import task
@task
def demo_task():
print('test')
app/views.py
from django.http import JsonResponse
from . import tasks
def demo_task(request):
res = tasks.demo_task.delay()
return JsonResponse({'status': 'successful', 'task_id': res.task_id})
app/urls.py
from django.urls import path
from . import views
urlpatterns = [
path('demo_task/', views.demo_task),
]
project/urls.py
,根项目的urls.py
urlpatterns = [
''''''
# 测试 celery
path('domain/', include('app.urls')),
]
生成数据库表
python manage.py makemigrations
python manage.py migration
启动 celery worker
# 项目根目录终端执行(project 项目名称)
# celery -A project worker -l info
celery -A project worker -l info --pool=solo
# 守护进程
/root/.virtualenvs/blog/bin/celery multi start w1 -A project -l info --logfile=./celerylog.log
启动 celery beat
# 项目根目录终端执行(project 项目名称)
celery -A project beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
启动项目
python manage.py runserver 0.0.0.0:8000
参考: http://xieboke.net/article/203/#_label0
参考: http://xieboke.net/article/200/#_label11
二次开发
django-celery-beat插件本质上是对数据库表变化检查,一旦有数据库表改变,调度器重新读取任务进行调度,所以如果想自己定制的任务页面,只需要操作beat插件的四张表就可以了。当然你还可以自己定义调度器,django-celery-beat插件已经内置了model,只需要进行导入便可进行orm操作,以下我用django reset api进行示例:
settings.py
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'app01.apps.App01Config',
'django_celery_results',
'django_celery_beat',
'rest_framework',
]
urls.py
urlpatterns = [
re_path(r'^admin/', admin.site.urls),
re_path(r'^index$', views.index),
re_path(r'^res$', views.get_res),
re_path(r'^tasks$', views.TaskView.as_view({'get':'list'})),
]
views.py
from django_celery_beat.models import PeriodicTask #倒入插件model
from rest_framework import serializers
from rest_framework import pagination
from rest_framework.viewsets import ModelViewSet
class Userserializer(serializers.ModelSerializer):
class Meta:
model = PeriodicTask
fields = '__all__'
class Mypagination(pagination.PageNumberPagination):
"""自定义分页"""
page_size=2
page_query_param = 'p'
page_size_query_param='size'
max_page_size=4
class TaskView(ModelViewSet):
queryset = PeriodicTask.objects.all()
serializer_class = Userserializer
permission_classes = []
pagination_class = Mypagination
访问http://127.0.0.1:8000/tasks如下:
Last updated