FastAPI 操作数据库
如果需要异步操作数据库,请参阅 FastAPI 官方文档:Async SQL (Relational) Databases。
pip install sqlalchemy
pip install fastapi-async-sqlalchemy  # 可选:提供异步支持
# coding: utf8
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Database connection URL. The active value points at a local SQLite file;
# the commented line shows the form of a PostgreSQL URL.
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

# "check_same_thread": False allows the SQLite connection to be used from
# several threads. It is a SQLite-only option that other databases do not
# need, so it is passed only when the URL targets SQLite.
_connect_args = (
    {"check_same_thread": False}
    if SQLALCHEMY_DATABASE_URL.startswith("sqlite")
    else {}
)

# Create the SQLAlchemy engine.
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args=_connect_args)

# Session factory: each SessionLocal() instance is one database session.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base class that every database model (ORM model) in this file inherits from.
Base = declarative_base()
# Declare the database model.
from sqlalchemy import Boolean, Column, Integer, String


class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)
    # Relationship to another table, giving access such as ``user.items``:
    # items = relationship('Item', back_populates="owner")


# Create the tables only after the model classes are declared: create_all()
# works from the tables registered on Base.metadata at the time it runs, so
# calling it before ``User`` exists would create no ``users`` table.
Base.metadata.create_all(bind=engine)
from fastapi import FastAPI
# Application instance; the route decorators further down register on it.
app = FastAPI()
# 创建初始 Pydantic模型/模式
from pydantic import BaseModel
from typing import Union
class UserBase(BaseModel):
    # Common base of the user schemas below; holds the shared email field.
    email: str
class UserCreate(UserBase):
    # Input schema for creating a user: the inherited email plus a password.
    password: str
class Users(UserBase):
    # Output schema; used as the response_model of the /users routes.
    id: int
    is_active: bool
    # items: list[Item] = []

    class Config:
        '''orm_mode makes the Pydantic model compatible with the ORM, so an ORM object can be returned as the response.'''
        orm_mode = True
# 操作数据库
from sqlalchemy.orm import Session
from fastapi import Depends, HTTPException,status
def get_user(db: Session, id: int):
    """Fetch a single user by primary key.

    :param db: database session to query through.
    :param id: value matched against ``User.id``.
    :return: the first matching ``User``, or ``None`` when nothing matches.
    """
    by_id = db.query(User).filter(User.id == id)
    return by_id.first()
def get_user_by_email(db: Session, email: str):
    """Fetch a single user by email address.

    :param db: database session to query through.
    :param email: value matched against ``User.email``.
    :return: the first matching ``User``, or ``None`` when nothing matches.
    """
    by_email = db.query(User).filter(User.email == email)
    return by_email.first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of users.

    Rows are ordered by ``id`` so that paging is stable: without an ORDER BY
    the database may return rows in any order, and OFFSET/LIMIT pages could
    then repeat or skip rows between calls.

    :param db: database session to query through.
    :param skip: number of rows to skip from the start of the result.
    :param limit: maximum number of rows to return.
    :return: list of ``User`` objects.
    """
    page = db.query(User).order_by(User.id).offset(skip).limit(limit)
    return page.all()
def create_users(db: Session, user: UserCreate):
    """Insert a new user row and return it.

    :param db: database session used for the insert.
    :param user: creation payload providing ``email`` and ``password``.
    :return: the committed ``User`` instance after being refreshed.
    """
    # NOTE(review): appending "123" is a placeholder, not a hash; the
    # password is stored in an easily reversible form. Replace it with a real
    # password-hashing scheme before using this outside of a demo.
    new_user = User(
        email=user.email,
        hashed_password=user.password + "123",
    )
    db.add(new_user)
    db.commit()
    # Refresh the instance from the database after the commit.
    db.refresh(new_user)
    return new_user
def get_db():
    """
    Provide one database session per request.

    The session is closed once the request has been handled, and different
    requests use different sessions.
    :return: generator that yields a single ``SessionLocal`` session.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs whether or not the consumer of the generator raised.
        session.close()
@app.post('/users', response_model=Users)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    """Create a new user; responds with 400 if the email is already registered."""
    # ``User.email`` is declared unique, so inserting a duplicate would fail
    # inside the database layer. Checking first reports it as a client error.
    if get_user_by_email(db, email=user.email) is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )
    return create_users(db=db, user=user)
@app.get('/users', response_model=Users)
def read_user(id: int, db: Session = Depends(get_db)):
    """Return the user with the given id; responds with 404 if none exists."""
    # This handler must not be named ``get_user``: that would rebind the
    # module-level query helper of the same name, and the call below would
    # then invoke the handler itself and recurse without ever querying.
    db_user = get_user(db, id)
    if db_user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return db_user
Last updated