Flask-CeleryExt

pip3 install Flask-CeleryExt
class BaseConfig:
    """Base configuration"""
    BASE_DIR = Path(__file__).parent.parent

    TESTING = False
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL', f'sqlite:///{BASE_DIR}/db.sqlite3')

    CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")              # new
    CELERY_RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")      # new

Create a new file called project/celery_utils.py:

from celery import current_app as current_celery_app


def make_celery(app):
    celery = current_celery_app
    celery.config_from_object(app.config, namespace="CELERY")

    return celery

Notes:

  • celery.config_from_object(app.config, namespace="CELERY") 指示所有与Celery相关的配置键都应该以大写形式编写,并以CELERY_为前缀。例如,要配置broker_url,您应该使用CELERY_BROKER_URL。

Update project/init.py to instantiate the FlaskCeleryExt extension:

import os

from flask import Flask
from flask_celeryext import FlaskCeleryExt  # new
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

from project.celery_utils import make_celery  # new
from project.config import config



# instantiate the extensions
db = SQLAlchemy()
migrate = Migrate()
ext_celery = FlaskCeleryExt(create_celery_app=make_celery)  # new


def create_app(config_name=None):
    if config_name is None:
        config_name = os.environ.get("FLASK_CONFIG", "development")

    # instantiate the app
    app = Flask(__name__)

    # set config
    app.config.from_object(config[config_name])

    # set up extensions
    db.init_app(app)
    migrate.init_app(app, db)
    ext_celery.init_app(app)  # new

    # register blueprints
    from project.users import users_blueprint
    app.register_blueprint(users_blueprint)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {"app": app, "db": db}

    return app

Create a new file called project/users/tasks.py:

from celery import shared_task


@shared_task
def divide(x, y):
    import time
    time.sleep(5)
    return x / y

Notes:

  • web上的许多资源都推荐使用celery.task。在某些情况下,这可能会导致循环导入,因为您必须导入celery实例。我们使用shared_task使我们的代码可重用,这同样需要make_celery中的current_app,而不是创建一个新的celery实例。现在,我们可以在应用程序中的任何地方复制这个文件,它会像预期的那样工作。

Update project/users/init.py:

from flask import Blueprint

users_blueprint = Blueprint("users", __name__, url_prefix="/users", template_folder="templates")

from . import models, tasks  # noqa

This will ensure that users/tasks.py will be loaded when Flask registers the blueprint and the tasks will be found by the Celery worker.

Update app.py:

from project import create_app, ext_celery

app = create_app()
celery = ext_celery.celery

Manual Test Run a worker in one terminal window:

(env)$ celery -A app.celery worker --loglevel=info -P gevent

[config]
.> app:         default:0x110bdea60 (.default.Loader)
.> transport:   redis://127.0.0.1:6379/0
.> results:     redis://127.0.0.1:6379/0
.> concurrency: 16 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . project.users.tasks.divide

Enter the Flask shell in a new terminal:

(env)$ FLASK_APP=app.py flask shell
Send some tasks to the Celery worker:

>>> from project.users.tasks import divide
>>> task = divide.delay(1, 2)
Back in the first terminal window, you should see the logs from the worker:

[2021-09-16 09:44:51,516: INFO/MainProcess] Task project.users.tasks.divide[566a69b5-ce41-496f-8939-a69cb2bdbf54] received
[2021-09-16 09:44:56,531: INFO/ForkPoolWorker-8] Task project.users.tasks.divide[566a69b5-ce41-496f-8939-a69cb2bdbf54] succeeded in 5.012977000907995s: 0.5

Last updated