Flask + Celery + Redis 异步发送邮件
页面
{# Page for the index view: lists any flashed status messages and shows the
   form used to request a test email (sent immediately or after a delay). #}
<html>
  <head>
    <title>Flask + Celery Examples</title>
  </head>
  <body>
    <h1>Flask + Celery Examples</h1>
    <h2>Example 1: Send Asynchronous Email</h2>
    {# One red paragraph per message queued by flash() in the view. #}
    {% for message in get_flashed_messages() %}
    <p style="color: red;">{{ message }}</p>
    {% endfor %}
    <form method="POST">
      <p>Send test email to: <input type="text" name="email" value="{{ email }}"></p>
      <input type="submit" name="submit" value="Send">
      <input type="submit" name="submit" value="Send in 1 minute">
    </form>
  </body>
</html>
代码
import os
import random
import time
from flask import Flask, request, render_template, session, flash, redirect, \
url_for, jsonify
from flask_mail import Mail, Message
from celery import Celery
# Command to start the worker on Windows:
#   celery -A flask_celery:celery worker -l info -P eventlet
app = Flask(__name__)

# NOTE(review): the secret key and the Redis host below are hard-coded
# demo values -- confirm they are replaced before any real deployment.
app.config.update(
    SECRET_KEY='top-secret!',

    # Flask-Mail configuration
    MAIL_DEBUG=True,              # enable debug output to ease troubleshooting
    MAIL_SUPPRESS_SEND=False,     # actually send mail; True would suppress sending
    MAIL_SERVER='smtp.qq.com',    # mail server
    MAIL_PORT=465,                # port
    MAIL_USE_SSL=True,            # important: QQ mail requires SSL
    MAIL_USE_TLS=False,           # TLS is not needed
    MAIL_USERNAME='',             # fill in the mailbox address
    MAIL_PASSWORD='',             # fill in the authorization code
    MAIL_DEFAULT_SENDER='',       # fill in the mailbox address (default sender)

    # Celery configuration
    CELERY_BROKER_URL='redis://@182.92.111.11:6379/1',
    CELERY_RESULT_BACKEND='redis://@182.92.111.11:6379/2',
)

# Initialize extensions
mail = Mail(app)

# Initialize Celery, then copy the Flask settings into its configuration
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@celery.task
def send_async_email(email_data):
    """Background task to send an email with Flask-Mail.

    Args:
        email_data: Mapping with the keys ``'subject'``, ``'to'`` and
            ``'body'``, as built by the ``index`` view.
    """
    # The worker has no Flask application context of its own. Build the
    # message inside one as well as sending it, so that any Flask-Mail code
    # relying on ``current_app`` (e.g. the default-sender fallback used when
    # the configured sender is empty) does not fail with "working outside of
    # application context".
    with app.app_context():
        msg = Message(email_data['subject'],
                      sender=app.config['MAIL_DEFAULT_SENDER'],
                      recipients=[email_data['to']])
        msg.body = email_data['body']
        mail.send(msg)
@app.route('/', methods=['GET', 'POST'])
def index():
    """Show the email form and queue a test email when it is submitted.

    GET renders the form, pre-filled with the address saved in the session.
    POST saves the submitted address in the session, queues
    ``send_async_email`` either immediately or with a one-minute delay
    (depending on which submit button was pressed), flashes a status
    message and redirects back to this view.
    """
    if request.method == 'GET':
        return render_template('falsk_email.html', email=session.get('email', ''))

    email = request.form['email']
    session['email'] = email

    # Payload handed to the background task.
    email_data = {
        'subject': 'Hello from Flask',
        'to': email,
        'body': 'This is a test email sent from a background Celery task.'
    }
    if request.form['submit'] == 'Send':
        # send right away
        send_async_email.delay(email_data)
        flash('Sending email to {0}'.format(email))
    else:
        # send in one minute; countdown is in seconds, so 60 matches the
        # "Send in 1 minute" button and the message flashed below
        send_async_email.apply_async(args=[email_data], countdown=60)
        flash('An email will be sent to {0} in one minute'.format(email))

    return redirect(url_for('index'))
if __name__ == '__main__':
    # Run Flask's built-in server with debug mode enabled when this file is
    # executed directly (the Celery worker is started separately).
    app.run(debug=True)
Last updated