FastAPI与SQLAlchemy 2.0完全指南

📂 所属阶段:第三阶段 — 数据持久化(数据库篇)
🔗 相关章节:async-await-principles · redis-integration · database-migration-alembic

本教程将手把手带你搭建一个基于 FastAPI + SQLAlchemy 2.0 的异步数据库访问层。我们会从environment-setup、配置管理一直走到实战 CRUD 与事务处理,同时避开常见的异步陷阱。即使你是第一次接触 ORM,也能跟着代码一步步跑起来。

1. 为什么选择 SQLAlchemy 2.0?

SQLAlchemy 2.0 对异步的支持不再是“补丁式”的,而是原生集成的,这让它在异步 Web 框架中如虎添翼。

特性说明
原生异步AsyncSession + asyncpg/aiosqlite,无需再拿线程池包裹同步连接
统一查询Core 和 ORM 共用 select() API,学习曲线明显下降
强类型注解Mapped[int] 等能力让你在 IDE 中就能感知字段类型,减少低级错误
企业级基础完善的连接池、事务、并发控制,可以直接用在生产环境

安装依赖

# 异步核心 + 数据库驱动(这里以 PostgreSQL 为例)
pip install fastapi sqlalchemy[asyncio] asyncpg aiosqlite pydantic-settings passlib[bcrypt]

💡 如果你用 MySQL,可以将 asyncpg 替换为 aiomysql,并将连接字符串改为 mysql+aiomysql://


2. 基础架构:从配置到引擎

好的项目结构可以让维护事半功倍。我们先把数据库配置、引擎、会话工厂统一管理起来。

2.1 配置管理

创建 .env 文件存放敏感信息,再用 pydantic-settings 加载,方便在不同环境切换。

# config.py
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    database_url: str = "postgresql+asyncpg://user:pass@localhost/fastapi_db"
    debug: bool = False          # 开发时可打开,打印 SQL
    pool_size: int = 20          # 连接池常驻连接数
    max_overflow: int = 10       # 超出 pool_size 时最多再创建的连接数
    pool_pre_ping: bool = True   # 每次从池中取连接时先测试连通性

    class Config:
        env_file = ".env"

@lru_cache()
def get_settings() -> Settings:
    return Settings()

2.2 异步引擎和会话

下面这段是异步数据库访问的核心:引擎负责管理连接池,会话工厂则为每个请求生成独立的 AsyncSession

# database.py
from sqlalchemy.ext.asyncio import (
    AsyncSession, create_async_engine, async_sessionmaker
)
from sqlalchemy.orm import DeclarativeBase
from config import get_settings

settings = get_settings()

# 创建异步引擎
engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,           # True 时会在终端打印每一条 SQL
    pool_size=settings.pool_size,
    max_overflow=settings.max_overflow,
    pool_pre_ping=settings.pool_pre_ping
)

# 异步会话工厂
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,        # 提交后对象不会过期,可以在后续继续使用
    autocommit=False,
    autoflush=False                # 手动控制 flush,避免意外行为
)

# 所有模型继承的基类
class Base(DeclarativeBase):
    pass

# FastAPI 依赖:为每个请求提供独立的数据库会话
async def get_db() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        yield session

2.3 集成到 FastAPI 生命周期

利用 lifespan 启动时建表,关闭时释放连接池。生产环境推荐使用 Alembic 管理表结构迁移。

# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from database import engine, Base

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动:创建所有表(生产环境请用 Alembic 脚本替代)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    # 关闭:释放连接池资源
    await engine.dispose()

app = FastAPI(lifespan=lifespan)

3. 模型定义:拥抱 2.0 风格

SQLAlchemy 2.0 推荐使用 Mapped[T]mapped_column 的写法,既清晰又便于类型检查。

我们以 用户 – 帖子 – 标签 三个模型为例,展示一对多、多对多的关系配置。

# models.py
from sqlalchemy import (
    String, Integer, Boolean, DateTime, Text, ForeignKey, Table, Column
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from database import Base
from datetime import datetime
from typing import List, TYPE_CHECKING

if TYPE_CHECKING:
    from models import Post, Comment

# 多对多关联表(帖子 ↔ 标签)
post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id", ondelete="CASCADE"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True)
)

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(50), nullable=False)
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True, nullable=False)
    hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), onupdate=func.now())

    # 一对多:一个用户拥有多篇文章
    posts: Mapped[List["Post"]] = relationship(
        "Post", back_populates="author", lazy="selectin",
        cascade="all, delete-orphan"
    )

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    published: Mapped[bool] = mapped_column(Boolean, default=False)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

    # 多对一:每篇文章属于一个作者
    author: Mapped["User"] = relationship("User", back_populates="posts")
    # 多对多:一篇文章可以有多个标签
    tags: Mapped[List["Tag"]] = relationship("Tag", secondary=post_tags, lazy="selectin")

class Tag(Base):
    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(30), unique=True, nullable=False)

📘 lazy="selectin" 会在父对象被查询时,自动使用 IN 语句一次性加载所有关联数据,有效避免 N+1 问题。但也要根据场景权衡,如果有些场合不需要关联数据,可以改为 lazy="select"(按需加载)。


