Flask-SQLAlchemy详解
安装
pip install flask-sqlalchemy
配置
SQLALCHEMY_DATABASE_URI
连接数据库。示例:mysql://username:password@host/post/db?charset=utf-8
SQLALCHEMY_BINDS
一个将会绑定多种数据库的字典。 更多详细信息请看官文 绑定多种数据库.
SQLALCHEMY_ECHO
调试设置为true
SQLALCHEMY_POOL_SIZE
数据库池的大小,默认值为5。
SQLALCHEMY_POOL_TIMEOUT
连接超时时间
SQLALCHEMY_POOL_RECYCLE
自动回收连接的秒数。
SQLALCHEMY_MAX_OVERFLOW
控制在连接池达到最大值后可以创建的连接数。当这些额外的 连接回收到连接池后将会被断开和抛弃。
SQLALCHEMY_TRACK_MODIFICATIONS
如果设置成 True (默认情况),Flask-SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存, 如果不必要的可以禁用它。
常用的SQLAlchemy字段类型
Integer
int
普通整数,一般是32位
SmallInteger
int
取值范围小的整数,一般是16位
BigInteger
int或long
不限制精度的整数
Float
float
浮点数
Numeric
decimal.Decimal
普通整数,一般是32位
String
str
变长字符串
Text
str
变长字符串,对较长或不限长度的字符串做了优化
Unicode
unicode
变长Unicode字符串
UnicodeText
unicode
变长Unicode字符串,对较长或不限长度的字符串做了优化
Boolean
bool
布尔值
Date
datetime.date
时间
Time
datetime.datetime
日期和时间
LargeBinary
str
巨长度二进制数据
常用的SQLAlchemy列选项
primary_key
如果为True,代表表的主键
unique
如果为True,代表这列不允许出现重复的值
index
如果为True,为这列创建索引,提高查询效率
nullable
如果为True,允许有空值,如果为False,不允许有空值
default
为这列定义默认值
SQLAlchemy关系选项
backref
在关系的另一模型中添加反向引用
lazy
决定了什么时候SQLALchemy从数据库中加载数据 (子查询方式(subquery), 动态方式(dynamic))
primary join
明确指定两个模型之间使用的联结条件
uselist
如果为False,不使用列表,而使用标量值
order_by
指定关系中记录的排序方式
secondary
指定多对多关系中关系表的名字
secondary join
在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件
常用的SQLAlchemy查询过滤器
filter()
把过滤器添加到原查询上,返回一个新查询
filter_by()
把等值过滤器添加到原查询上,返回一个新查询
limit
使用指定的值限定原查询返回的结果
offset()
偏移原查询返回的结果,返回一个新查询
order_by()
根据指定条件对原查询结果进行排序,返回一个新查询
group_by()
根据指定条件对原查询结果进行分组,返回一个新查询
常用的SQLAlchemy查询执行器
all()
以列表形式返回查询的所有结果
first()
返回查询的第一个结果,如果未查到,返回None
first_or_404()
返回查询的第一个结果,如果未查到,返回404
get()
返回指定主键对应的行,如不存在,返回None
get_or_404()
返回指定主键对应的行,如不存在,返回404
count()
返回查询结果的数量
paginate()
返回一个Paginate对象,它包含指定范围内的结果
示例
操作数据库需要先创建一个db对象,通常写在exts.py
文件里。
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
flask项目一般将数据库配置写入configs.py
文件里面,配置在创建引擎前需写好,不要在程序运行时修改配置,如下。
HOST = '127.0.0.1'
PORT = '3306'
DATABASE = 'flask1'
USERNAME = 'root'
PASSWORD = '123456'
DB_URI = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD, host=HOST,port=PORT, db=DATABASE)
SQLALCHEMY_DATABASE_URI = DB_URI
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_ECHO = True
写完数据库配置后需要和app绑定,app.py
文件里写flask应用的创建和蓝图的注册等等,如下:
from flask import Flask
import configs
from exts import db
app = Flask(__name__)
# 加载配置文件
app.config.from_object(configs)
# db绑定app
db.init_app(app)
1.表的创建
# 建表写在models.py文件里面
from ext import db
"""
以下表关系:
一个用户对应多篇文章(一对多)
一篇文章对应多个标签,一个标签对应多个文章(多对多)
"""
"""
relationship 理解: 关系关联
一对一关系中,需要设置 relationship 中的 uselist=Flase,其他数据库操作一样。
一对多关系中,外键设置在多的一方中,关系(relationship)可设置在任意一方。
多对多关系中,需建立关系表,设置 secondary=关系表
"""
# 用户表
class User(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(50))
email = db.Column(db.String(50))
# 关系表(多对多)
article_tag_table = db.Table('article_tag',
db.Column('article_id', db.Integer, db.ForeignKey('article.id'), primary_key=True),
db.Column('tag_id', db.Integer, db.ForeignKey('tag.id'), primary_key=True))
# 文章表
class Article(db.Model):
__tablename__ = 'article'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
title = db.Column(db.String(100))
content = db.Column(db.Text)
author_id = db.Column(db.Integer, db.ForeignKey('user.id'))
author = db.relationship("User", backref="articles")
tags = db.relationship("Tag", secondary=article_tag_table, backref='tags',lazy='dynamic')
# 标签表
class Tag(db.Model):
__tablename__ = 'tag'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(50))
2.表的映射
创建好表后需要映射到数据库中,这里需要用到flask-migrate
库。下面是启动文件manage.py
。
pip install flask==1.1.2 flask-migrate==2.7.0 flask-script
from flask_script import Manager, Server
from app import app
from flask_migrate import Migrate, MigrateCommand
from ext import db
from first import models # 模型文件必须导入进来,否则运行报错
manager = Manager(app)
# 第一个参数是Flask的实例,第二个参数是Sqlalchemy数据库实例
Migrate(app=app, db=db)
# manager是Flask-Script的实例,这条语句在flask-Script中添加一个db命令
manager.add_command('db', MigrateCommand) # 创建数据库映射命令
manager.add_command('start', Server(port=8000, use_debugger=True)) # 创建启动命令
if __name__ == '__main__':
manager.run()
配置好启动文件后,进入项目根目录,在命令行输入以下代码:
>python manage.py db init
>python manage.py db migrate
>python manage.py db upgrade
3.表的增删查改
# 原生sql语句操作
sql = 'select * from user'
result = db.session.execute(sql)
# 查询全部
User.query.all()
# 主键查询
User.query.get(1)
# 条件查询
User.query.filter_by(User.username='name')
# 多条件查询
from sqlalchemy import and_
User.query.filter_by(and_(User.username =='name',User.password=='passwd'))
# 比较查询
User.query.filter(User.id.__lt__(5)) # 小于5
User.query.filter(User.id.__le__(5)) # 小于等于5
User.query.filter(User.id.__gt__(5)) # 大于5
User.query.filter(User.id.__ge__(5)) # 大于等于5
# in查询
User.query.filter(User.username.in_('A','B','C','D'))
# 排序
User.query.order_by('age') # 按年龄排序,默认升序,在前面加-号为降序'-age'
# 限制查询
User.query.filter(age=18).offset(2).limit(3) # 跳过二条开始查询,限制输出3条
# 增加
use = User(id,username,password)
db.session.add(use)
db.session.commit()
# 删除
User.query.filter_by(User.username='name').delete()
# 修改
User.query.filter_by(User.username='name').update({'password':'newdata'})
补充
#这个命令会创建migrations文件夹,所有迁移文件都放在里面。
python manage.py db init
python manage.py db migrate -m 'initial migration'
python manage.py db upgrade
# 返回以前的版本, 可以根据history命令找到版本号,然后传给downgrade命令:
python manage.py db history
输出格式:<base> -> 版本号 (head), initial migration
# 返回以前的版本
python manage.py db downgrade 版本号
实际操作顺序:
1.python 文件 db init
2.python 文件 db migrate -m"版本名(注释)"
3.python 文件 db upgrade 然后观察表结构
4.根据需求修改模型
5.python 文件 db migrate -m"新版本名(注释)"
6.python 文件 db upgrade 然后观察表结构
7.若返回版本,则利用 python 文件 db history查看版本号
8.python 文件 db downgrade(upgrade) 版本号
一对多查询
from flask import Flask, render_template, request, flash, redirect
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__,static_folder="static",template_folder="templates")
# 设置数据库连接属性
app.config['SQLALCHEMY_DATABASE_URI'] = '×××'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 实例化 ORM 操作对象
db = SQLAlchemy(app)
# 班级表
class Classes(db.Model):
__tablename__ = "classes"
id = db.Column(db.Integer,primary_key=True)
name = db.Column(db.String(20),nullable=False,unique=True)
# 描述了Students和Classes的关系, 一对多当中,放在 一 中
relate_student = db.relationship("Students",backref='relate_class',lazy='dynamic')
# 学生表
class Students(db.Model):
__tablename__ = "students"
id = db.Column(db.Integer,primary_key=True)
name = db.Column(db.String(40),nullable=False)
cls_id = db.Column(db.Integer,db.ForeignKey("classes.id")) # 注意要写成(表名.字段名)
# 如果知道学生的姓名,想知道班级的名称,可以这样查:
stu = Students.query.filter(Students.name == 'xxx').first()
stu.relate_class.name # stu.relate_class 会跳到 classes 表
# 如果知道班级的名称,想返回全部学生的名字的列表,可以这样查:
cls = Classes.query.filter(Classes.name == 'xxx').first()
cls.relate_student.name # cls.relate_stu 会跳到 students 表
多对多查询
tb_student_course = db.Table('tb_student_course',
db.Column('student_id', db.Integer, db.ForeignKey('students.id')),
db.Column('course_id', db.Integer, db.ForeignKey('courses.id'))
)
class Student(db.Model):
__tablename__ = "students"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
# 关联属性,多对多的情况,可以写在任意一个模型类中
relate_courses = db.relationship('Course', secondary=tb_student_course,
backref='relate_student',
lazy='dynamic')
class Course(db.Model):
__tablename__ = "courses"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
添加测试数据:
# 添加测试数据
stu1 = Student(name='张三')
stu2 = Student(name='李四')
stu3 = Student(name='王五')
cou1 = Course(name='物理')
cou2 = Course(name='化学')
cou3 = Course(name='生物')
stu1.courses = [cou2, cou3] # 记得要添加关系
stu2.courses = [cou2]
stu3.courses = [cou1, cou2, cou3]
db.session.add_all([stu1, stu2, stu2])
db.session.add_all([cou1, cou2, cou3])
db.session.commit()
要查某个学生修的全部课程,修了某个课程的全部学生:
for course in stu1.relate_courses:
print(course.name)
for student in cou2.relate_student:
print(student)
Last updated