> For the complete documentation index, see [llms.txt](https://close.gitbook.io/yun-wei-bi-ji/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://close.gitbook.io/yun-wei-bi-ji/python/flask/flask-celeryext.md).

# Flask-CeleryExt

* 参考:
  * 项目: <https://testdriven.io/courses/flask-celery/app-factory/#H-6-add-celery>
* 参考：
  * 官网: [Flask-CeleryExt — Flask-CeleryExt 0.4.0 documentation](https://flask-celeryext.readthedocs.io/en/latest/)

```python
pip3 install Flask-CeleryExt
```

```python
class BaseConfig:
    """Base configuration"""
    BASE_DIR = Path(__file__).parent.parent

    TESTING = False
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL', f'sqlite:///{BASE_DIR}/db.sqlite3')

    CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")              # new
    CELERY_RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")      # new
```

Create a new file called project/celery\_utils.py:

```python
from celery import current_app as current_celery_app


def make_celery(app):
    celery = current_celery_app
    celery.config_from_object(app.config, namespace="CELERY")

    return celery
```

Notes:

* celery.config\_from\_object(app.config, namespace="CELERY") 指示所有与Celery相关的配置键都应该以大写形式编写，并以CELERY\_为前缀。例如，要配置broker\_url，您应该使用CELERY\_BROKER\_URL。

Update project/**init**.py to instantiate the FlaskCeleryExt extension:

```python
import os

from flask import Flask
from flask_celeryext import FlaskCeleryExt  # new
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

from project.celery_utils import make_celery  # new
from project.config import config



# instantiate the extensions
db = SQLAlchemy()
migrate = Migrate()
ext_celery = FlaskCeleryExt(create_celery_app=make_celery)  # new


def create_app(config_name=None):
    if config_name is None:
        config_name = os.environ.get("FLASK_CONFIG", "development")

    # instantiate the app
    app = Flask(__name__)

    # set config
    app.config.from_object(config[config_name])

    # set up extensions
    db.init_app(app)
    migrate.init_app(app, db)
    ext_celery.init_app(app)  # new

    # register blueprints
    from project.users import users_blueprint
    app.register_blueprint(users_blueprint)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {"app": app, "db": db}

    return app
```

Create a new file called project/users/tasks.py:

```python
from celery import shared_task


@shared_task
def divide(x, y):
    import time
    time.sleep(5)
    return x / y
```

Notes:

* web上的许多资源都推荐使用celery.task。在某些情况下，这可能会导致循环导入，因为您必须导入celery实例。我们使用shared\_task使我们的代码可重用，这同样需要make\_celery中的current\_app，而不是创建一个新的celery实例。现在，我们可以在应用程序中的任何地方复制这个文件，它会像预期的那样工作。

Update project/users/**init**.py:

```python
from flask import Blueprint

users_blueprint = Blueprint("users", __name__, url_prefix="/users", template_folder="templates")

from . import models, tasks  # noqa
```

This will ensure that users/tasks.py will be loaded when Flask registers the blueprint and the tasks will be found by the Celery worker.

Update app.py:

```python
from project import create_app, ext_celery

app = create_app()
celery = ext_celery.celery
```

Manual Test Run a worker in one terminal window:

```python
(env)$ celery -A app.celery worker --loglevel=info -P gevent

[config]
.> app:         default:0x110bdea60 (.default.Loader)
.> transport:   redis://127.0.0.1:6379/0
.> results:     redis://127.0.0.1:6379/0
.> concurrency: 16 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . project.users.tasks.divide
```

Enter the Flask shell in a new terminal:

```python
(env)$ FLASK_APP=app.py flask shell
Send some tasks to the Celery worker:

>>> from project.users.tasks import divide
>>> task = divide.delay(1, 2)
Back in the first terminal window, you should see the logs from the worker:

[2021-09-16 09:44:51,516: INFO/MainProcess] Task project.users.tasks.divide[566a69b5-ce41-496f-8939-a69cb2bdbf54] received
[2021-09-16 09:44:56,531: INFO/ForkPoolWorker-8] Task project.users.tasks.divide[566a69b5-ce41-496f-8939-a69cb2bdbf54] succeeded in 5.012977000907995s: 0.5
```


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter, and the optional `goal` query parameter:

```
GET https://close.gitbook.io/yun-wei-bi-ji/python/flask/flask-celeryext.md?ask=<question>&goal=<endgoal>
```

`ask` is the immediate question: it should be specific, self-contained, and written in natural language.
`goal` is optional and describes the broader end goal you are ultimately trying to accomplish on behalf of the user. GitBook uses it to tailor the answer towards what is most useful for that goal.

The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
