安装
pip install flask-sqlalchemy
配置
连接数据库。示例:mysql://username:password@host/post/db?charset=utf-8
一个将会绑定多种数据库的字典。 更多详细信息请看官文 绑定多种数据库.
控制在连接池达到最大值后可以创建的连接数。当这些额外的 连接回收到连接池后将会被断开和抛弃。
SQLALCHEMY_TRACK_MODIFICATIONS
如果设置成 True (默认情况),Flask-SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存, 如果不必要的可以禁用它。
常用的SQLAlchemy字段类型
变长Unicode字符串,对较长或不限长度的字符串做了优化
常用的SQLAlchemy列选项
如果为True,允许有空值,如果为False,不允许有空值
SQLAlchemy关系选项
决定了什么时候SQLALchemy从数据库中加载数据 (子查询方式(subquery), 动态方式(dynamic))
在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件
常用的SQLAlchemy查询过滤器
常用的SQLAlchemy查询执行器
返回一个Paginate对象,它包含指定范围内的结果
示例
操作数据库需要先创建一个db对象,通常写在exts.py
文件里。
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
flask项目一般将数据库配置写入configs.py
文件里面,配置在创建引擎前需写好,不要在程序运行时修改配置,如下。
HOST = '127.0.0.1'
PORT = '3306'
DATABASE = 'flask1'
USERNAME = 'root'
PASSWORD = '123456'
DB_URI = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD, host=HOST,port=PORT, db=DATABASE)
SQLALCHEMY_DATABASE_URI = DB_URI
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_ECHO = True
写完数据库配置后需要和app绑定,app.py
文件里写flask应用的创建和蓝图的注册等等,如下:
from flask import Flask
import configs
from exts import db
app = Flask(__name__)
# 加载配置文件
app.config.from_object(configs)
# db绑定app
db.init_app(app)
1.表的创建
# 建表写在models.py文件里面
from ext import db
"""
以下表关系:
一个用户对应多篇文章(一对多)
一篇文章对应多个标签,一个标签对应多个文章(多对多)
"""
"""
relationship 理解: 关系关联
一对一关系中,需要设置 relationship 中的 uselist=Flase,其他数据库操作一样。
一对多关系中,外键设置在多的一方中,关系(relationship)可设置在任意一方。
多对多关系中,需建立关系表,设置 secondary=关系表
"""
# 用户表
class User(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(50))
email = db.Column(db.String(50))
# 关系表(多对多)
article_tag_table = db.Table('article_tag',
db.Column('article_id', db.Integer, db.ForeignKey('article.id'), primary_key=True),
db.Column('tag_id', db.Integer, db.ForeignKey('tag.id'), primary_key=True))
# 文章表
class Article(db.Model):
__tablename__ = 'article'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
title = db.Column(db.String(100))
content = db.Column(db.Text)
author_id = db.Column(db.Integer, db.ForeignKey('user.id'))
author = db.relationship("User", backref="articles")
tags = db.relationship("Tag", secondary=article_tag_table, backref='tags',lazy='dynamic')
# 标签表
class Tag(db.Model):
__tablename__ = 'tag'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(50))
2.表的映射
创建好表后需要映射到数据库中,这里需要用到flask-migrate
库。下面是启动文件manage.py
。
pip install flask==1.1.2 flask-migrate==2.7.0 flask-script
from flask_script import Manager, Server
from app import app
from flask_migrate import Migrate, MigrateCommand
from ext import db
from first import models # 模型文件必须导入进来,否则运行报错
manager = Manager(app)
# 第一个参数是Flask的实例,第二个参数是Sqlalchemy数据库实例
Migrate(app=app, db=db)
# manager是Flask-Script的实例,这条语句在flask-Script中添加一个db命令
manager.add_command('db', MigrateCommand) # 创建数据库映射命令
manager.add_command('start', Server(port=8000, use_debugger=True)) # 创建启动命令
if __name__ == '__main__':
manager.run()
配置好启动文件后,进入项目根目录,在命令行输入以下代码:
>python manage.py db init
>python manage.py db migrate
>python manage.py db upgrade
3.表的增删查改
# 原生sql语句操作
sql = 'select * from user'
result = db.session.execute(sql)
# 查询全部
User.query.all()
# 主键查询
User.query.get(1)
# 条件查询
User.query.filter_by(User.username='name')
# 多条件查询
from sqlalchemy import and_
User.query.filter_by(and_(User.username =='name',User.password=='passwd'))
# 比较查询
User.query.filter(User.id.__lt__(5)) # 小于5
User.query.filter(User.id.__le__(5)) # 小于等于5
User.query.filter(User.id.__gt__(5)) # 大于5
User.query.filter(User.id.__ge__(5)) # 大于等于5
# in查询
User.query.filter(User.username.in_('A','B','C','D'))
# 排序
User.query.order_by('age') # 按年龄排序,默认升序,在前面加-号为降序'-age'
# 限制查询
User.query.filter(age=18).offset(2).limit(3) # 跳过二条开始查询,限制输出3条
# 增加
use = User(id,username,password)
db.session.add(use)
db.session.commit()
# 删除
User.query.filter_by(User.username='name').delete()
# 修改
User.query.filter_by(User.username='name').update({'password':'newdata'})
补充
#这个命令会创建migrations文件夹,所有迁移文件都放在里面。
python manage.py db init
python manage.py db migrate -m 'initial migration'
python manage.py db upgrade
# 返回以前的版本, 可以根据history命令找到版本号,然后传给downgrade命令:
python manage.py db history
输出格式:<base> -> 版本号 (head), initial migration
# 返回以前的版本
python manage.py db downgrade 版本号
实际操作顺序:
2.python 文件 db migrate -m"版本名(注释)"
3.python 文件 db upgrade 然后观察表结构
5.python 文件 db migrate -m"新版本名(注释)"
6.python 文件 db upgrade 然后观察表结构
7.若返回版本,则利用 python 文件 db history查看版本号
8.python 文件 db downgrade(upgrade) 版本号
一对多查询
from flask import Flask, render_template, request, flash, redirect
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__,static_folder="static",template_folder="templates")
# 设置数据库连接属性
app.config['SQLALCHEMY_DATABASE_URI'] = '×××'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 实例化 ORM 操作对象
db = SQLAlchemy(app)
# 班级表
class Classes(db.Model):
__tablename__ = "classes"
id = db.Column(db.Integer,primary_key=True)
name = db.Column(db.String(20),nullable=False,unique=True)
# 描述了Students和Classes的关系, 一对多当中,放在 一 中
relate_student = db.relationship("Students",backref='relate_class',lazy='dynamic')
# 学生表
class Students(db.Model):
__tablename__ = "students"
id = db.Column(db.Integer,primary_key=True)
name = db.Column(db.String(40),nullable=False)
cls_id = db.Column(db.Integer,db.ForeignKey("classes.id")) # 注意要写成(表名.字段名)
# 如果知道学生的姓名,想知道班级的名称,可以这样查:
stu = Students.query.filter(Students.name == 'xxx').first()
stu.relate_class.name # stu.relate_class 会跳到 classes 表
# 如果知道班级的名称,想返回全部学生的名字的列表,可以这样查:
cls = Classes.query.filter(Classes.name == 'xxx').first()
cls.relate_student.name # cls.relate_stu 会跳到 students 表
多对多查询
tb_student_course = db.Table('tb_student_course',
db.Column('student_id', db.Integer, db.ForeignKey('students.id')),
db.Column('course_id', db.Integer, db.ForeignKey('courses.id'))
)
class Student(db.Model):
__tablename__ = "students"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
# 关联属性,多对多的情况,可以写在任意一个模型类中
relate_courses = db.relationship('Course', secondary=tb_student_course,
backref='relate_student',
lazy='dynamic')
class Course(db.Model):
__tablename__ = "courses"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
添加测试数据:
# 添加测试数据
stu1 = Student(name='张三')
stu2 = Student(name='李四')
stu3 = Student(name='王五')
cou1 = Course(name='物理')
cou2 = Course(name='化学')
cou3 = Course(name='生物')
stu1.courses = [cou2, cou3] # 记得要添加关系
stu2.courses = [cou2]
stu3.courses = [cou1, cou2, cou3]
db.session.add_all([stu1, stu2, stu2])
db.session.add_all([cou1, cou2, cou3])
db.session.commit()
要查某个学生修的全部课程,修了某个课程的全部学生:
for course in stu1.relate_courses:
print(course.name)
for student in cou2.relate_student:
print(student)