Flask-CeleryExt
Reference:
Project: https://testdriven.io/courses/flask-celery/app-factory/#H-6-add-celery
pip3 install Flask-CeleryExt
import os
from pathlib import Path


class BaseConfig:
    """Base configuration"""
    BASE_DIR = Path(__file__).parent.parent

    TESTING = False
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL', f'sqlite:///{BASE_DIR}/db.sqlite3')

    CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")  # new
    CELERY_RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")  # new
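The application factory later in these notes imports a `config` object from `project.config` and looks it up by name, defaulting to "development". The notes do not show that mapping, so the following is only a minimal sketch of what it might look like. The subclass names (`DevelopmentConfig`, `ProductionConfig`, `TestingConfig`) and their `DEBUG` values are assumptions, and the base class is trimmed to the Celery keys so the example stays self-contained.

```python
import os


class BaseConfig:
    """Trimmed base configuration: only the Celery keys from the notes."""
    TESTING = False
    CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")
    CELERY_RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")


class DevelopmentConfig(BaseConfig):  # hypothetical name
    DEBUG = True


class ProductionConfig(BaseConfig):  # hypothetical name
    DEBUG = False


class TestingConfig(BaseConfig):  # hypothetical name
    TESTING = True


# Mapping that create_app() can index as config[config_name].
config = {
    "development": DevelopmentConfig,
    "production": ProductionConfig,
    "testing": TestingConfig,
}
```

With a mapping like this, `app.config.from_object(config["development"])` would pick up the inherited `CELERY_*` keys along with the environment-specific flags.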
Create a new file called project/celery_utils.py:
from celery import current_app as current_celery_app


def make_celery(app):
    celery = current_celery_app
    celery.config_from_object(app.config, namespace="CELERY")

    return celery
Notes:
celery.config_from_object(app.config, namespace="CELERY") tells Celery that all Celery-related configuration keys must be written in uppercase and prefixed with CELERY_. For example, to configure broker_url, use CELERY_BROKER_URL.
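To make the naming rule concrete, here is a toy illustration in plain Python of how prefixed, uppercase keys correspond to Celery's lowercase setting names. It is not Celery's implementation. The `strip_namespace` helper is hypothetical and exists only to show the convention.

```python
def strip_namespace(settings, namespace="CELERY"):
    """Keep keys that start with '<namespace>_', drop the prefix, lowercase the rest."""
    prefix = namespace + "_"
    return {
        key[len(prefix):].lower(): value
        for key, value in settings.items()
        if key.startswith(prefix)
    }


flask_config = {
    "TESTING": False,  # no CELERY_ prefix, so it is skipped
    "CELERY_BROKER_URL": "redis://127.0.0.1:6379/0",
    "CELERY_RESULT_BACKEND": "redis://127.0.0.1:6379/0",
}

celery_settings = strip_namespace(flask_config)
print(celery_settings)
```

Only the two `CELERY_`-prefixed keys survive, under the names `broker_url` and `result_backend`.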
Update project/__init__.py to instantiate the FlaskCeleryExt extension:
import os

from flask import Flask
from flask_celeryext import FlaskCeleryExt  # new
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

from project.celery_utils import make_celery  # new
from project.config import config


# instantiate the extensions
db = SQLAlchemy()
migrate = Migrate()
ext_celery = FlaskCeleryExt(create_celery_app=make_celery)  # new


def create_app(config_name=None):
    if config_name is None:
        config_name = os.environ.get("FLASK_CONFIG", "development")

    # instantiate the app
    app = Flask(__name__)

    # set config
    app.config.from_object(config[config_name])

    # set up extensions
    db.init_app(app)
    migrate.init_app(app, db)
    ext_celery.init_app(app)  # new

    # register blueprints
    from project.users import users_blueprint
    app.register_blueprint(users_blueprint)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {"app": app, "db": db}

    return app
Create a new file called project/users/tasks.py:
from celery import shared_task


@shared_task
def divide(x, y):
    import time
    time.sleep(5)
    return x / y
Notes:
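Many resources on the web recommend using celery.task. In some cases this can cause circular imports, because you have to import the Celery instance. We use shared_task to make our code reusable, which in turn requires current_app in make_celery instead of creating a new Celery instance. Now this file can be copied anywhere in the application and it will work as expected.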
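The idea behind this note can be sketched with a toy model in plain Python: the task module registers its functions in a shared place and never imports an app object, so there is nothing to import in a circle. This is a simplified illustration under that assumption, not how Celery is implemented. `ToyApp`, `_shared_functions`, and this stand-in `shared_task` are hypothetical names.

```python
_current_app = None      # stands in for the "current app"
_shared_functions = []   # functions declared before any app exists


class ToyApp:
    """Hypothetical stand-in for a Celery app; it only keeps a task registry."""

    def __init__(self, name):
        self.name = name
        self.tasks = {}

    def set_current(self):
        global _current_app
        _current_app = self
        # Pick up everything that was declared with shared_task so far.
        for func in _shared_functions:
            self.tasks[func.__name__] = func
        return self


def shared_task(func):
    """Record the function without importing any app instance."""
    _shared_functions.append(func)
    if _current_app is not None:
        _current_app.tasks[func.__name__] = func
    return func


# "Task module": no reference to a concrete app anywhere.
@shared_task
def divide(x, y):
    return x / y


# "Application module": the app is created later and finds the task.
app = ToyApp("project").set_current()
result = app.tasks["divide"](1, 2)
```

Because `divide` is declared without touching `app`, the same task file could be dropped into another package unchanged, which is the reuse property the note describes.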
Update project/users/__init__.py:
from flask import Blueprint
users_blueprint = Blueprint("users", __name__, url_prefix="/users", template_folder="templates")
from . import models, tasks # noqa
This will ensure that users/tasks.py will be loaded when Flask registers the blueprint and the tasks will be found by the Celery worker.
Update app.py:
from project import create_app, ext_celery
app = create_app()
celery = ext_celery.celery
Manual Test
Run a worker in one terminal window:
(env)$ celery -A app.celery worker --loglevel=info -P gevent
[config]
.> app: default:0x110bdea60 (.default.Loader)
.> transport: redis://127.0.0.1:6379/0
.> results: redis://127.0.0.1:6379/0
.> concurrency: 16 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. project.users.tasks.divide
Enter the Flask shell in a new terminal:
(env)$ FLASK_APP=app.py flask shell
Send some tasks to the Celery worker:
>>> from project.users.tasks import divide
>>> task = divide.delay(1, 2)
Back in the first terminal window, you should see the logs from the worker:
[2021-09-16 09:44:51,516: INFO/MainProcess] Task project.users.tasks.divide[566a69b5-ce41-496f-8939-a69cb2bdbf54] received
[2021-09-16 09:44:56,531: INFO/ForkPoolWorker-8] Task project.users.tasks.divide[566a69b5-ce41-496f-8939-a69cb2bdbf54] succeeded in 5.012977000907995s: 0.5