Flask-CeleryExt

pip3 install Flask-CeleryExt
class BaseConfig:
    """Base configuration"""
    BASE_DIR = Path(__file__).parent.parent

    TESTING = False
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL', f'sqlite:///{BASE_DIR}/db.sqlite3')

    CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")              # new
    CELERY_RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")      # new

Create a new file called project/celery_utils.py:

from celery import current_app as current_celery_app


def make_celery(app):
    celery = current_celery_app
    celery.config_from_object(app.config, namespace="CELERY")

    return celery

Notes:

  • celery.config_from_object(app.config, namespace="CELERY") 指示所有与Celery相关的配置键都应该以大写形式编写,并以CELERY_为前缀。例如,要配置broker_url,您应该使用CELERY_BROKER_URL。

Update project/init.py to instantiate the FlaskCeleryExt extension:

Create a new file called project/users/tasks.py:

Notes:

  • web上的许多资源都推荐使用celery.task。在某些情况下,这可能会导致循环导入,因为您必须导入celery实例。我们使用shared_task使我们的代码可重用,这同样需要make_celery中的current_app,而不是创建一个新的celery实例。现在,我们可以在应用程序中的任何地方复制这个文件,它会像预期的那样工作。

Update project/users/init.py:

This will ensure that users/tasks.py will be loaded when Flask registers the blueprint and the tasks will be found by the Celery worker.

Update app.py:

Manual Test Run a worker in one terminal window:

Enter the Flask shell in a new terminal:

Last updated