Flask + Celery + Redis 异步发送邮件

页面

<html>
  <head>
    <title>Flask + Celery Examples</title>
  </head>
  <body>
    <h1>Flask + Celery Examples</h1>
    <h2>Example 1: Send Asynchronous Email</h2>
    {# Show every message the view queued with flash(). #}
    {% for message in get_flashed_messages() %}
    <p style="color: red;">{{ message }}</p>
    {% endfor %}
    {# Both buttons share the name "submit"; the view branches on the value. #}
    <form method="POST">
      <p>Send test email to: <input type="text" name="email" value="{{ email }}"></p>
      <input type="submit" name="submit" value="Send">
      <input type="submit" name="submit" value="Send in 1 minute">
    </form>
  </body>
</html>

代码

import os
import random
import time
from flask import Flask, request, render_template, session, flash, redirect, \
    url_for, jsonify
from flask_mail import Mail, Message
from celery import Celery

# Windows start command: celery -A flask_celery:celery worker -l info -P eventlet

app = Flask(__name__)
app.config['SECRET_KEY'] = 'top-secret!'

# Flask-Mail settings
app.config.update(
    MAIL_DEBUG=True,             # enable debug output to ease troubleshooting
    MAIL_SUPPRESS_SEND=False,    # actually send mail; True would suppress sending
    MAIL_SERVER='smtp.qq.com',   # mail server
    MAIL_PORT=465,               # port
    MAIL_USE_SSL=True,           # important: QQ Mail requires SSL
    MAIL_USE_TLS=False,          # TLS is not needed
    MAIL_USERNAME='',            # fill in the mailbox address
    MAIL_PASSWORD='',            # fill in the authorization code
    MAIL_DEFAULT_SENDER='',      # fill in the mailbox address used as default sender
)

# Celery settings: Redis database 1 is the broker, database 2 stores results
app.config.update(
    CELERY_BROKER_URL='redis://@182.92.111.11:6379/1',
    CELERY_RESULT_BACKEND='redis://@182.92.111.11:6379/2',
)


# Set up the Flask-Mail extension (after the mail settings are in place)
mail = Mail(app)

# Set up Celery and hand it the Flask configuration
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)


@celery.task
def send_async_email(email_data):
    """Background task to send an email with Flask-Mail.

    Args:
        email_data: Mapping with the keys ``'subject'``, ``'to'`` and
            ``'body'`` describing the message to send.
    """
    # Build the message inside the application context too: when ``sender``
    # is empty Flask-Mail falls back to ``current_app`` to look up the default
    # sender, which raises if no application context is active.
    with app.app_context():
        msg = Message(email_data['subject'],
                      sender=app.config['MAIL_DEFAULT_SENDER'],
                      recipients=[email_data['to']])
        msg.body = email_data['body']
        mail.send(msg)

@app.route('/', methods=['GET', 'POST'])
def index():
    """Render the email form (GET) or queue a test email (POST).

    On GET, renders the form template with the address stored in the session
    (empty string if none). On POST, stores the submitted address in the
    session, queues ``send_async_email`` either immediately or with a
    60-second countdown depending on which submit button was pressed, flashes
    a status message, and redirects back to this view.
    """
    if request.method == 'GET':
        return render_template('falsk_email.html', email=session.get('email', ''))
    email = request.form['email']
    session['email'] = email

    # Payload passed to the background task
    email_data = {
        'subject': 'Hello from Flask',
        'to': email,
        'body': 'This is a test email sent from a background Celery task.'
    }
    if request.form['submit'] == 'Send':
        # send right away
        send_async_email.delay(email_data)
        flash('Sending email to {0}'.format(email))
    else:
        # send in one minute; countdown is expressed in seconds, so 60 keeps
        # the delay consistent with the button label and the flashed message
        send_async_email.apply_async(args=[email_data], countdown=60)
        flash('An email will be sent to {0} in one minute'.format(email))

    return redirect(url_for('index'))


# Start the Flask server with debug mode enabled when this file is executed
# directly (not when it is imported, e.g. by the Celery worker).
if __name__ == '__main__':
    app.run(debug=True)

Last updated