1.Tortoise ORM 在项目中的配置
1.模型定义
from tortoise import fields
from tortoise.models import Model
class DateTimeModel(Model):
created_at = fields.DatetimeField(auto_now_add=True, description="创建时间")
updated_at = fields.DatetimeField(auto_now=True, null=True, description="更新时间")
class Meta:
abstract = True
class GroupModel(DateTimeModel):
uuid = fields.UUIDField(source_field="uuid", pk=True, unique=True, description="用户组唯一标识")
name = fields.CharField(source_field="username", max_length=64, description="用户组名称")
parent_uuid = fields.CharField(source_field="parent_uuid", max_length=32, null=True)
users: fields.ReverseRelation["UserModel"]
class Meta:
table = "group"
class UserModel(DateTimeModel):
uuid = fields.UUIDField(source_field="uuid", pk=True, unique=True, description="用户唯一标识")
name = fields.CharField(source_field="username", max_length=64, description="用户展示名称,可以修改")
password = fields.CharField(source_field="password", max_length=128, description="用户密码")
status = fields.CharEnumField(UserStatus, default=UserStatus.OFFLINE, description="用户登录状态")
# 一对多,关联管系,和Django没什大的区别
group_uuid: fields.ForeignKeyRelation["GroupModel"] = \
fields.ForeignKeyField("models.GroupModel", related_name="group", on_delete=fields.CASCADE)
class Meta:
table = "user"
ordering = ["created_at"]
class PydanticMeta:
# 该字段不做展示,过滤作用
exclude = ["password"]
2.数据库配置
from typing import Type
from tortoise import Model
class Router:
@staticmethod
def [db_for_read]()(model: Type[Model]):
return "slave"
@staticmethod
def db_for_write(model: Type[Model]):
return "master"
import urllib.parse
urllib.parse.quote_plus("kx%jj5/g")
TORTOISE_ORM = {
'connections': {
'master': {
# 'engine': 'tortoise.backends.asyncpg', PostgreSQL
'engine': 'tortoise.backends.mysql', # MySQL or Mariadb
'credentials': {
'host': '127.0.0.1',
'port': '3306',
'user': 'root',
'password': 'password',
'database': 'base',
'minsize': 1,
'maxsize': 5,
'charset': 'utf8mb4',
'echo': True
}
},
'slave': {
'engine': 'tortoise.backends.mysql',
'credentials': {
'host': '127.0.0.1',
'port': '3306',
'user': 'root',
'password': 'password',
'database': 'base',
'minsize': 1,
'maxsize': 5,
'charset': 'utf8mb4',
'echo': True
}
},
},
'apps': {
'models': {
'models': ['models', "aerich.models"],
'default_connection': 'master',
}
},
'routers': ["Router"],
'use_tz': False,
'timezone': 'Asia/Shanghai'
}
3.fastapi中引入
import uvicorn
from fastapi import FastAPI
from tortoise.contrib.fastapi import register_tortoise
app = FastAPI()
# 该方法会在fastapi启动时触发,内部通过传递进去的app对象,监听服务启动和终止事件
# 当检测到启动事件时,会初始化Tortoise对象,如果generate_schemas为True则还会进行数据库迁移
# 当检测到终止事件时,会关闭连接
register_tortoise(
app,
config=TORTOISE_ORM,
# generate_schemas=True, # 如果数据库为空,则自动生成对应表单,生产环境不要开
# add_exception_handlers=True, # 生产环境不要开,会泄露调试信息
)
if __name__ == '__main__':
uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
debug=True, workers=1)
4.通过aerich
这个模块实现数据库的迁移,类似于django的makemigrations
和migrate
功能
aerich init -t TORTOISE_ORM # 根据自己文件实际路径写
初始化完会在当前目录生成一个文件pyproject.toml
和一个空文件夹migrations
pyproject.toml
:保存配置文件路径,低版本可能是aerich.ini
如果TORTOISE_ORM
配置文件中的models
改了名,则执行这条命令时需要增加--app
参数,来指定你修改的名字
生成migrstions/models/0_xxxx_init.py文件夹
aerich migrate --name drop_column
迁移文件名的格式为 {version_num}{datetime}{name|update}.json。
1_202029051520102929_drop_column.json
如果aerich猜到您正在重命名列,它会要求重命名{old_column}为{new_column} [True],您可以选择True重命名列而不删除列,或者选择False删除列然后创建,如果使用MySQL,只有MySQL8.0+支持重命名。到语法
aerich downgrade # 默认回退一级
5.main.py主程序中进行注册
from tortoise.contrib.fastapi import register_tortoise
app = FastAPI()
@app.on_event("startup")
async def startup_event():
# generate_schemas=True 如果数据库为空,则自动生成对应表单,生产环境不要开
# add_exception_handlers=True 生产环境不要开,会泄露调试信息
register_tortoise(app, config=TORTOISE_ORM, generate_schemas=False, add_exception_handlers=False)
6.创建测试模型
from tortoise import Model
from tortoise import fields
class UserModel(DateTimeModel):
"""用户表"""
uuid = fields.UUIDField(source_field="uuid", pk=True, unique=True, description="用户唯一标识")
name = fields.CharField(source_field="name", max_length=30, description="用户名称")
age = fields.IntField(source_field="age", description="用户年龄")
password = fields.CharField(source_field="password", max_length=50, description="用户密码")
class Meta:
table = "user" # 表名
# abstract=True # 是否为抽象模型,用于继承,且不会在数据库中生成数据表
unique_together=(("name", "password"), ) # 联合约束
table_description = "用户表" # 数据库对该表的注释
indexes=(("name", "password"),) # 联合索引
ordering = ["xxx", "-xxx"] # 设置默认查询结果的顺序,-代表降序
2.Tortoise ORM 增删改查
1.查询
1.使用 await
和 不使用 await
的区别
1.使用 await
查询时,得到的结果是Model对象,可以直接获取其属性
objs = await UserModel.filter(uuid__in=apps_uuid)
# {VirtualAppModel} <VirtualAppModel>
print(objs[0].name)
* 2.不适用 `await` 查询时,得到的是`tortoise.queryset.QuerySet`对象,无法通过循环遍历获取每一个对象的具体属性,但是支持链式查询,适合在进行`分页`时使用。
objs = UserModel.filter(uuid__in=apps_uuid)
# obj: <tortoise.queryset.QuerySet object at 0x00000132C4EBF160>
objs = objs.filter(name=xxx)
objs = objs.filter(xxx=xxxx)
result = await UserModel.exists(uuid=xxx)
user = await UserModel.get(uuid=xxx) # <UserModel>
# 1.如果查询到多条数据,则抛出异常:tortoise.exceptions.MultipleObjectsReturned
# 2.如果没找到数据,则抛出异常:tortoise.exceptions.DoesNotExist
user = await UserModel.get_or_none(uuid=xxx) # <UserModel> or None
# 1.如果没找到数据,返回:None
# 2.如果查询到多条数据,则抛出异常:tortoise.exceptions.MultipleObjectsReturned
users = await UserModel.filter(name=xxx) # [<UserModel>]
users = await UserModel.all() # [<UserModel>, <UserModel>, ...]
data = await UserModel.first() # [<UserModel>]
data_dict = await UserModel.first().values("name", "uuid")
# 如果查询结果是单条数据:{'name': '222224', 'uuid': '35f01c8a57aa44008c99682f0eece37a'}
# 如果查询结果是多条数据:[{'name': 'xxx', 'uuid': 'xxx'}, {'name': 'xxx', 'uuid': 'xxx'}]
data_tuple = await UserModel.first().values_list("name", "uuid")
# 元组形式,只返回值:('222224', '35f01c8a57aa44008c99682f0eece37a')
# 多条数据:[('222224', '35f01c8a57aa44008c99682f0eece37a'), ('xxx', 'xxx')]
# 如果相获取多条数据中的某一个字段,比如uuid,正常情况下返回:[('xxx',), ('xxx'),...]
# 但如果想要的结果为:['xxx', 'xxx', 'xxx', ...],就需要另一个参数了
uuid_list = await UserModel.filter(name__startswith='xxx').values_list("uuid", flat=True)
# 如果想获取部分字段,但是以object对象形式返回
data = await VirtualAppModel.first().only("name")
# 如果查询结果是单条数据:<UserModel>,不过这个模型对象中只有name属性,强行获取其他属性,则会报错
# 如果查询结果是多条数据:[<UserModel>]
8.不等于:exclude,比如查询name不等于111的用户
data = await UserModel.exclude(name='111')
data = await UserModel.filter(name='111').distinct()
num = await UserModel.filter(name='test').count()
# 或者
queryset = UserModel.filter(name='test')
num = await queryset.count()
data = await queryset.all()
from tortoise.functions import Count, Trim, Lower, Upper, Coalesce
await Tournament.annotate(events_count=Count('events')).filter(events_count__gte=10)
await Tournament.annotate(clean_name=Trim('name')).filter(clean_name='tournament')
await Tournament.annotate(name_upper=Upper('name')).filter(name_upper='TOURNAMENT')
await Tournament.annotate(name_lower=Lower('name')).filter(name_lower='tournament')
await Tournament.annotate(desc_clean=Coalesce('desc', '')).filter(desc_clean='')
12.双下划线查询:根据字段值进行过滤
name__not_in=['xxx', 'xxx']
# 1.列表形式
# extra字段值:["text", 3, {"msg": "msg2"}]
obj = await UserModel.filter(extra__contains=[{"msg": "msg2"}]).first()
# 2.字典形式
# extra字段值:{"breed": "labrador", owner": {"name": "Boby", "last": None, other_pets": [{"name": "Fishy"}]}}
# 2.1根据字段进行精确匹配(可以使用上述双下划线查询,比如:name__not进行不等于查询)
obj1 = await UserModel.filter(extra__filter={"breed": "labrador"}).first()
# 2.2嵌套字典数据获取
obj2 = await UserModel.filter(extra__filter={"owner__name": "Boby"}).first()
# 2.3获取嵌套数据中的列表数据
obj3 = await UserModel.filter(data__filter={"owner__other_pets__0__name": "Fishy"}).first()
from tortoise.expressions import Q
users = await UserModel.filter(Q(name='a') | Q(name='b'))
# 等效于:如果省略join_type,则为AND
users = await UserModel.filter(Q(Q(name='a'), Q(name='b'), join_type="OR"))
# ~Q:不等于
users = await UserModel.filter(~Q(name='a'))
from tortoise.expressions import F
await UserModel.filter(uuid='xxx').update(age=F('age')-10)
2.创建
user = await UserModel.create(uuid="xxx", name="xxx") # <UserModel>
# 或者
user = User(uuid="xxx", name="xxx")
await user.save()
bulk_data = list()
for uuid in users_uuid:
bulk_data.append(await UserModel(uuid=uuid, name="xxx"))
await UserModel.bulk_create(bulk_data)
3.查询或创建(如果查询得到,则返回该对象,否则直接创建)
# 根据名称查询,查询不到,则创建一条数据:name:xxx,age: 12
user = await UserModel.get_or_create(name='xxx', defaults={'age': 12})
3.更新
# 返回更新的数目:int
num = await UserModel.filter(uuid='xxx').update(age=24)
# 或者
user = await UserModel.get(uuid='xxx')
user.age = F('age') + 10
await user.save(update_fields=['balance'])
# 根据name=111查询,如果有数据,则更新age=26,password=password;否则,创建一条数据,name='11111'、age=26、password=password
# 返回结果为元组,第一个元素为Model对象,第二个元素为是否新创建的数据
data, is_create = await UserModel.update_or_create(name="11111", defaults={"age": 26, "password": "password"})
4.删除
# 删除的条目数
num = await UserModel.all().delete()
# 删除的条目数
num = await UserModel.filter(uuid='xxx').delete()