使用SQLAlchemy

Python ORM 实战:用 SQLAlchemy 优雅地操作数据库

在应用开发中,几乎每个项目都绕不开数据库。直接用 SQL 语句操作数据库虽然直接,但当项目规模变大、表结构变复杂时,维护这些分散的 SQL 代码会变得越来越困难。ORM(对象关系映射)的出现就是为了解决这个问题,它让我们能用熟悉的面向对象方式操作数据库,而 SQLAlchemy 正是 Python 生态中最成熟、最强大的 ORM 框架之一。本文将带你从安装配置到advanced-features,一步步掌握 SQLAlchemy 的核心用法。

1. ORM 是什么?为什么需要它?

ORM 的核心思想是:数据库表 ↔ Python 类,表中的一行记录 ↔ 类的一个实例,表中的列 ↔ 类的属性

传统方式 vs ORM 方式

在不使用 ORM 时,我们从数据库查询到的结果通常是元组列表:

# 传统方式:返回原始元组
[
    ('1', 'Michael'),
    ('2', 'Bob'),
    ('3', 'Adam')
]

这种方式需要我们手动处理字段的顺序和类型,在涉及多表关联查询时尤其容易出错。而通过 ORM,我们可以用对象的形式处理数据:

# ORM 方式:返回对象列表
[
    User(id=1, name='Michael'),
    User(id=2, name='Bob'),
    User(id=3, name='Adam')
]

这样一来,我们访问数据时就可以使用 user.name 而不是 user[1],代码的可读性和可维护性都得到了极大提升。

2. 安装与基本配置

2.1 安装 SQLAlchemy

SQLAlchemy 的核心包可以直接通过 pip 安装:

pip install sqlalchemy

若使用 MySQL 数据库,还需要安装相应的数据库驱动,例如 pymysqlmysql-connector-python

pip install pymysql
# 或者
pip install mysql-connector-python

2.2 创建数据库连接与会话

下面是一个标准的基础配置,包含了连接引擎、会话工厂以及基类的定义:

from sqlalchemy import create_engine, Column, String, Integer, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship, declarative_base

# 声明基类,所有的模型都会继承自它
Base = declarative_base()

# 数据库连接地址,格式为:数据库类型+驱动://用户名:密码@主机:端口/数据库名
DATABASE_URL = "mysql+pymysql://username:password@localhost:3306/your_database"
engine = create_engine(DATABASE_URL, echo=True)  # echo=True 可打印 SQL 语句,调试时很有用

# 创建会话类,用于后续的数据库操作
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

这里 SessionLocal 是我们后续进行 CRUD 操作的工厂,每个请求或操作都应创建一个独立的会话。

3. 定义数据库模型(表结构)

在 SQLAlchemy 中,一个模型类就对应数据库中的一张表。下面先来看一个简单的用户模型:

3.1 单表模型

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), nullable=False)
    email = Column(String(100), unique=True, index=True, nullable=False)

模型定义中的一些常用参数:

  • primary_key=True:主键。
  • index=True:为此列创建索引,提升查询效率。
  • unique=True:确保该列值唯一。
  • nullable=False:相当于数据库的 NOT NULL 约束。

3.2 定义一对多与多对一关系

现实中的表往往存在关联关系,比如 一个用户可以发表多篇文章(一对多)。在 SQLAlchemy 中我们可以通过 ForeignKeyrelationship 来建立这种关系:

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50))
    email = Column(String(100), unique=True, index=True)

    # 定义与 Post 的一对多关系,back_populates 用于双向绑定
    posts = relationship("Post", back_populates="author")

class Post(Base):
    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(100), nullable=False)
    content = Column(String(2000))
    author_id = Column(Integer, ForeignKey('users.id'))

    # 定义多对一关系
    author = relationship("User", back_populates="posts")
  • ForeignKey('users.id') 指明了外键约束,对应到 users 表的 id 字段。
  • relationship 则是 ORM 层面的关系声明,让我们可以直接通过 user.posts 获取该用户的所有文章,或通过 post.author 获取文章的作者。

4. 数据库基本操作(CRUD)

在定义好模型后,我们通常会在应用启动时执行一次建表操作:

# 创建所有继承自 Base 的表(如果表已存在会跳过)
Base.metadata.create_all(bind=engine)

接下来,让我们看看如何进行增删改查操作。为了保持代码清晰,每个操作都封装成了函数,并通过 try...finally 确保会话被正确关闭。

4.1 创建记录

def create_user(name: str, email: str):
    session = SessionLocal()
    try:
        user = User(name=name, email=email)
        session.add(user)       # 添加到会话
        session.commit()        # 提交事务
        session.refresh(user)   # 刷新实例,获取数据库生成的 id 等字段
        return user
    finally:
        session.close()

4.2 查询记录

SQLAlchemy 提供了非常灵活的查询构建方式:

# 获取所有用户
def get_all_users():
    session = SessionLocal()
    try:
        return session.query(User).all()
    finally:
        session.close()

