FastAPI 操作数据库

如果异步操作数据库的话:Async SQL (Relational) Databases - FastAPI

pip install sqlalchemy

pip install fastapi-async-sqlalchemy 支持异步

# coding: utf8
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker



SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

# 创建 SQLAlchemy 引擎, connect_args={"check_same_thread": False},启用多个线程, 仅用于SQLite,在其他数据库不需要它
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)

# 创建SessionLocal类, 每个实例SessionLocal都会是一个数据库会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 创建一个Base类¶, 将用这个类继承,来创建每个数据库模型或类(ORM 模型)
Base = declarative_base()

Base.metadata.create_all(bind=engine)



# 创建数据库模型
from sqlalchemy import Boolean, Column, Integer, String


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True,index=True)
    email = Column(String, unique=True,index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    # # 表示该表与其他表关联 例如: user.items 
    # items = relationship('Item', back_populates="owner") 


from fastapi import FastAPI
app = FastAPI()









# 创建初始 Pydantic模型/模式
from pydantic import BaseModel
from typing import Union

class UserBase(BaseModel):
    email: str

class UserCreate(UserBase):
    password: str

class Users(UserBase):
    id: int
    is_active: bool
    # items: list[Item] = []

    class Config:
        '''orm_mode 设置Pydantic模型与 ORM 兼容,表示返回对象模型'''
        orm_mode = True



# 操作数据库
from sqlalchemy.orm import Session
from fastapi import Depends, HTTPException,status

def get_user(db: Session, id: int):
    return db.query(User).filter(User.id == id).first()

def get_user_by_email(db: Session, email: str):
    return db.query(User).filter(User.email == email).first()

def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(User).offset(skip).limit(limit).all()

def create_users(db: Session, user: UserCreate ):
    hashed_password = user.password + "123"
    db_user = User(email=user.email, hashed_password=hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user) # 刷新数据库实例
    return db_user


def get_db():
    """
    每一个请求处理完毕后会关闭当前连接,不同的请求使用不同的连接
    :return:
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post('/users', response_model=Users)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    return create_users(db = db, user=user)

@app.get('/users', response_model=Users)
def get_user(id: int, db: Session = Depends(get_db)):
    db_user = get_user(db, id)
    if not db_user:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    return db_user

Last updated