4. Repository 模式:让数据访问更干净

将 SQL 语句封装在 Repository 中,路由函数只关心业务逻辑,测试时也可以轻松 mock 掉数据库操作。

# repositories/base.py
from typing import TypeVar, Generic, Optional, List, Dict
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from database import Base

T = TypeVar('T', bound=Base)

class BaseRepository(Generic[T]):
    """通用的 CRUD 仓库,类型绑定到具体的模型"""
    def __init__(self, model: T, db: AsyncSession):
        self.model = model
        self.db = db

    async def get_by_id(self, obj_id: int) -> Optional[T]:
        result = await self.db.execute(
            select(self.model).where(self.model.id == obj_id)
        )
        return result.scalar_one_or_none()

    async def get_multi(self, skip: int = 0, limit: int = 20) -> List[T]:
        result = await self.db.execute(
            select(self.model).offset(skip).limit(limit)
        )
        return list(result.scalars().all())

    async def create(self, obj_in: Dict) -> T:
        obj = self.model(**obj_in)
        self.db.add(obj)
        await self.db.commit()
        await self.db.refresh(obj)   # 刷新以获取数据库生成的默认值(如 id、时间)
        return obj

    async def delete(self, obj: T) -> None:
        await self.db.delete(obj)
        await self.db.commit()

5. 编写 API 路由:从 Schema 到端点

先用 Pydantic 定义请求和响应模型,再在路由中调用 Repository。

5.1 Pydantic 模式

# schemas.py
from pydantic import BaseModel, EmailStr
from datetime import datetime

class UserBase(BaseModel):
    name: str
    email: EmailStr

class UserCreate(UserBase):
    password: str

class UserPublic(UserBase):
    id: int
    is_active: bool
    created_at: datetime

    class Config:
        from_attributes = True   # 允许从 ORM 对象直接转换

5.2 用户路由(CRUD 示例)

# routers/users.py
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from passlib.context import CryptContext
from database import get_db
from repositories.base import BaseRepository
from models import User
from schemas import UserCreate, UserPublic

router = APIRouter(prefix="/users", tags=["用户管理"])
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def get_user_repo(db: AsyncSession = Depends(get_db)) -> BaseRepository[User]:
    return BaseRepository(User, db)

@router.get("/", response_model=list[UserPublic])
async def list_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    repo: BaseRepository[User] = Depends(get_user_repo)
):
    return await repo.get_multi(skip, limit)

@router.post("/", response_model=UserPublic, status_code=status.HTTP_201_CREATED)
async def create_user(
    data: UserCreate,
    repo: BaseRepository[User] = Depends(get_user_repo)
):
    # 实际项目中应在这里检查邮箱是否已存在
    hashed = pwd_context.hash(data.password)
    user_data = data.model_dump(exclude={"password"})
    user_data["hashed_password"] = hashed
    return await repo.create(user_data)

访问 /users?skip=0&limit=10 就能获取分页用户列表;向 /users 发送 POST 即可创建用户。


6. 避坑指南:N+1 查询与事务

6.1 优雅地预加载关联数据

在异步环境中,延迟加载(lazy load)会自动发出额外的查询,很容易出现 N+1 问题。示例:

错误做法(会触发 N+1 查询):

posts = await repo.get_multi()
for post in posts:
    print(post.author.name)   # 每循环一次都会查一次 author

正确做法:查询时使用 selectinload 预加载关联对象。

from sqlalchemy.orm import selectinload

result = await db.execute(
    select(Post).options(selectinload(Post.author))
)
posts = list(result.scalars().all())
for post in posts:
    print(post.author.name)   # 所有 author 已在一条 SQL 中加载完毕

6.2 正确处理异步事务

在异步 SQLAlchemy 中,推荐用 async with db.begin() 来管理事务边界,它会自动提交或回滚。

async def transfer_money(db: AsyncSession, from_id: int, to_id: int, amount: float):
    async with db.begin():
        from_user = await db.get(User, from_id)
        to_user = await db.get(User, to_id)
        from_user.balance -= amount
        to_user.balance += amount
    # 离开 with 块时如果没有异常,自动提交;有异常则自动回滚

🧰 为什么不用 commit() / rollback()
手动调用容易出现遗漏,尤其在多层嵌套逻辑中。db.begin() 的上下文管理器会确保无论成功或失败,事务都能被正确处理。


7. 核心组件速查

组件作用
create_async_engine创建异步数据库引擎,配置连接池参数
AsyncSessionLocal异步会话工厂,为每个请求生成独立会话
Mapped[T]SQLAlchemy 2.0 类型注解,提升代码可读性与 IDE 支持
selectinload预加载关联,避免 N+1 查询
Repository 模式隔离数据访问逻辑,便于测试与维护

这套方案可以直接作为中型项目的骨架,在此基础上你可以接入 Alembic 管理数据库迁移、用 Redis 做缓存、加入分布式锁处理并发等。后续章节会继续深入这些实战技巧,欢迎持续关注。


🎉 恭喜你完成了 FastAPI + SQLAlchemy 2.0 异步 ORM 的完整搭建!动手试试吧,遇到问题欢迎在评论区交流。