# 15、Celery异步任务集成

### 环境：

```python
django-redis = "*"
django-celery-results = "==1.1.2"
django-celery-beat = "==1.5.0"
celery = "==4.3.0"
```

### 配置 setting\_base.py

> 我这里开发环境和生成环境区分， 配置文件修改为： setting\_base.py， 你的环境如果是默认环境，就修改 settings.py

```python
# 添加  APP
INSTALLED_APPS = [
    # ...
    'app',                    # 测试的 app 名称
    'django_celery_results',  # 查看 celery 执行结果
    'django-celery-beat',     # 后台管理任务
]


# django 缓存
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://:123456@10.10.181.18:6379/1",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
        }
    }
}


#############################
# celery 配置信息 start
#############################
# celery 定时任务
# 注意，celery4 版本后，CELERY_BROKER_URL 改为 BROKER_URL
CELERY_BROKER_URL = 'redis://:123456@10.10.181.18:6379/0'
# CELERY_RESULT_BACKEND = 'redis://:123456@10.10.181.18:6379/1'    # celery 结果返回，可用于跟踪结果
CELERY_RESULT_BACKEND = 'django-db'    # 使用 database 作为结果存储
CELERY_CACHE_BACKEND = 'django-cache'  # celery 后端缓存
CELERYD_MAX_TASKS_PER_CHILD = 3        # 每个 worker 最多执行3个任务就会被销毁，可防止内存泄露
# CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
# CELERY_TASK_ALWAYS_EAGER = True

# celery 内容等消息的格式设置
if os.name != "nt":
    # Mac and Centos
    # worker 启动命令：celery -A joyoo worker -l info
    CELERY_ACCEPT_CONTENT = ['application/json', ]
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
else:
    # windows
    # pip install eventlet
    # worker 启动命令：celery -A joyoo worker -l info -P eventlet  --pool=solo
    CELERY_ACCEPT_CONTENT = ['pickle', ]
    CELERY_TASK_SERIALIZER = 'pickle'
    CELERY_RESULT_SERIALIZER = 'pickle'

#############################
# celery 配置信息 end
#############################
```

### 项目根目录添加 celery.py

```python
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

# 获取当前文件夹名，即为该 Django 的项目名
project_name = os.path.split(os.path.abspath('.'))[-1]
# project_settings = '%s.settings' % project_name


# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.setting_base')

# 实例化
app = Celery()

# namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置
# 但所有Celery配置项必须以CELERY开头，防止冲突
app.config_from_object('django.conf:settings', namespace='CELERY')

# Celery 加载所有注册的应用
# app.autodiscover_tasks()
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
```

### 项目根目录 \_\_init\_\_.py

```python
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

# 获取当前文件夹名，即为该 Django 的项目名
project_name = os.path.split(os.path.abspath('.'))[-1]
# project_settings = '%s.settings' % project_name


# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.setting_base')

# 实例化
app = Celery()

# namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置
# 但所有Celery配置项必须以CELERY开头，防止冲突
app.config_from_object('django.conf:settings', namespace='CELERY')

# Celery 加载所有注册的应用
# app.autodiscover_tasks()
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
```

### 在提前创建好的 App 下测试

`app/task.py`

```python
from __future__ import absolute_import, unicode_literals
from celery import task


@task
def demo_task():
    print('test')
```

`app/views.py`

```python
from django.http import JsonResponse
from . import tasks


def demo_task(request):
    res = tasks.demo_task.delay()
    return JsonResponse({'status': 'successful', 'task_id': res.task_id})
```

`app/urls.py`

```python
from django.urls import path
from . import views

urlpatterns = [
    path('demo_task/', views.demo_task),
]
```

`project/urls.py`，根项目的urls.py

```python
urlpatterns = [
    ''''''
    # 测试 celery
    path('domain/', include('app.urls')),
]
```

### 生成数据库表

```python
python manage.py makemigrations
python manage.py migration
```

### 启动 celery worker

```python
# 项目根目录终端执行（project 项目名称）
# celery -A project worker -l info
celery -A project worker -l info --pool=solo

# 守护进程
/root/.virtualenvs/blog/bin/celery multi start w1 -A project -l info --logfile=./celerylog.log
```

### 启动 celery beat

```python
# 项目根目录终端执行（project 项目名称）

celery -A project beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
```

### 启动项目

```python
python manage.py runserver 0.0.0.0:8000
```

![](https://2134947750-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FvILwbD2PrkBCkSitM3m4%2Fuploads%2Fl0dv8yh9PudszJFhSkvG%2Fimage-20210828153605708.png?alt=media\&token=8d9a8c88-b713-4438-81e4-a05c268999a1)

![](https://2134947750-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FvILwbD2PrkBCkSitM3m4%2Fuploads%2FqQuEhkbZSVQIK73yukQf%2Fimage-20210828153744907.png?alt=media\&token=ba415d63-3c7e-4be8-948d-5efd714c4bf2)

![](https://2134947750-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FvILwbD2PrkBCkSitM3m4%2Fuploads%2FGFc55FOELyEPqUA1wexQ%2Fimage-20210828153815362.png?alt=media\&token=f54ca84a-e58b-4994-8b29-af2f5cf286fe)

参考： <http://xieboke.net/article/203/#\\_label0>

参考： <http://xieboke.net/article/200/#\\_label11>

#### 二次开发

　　django-celery-beat插件本质上是对数据库表变化检查，一旦有数据库表改变，调度器重新读取任务进行调度，所以如果想自己定制的任务页面，只需要操作beat插件的四张表就可以了。当然你还可以自己定义调度器，django-celery-beat插件已经内置了model，只需要进行导入便可进行orm操作，以下我用django reset api进行示例：

`settings.py`

```python
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app01.apps.App01Config',
    'django_celery_results',
    'django_celery_beat',
    'rest_framework',
]
```

`urls.py`

```python
urlpatterns = [
    re_path(r'^admin/', admin.site.urls),
    re_path(r'^index$', views.index),
    re_path(r'^res$', views.get_res),
    re_path(r'^tasks$', views.TaskView.as_view({'get':'list'})),
]
```

`views.py`

```python
from django_celery_beat.models import PeriodicTask  #倒入插件model
from rest_framework import serializers
from rest_framework import pagination
from rest_framework.viewsets import ModelViewSet
class Userserializer(serializers.ModelSerializer):
    class Meta:
        model = PeriodicTask
        fields = '__all__'

class Mypagination(pagination.PageNumberPagination):
    """自定义分页"""
    page_size=2
    page_query_param = 'p'
    page_size_query_param='size'
    max_page_size=4

class TaskView(ModelViewSet):
    queryset = PeriodicTask.objects.all()
    serializer_class = Userserializer
    permission_classes = []
    pagination_class = Mypagination
```

`访问http://127.0.0.1:8000/tasks如下：`

![](https://2134947750-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FvILwbD2PrkBCkSitM3m4%2Fuploads%2FYKTaAEMD6EejlCeqdgvi%2F1075473-20180824153332274-1150335891.png?alt=media\&token=021d5069-a631-430f-890e-4a0840940e62)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://close.gitbook.io/yun-wei-bi-ji/python/django/15celery-yi-bu-ren-wu-ji-cheng.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
