Update project/init.py to instantiate the FlaskCeleryExt extension:
import os
from flask import Flask
from flask_celeryext import FlaskCeleryExt # new
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from project.celery_utils import make_celery # new
from project.config import config
# instantiate the extensions
db = SQLAlchemy()
migrate = Migrate()
ext_celery = FlaskCeleryExt(create_celery_app=make_celery) # new
def create_app(config_name=None):
if config_name is None:
config_name = os.environ.get("FLASK_CONFIG", "development")
# instantiate the app
app = Flask(__name__)
# set config
app.config.from_object(config[config_name])
# set up extensions
db.init_app(app)
migrate.init_app(app, db)
ext_celery.init_app(app) # new
# register blueprints
from project.users import users_blueprint
app.register_blueprint(users_blueprint)
# shell context for flask cli
@app.shell_context_processor
def ctx():
return {"app": app, "db": db}
return app
Create a new file called project/users/tasks.py:
from celery import shared_task
@shared_task
def divide(x, y):
import time
time.sleep(5)
return x / y
(env)$ celery -A app.celery worker --loglevel=info -P gevent
[config]
.> app: default:0x110bdea60 (.default.Loader)
.> transport: redis://127.0.0.1:6379/0
.> results: redis://127.0.0.1:6379/0
.> concurrency: 16 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. project.users.tasks.divide
Enter the Flask shell in a new terminal:
(env)$ FLASK_APP=app.py flask shell
Send some tasks to the Celery worker:
>>> from project.users.tasks import divide
>>> task = divide.delay(1, 2)
Back in the first terminal window, you should see the logs from the worker:
[2021-09-16 09:44:51,516: INFO/MainProcess] Task project.users.tasks.divide[566a69b5-ce41-496f-8939-a69cb2bdbf54] received
[2021-09-16 09:44:56,531: INFO/ForkPoolWorker-8] Task project.users.tasks.divide[566a69b5-ce41-496f-8939-a69cb2bdbf54] succeeded in 5.012977000907995s: 0.5