# 按主键查询单个用户
def get_user_by_id(user_id: int):
    session = SessionLocal()
    try:
        return session.query(User).filter(User.id == user_id).first()
    finally:
        session.close()

# 条件查询(例如按邮箱查找)
def get_user_by_email(email: str):
    session = SessionLocal()
    try:
        return session.query(User).filter(User.email == email).first()
    finally:
        session.close()

常用的查询方法:

  • all():返回所有结果的列表。
  • first():返回第一个结果,没有则返回 None
  • filter():添加过滤条件,可链式调用多个。
  • filter_by():更简单,使用关键字参数进行过滤(如 filter_by(email=email))。

4.3 更新记录

更新一般需要先查找到要修改的记录,然后修改属性,再提交事务:

def update_user(user_id: int, name: str, email: str):
    session = SessionLocal()
    try:
        user = session.query(User).filter(User.id == user_id).first()
        if user:
            user.name = name
            user.email = email
            session.commit()
            session.refresh(user)
        return user
    finally:
        session.close()

4.4 删除记录

删除操作类似,先查询再调用 delete() 方法:

def delete_user(user_id: int):
    session = SessionLocal()
    try:
        user = session.query(User).filter(User.id == user_id).first()
        if user:
            session.delete(user)
            session.commit()
        return user
    finally:
        session.close()

5. 操作关联对象

ORM 的一大优势就是能自然地处理对象之间的关联关系。假设我们要创建用户的同时为其添加几篇文章,可以这样做:

def create_user_with_posts(name: str, email: str, post_data: list[dict]):
    session = SessionLocal()
    try:
        user = User(name=name, email=email)
        for data in post_data:
            post = Post(**data)
            user.posts.append(post)   # 利用 relationship 自动处理外键
        session.add(user)
        session.commit()
        session.refresh(user)
        return user
    finally:
        session.close()

查询用户时,如果我们希望其所有文章也一并加载(避免 N+1 查询问题),可以使用 joinedload 进行预加载:

from sqlalchemy.orm import joinedload

def get_user_with_posts(user_id: int):
    session = SessionLocal()
    try:
        user = session.query(User).filter(User.id == user_id).options(
            joinedload(User.posts)
        ).first()
        return user
    finally:
        session.close()

6. advanced-features

6.1 事务管理

对于涉及多条记录的更新操作,事务可以保证数据一致性。SQLAlchemy 默认开启了事务,我们需要手动提交或回滚:

def transfer_amount(from_user_id: int, to_user_id: int, amount: float):
    session = SessionLocal()
    try:
        from_user = session.query(User).filter(User.id == from_user_id).first()
        to_user = session.query(User).filter(User.id == to_user_id).first()

        if not from_user or not to_user:
            raise ValueError("用户不存在")

        # 假设 User 有一个 balance 字段
        if from_user.balance < amount:
            raise ValueError("余额不足")

        from_user.balance -= amount
        to_user.balance += amount

        session.commit()   # 全部成功,提交事务
    except Exception as e:
        session.rollback() # 出现异常,回滚所有更改
        raise e
    finally:
        session.close()

6.2 异步支持(适用于 SQLAlchemy 2.0+)

在高并发场景下,异步操作能显著提升性能。SQLAlchemy 2.0 提供了完整的异步支持:

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import select

DATABASE_URL_ASYNC = "mysql+asyncmy://user:pass@localhost/your_database"
async_engine = create_async_engine(DATABASE_URL_ASYNC, echo=True)
AsyncSessionLocal = async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)

async def async_get_user(user_id: int):
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(User).where(User.id == user_id))
        user = result.scalars().first()
        return user

使用异步时,注意搭配 asyncmyaiomysql 等异步驱动。

7. 一些实用建议

  • 管理会话生命周期:推荐使用上下文管理器封装会话,或者利用依赖注入(如 FastAPI 的 Depends)自动处理会话的创建与关闭。
  • 避免长事务:保持事务尽可能短小,减少锁定资源的时间,尤其在高并发写入场景。
  • 批量操作:当需要插入或更新大量数据时,使用 session.bulk_insert_mappings()session.bulk_save_objects() 能获得更好的性能。
  • 合理选择加载策略:对于关联属性,根据业务场景选择 joinedload(预加载)、selectinload(二次查询预加载)或保持默认的惰性加载。
  • 善用类型提示:配合现代 IDE,类型提示能让你在编写查询时获得更好的自动补全和错误检查。

8. 总结

SQLAlchemy 作为 Python 中最成熟的 ORM 框架,既保留了原始 SQL 的表达能力,又提供了面向对象的操作接口。从简单的单表 CRUD 到复杂的关系处理、事务控制和异步操作,它都能很好应对。通过本文的介绍,相信你已经掌握了 SQLAlchemy 的核心用法。接下来,不妨在你的项目中尝试用 SQLAlchemy 替换原始的 SQL 操作,体验它带来的开发效率提升。

如果你希望进一步深入学习,推荐阅读 SQLAlchemy 官方文档,其中包含了大量高级用法和最佳实践。