使用SQLAlchemy
Python ORM 实战:用 SQLAlchemy 优雅地操作数据库
在应用开发中,几乎每个项目都绕不开数据库。直接用 SQL 语句操作数据库虽然直接,但当项目规模变大、表结构变复杂时,维护这些分散的 SQL 代码会变得越来越困难。ORM(对象关系映射)的出现就是为了解决这个问题,它让我们能用熟悉的面向对象方式操作数据库,而 SQLAlchemy 正是 Python 生态中最成熟、最强大的 ORM 框架之一。本文将带你从安装配置到advanced-features,一步步掌握 SQLAlchemy 的核心用法。
1. ORM 是什么?为什么需要它?
ORM 的核心思想是:数据库表 ↔ Python 类,表中的一行记录 ↔ 类的一个实例,表中的列 ↔ 类的属性。
传统方式 vs ORM 方式
在不使用 ORM 时,我们从数据库查询到的结果通常是元组列表:
# 传统方式:返回原始元组
[
('1', 'Michael'),
('2', 'Bob'),
('3', 'Adam')
]
这种方式需要我们手动处理字段的顺序和类型,在涉及多表关联查询时尤其容易出错。而通过 ORM,我们可以用对象的形式处理数据:
# ORM 方式:返回对象列表
[
User(id=1, name='Michael'),
User(id=2, name='Bob'),
User(id=3, name='Adam')
]
这样一来,我们访问数据时就可以使用 user.name 而不是 user[1],代码的可读性和可维护性都得到了极大提升。
2. 安装与基本配置
2.1 安装 SQLAlchemy
SQLAlchemy 的核心包可以直接通过 pip 安装:
若使用 MySQL 数据库,还需要安装相应的数据库驱动,例如 pymysql 或 mysql-connector-python:
pip install pymysql
# 或者
pip install mysql-connector-python
2.2 创建数据库连接与会话
下面是一个标准的基础配置,包含了连接引擎、会话工厂以及基类的定义:
from sqlalchemy import create_engine, Column, String, Integer, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship, declarative_base
# 声明基类,所有的模型都会继承自它
Base = declarative_base()
# 数据库连接地址,格式为:数据库类型+驱动://用户名:密码@主机:端口/数据库名
DATABASE_URL = "mysql+pymysql://username:password@localhost:3306/your_database"
engine = create_engine(DATABASE_URL, echo=True) # echo=True 可打印 SQL 语句,调试时很有用
# 创建会话类,用于后续的数据库操作
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
这里 SessionLocal 是我们后续进行 CRUD 操作的工厂,每个请求或操作都应创建一个独立的会话。
3. 定义数据库模型(表结构)
在 SQLAlchemy 中,一个模型类就对应数据库中的一张表。下面先来看一个简单的用户模型:
3.1 单表模型
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, index=True)
name = Column(String(50), nullable=False)
email = Column(String(100), unique=True, index=True, nullable=False)
模型定义中的一些常用参数:
primary_key=True:主键。
index=True:为此列创建索引,提升查询效率。
unique=True:确保该列值唯一。
nullable=False:相当于数据库的 NOT NULL 约束。
3.2 定义一对多与多对一关系
现实中的表往往存在关联关系,比如 一个用户可以发表多篇文章(一对多)。在 SQLAlchemy 中我们可以通过 ForeignKey 和 relationship 来建立这种关系:
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, index=True)
name = Column(String(50))
email = Column(String(100), unique=True, index=True)
# 定义与 Post 的一对多关系,back_populates 用于双向绑定
posts = relationship("Post", back_populates="author")
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True, index=True)
title = Column(String(100), nullable=False)
content = Column(String(2000))
author_id = Column(Integer, ForeignKey('users.id'))
# 定义多对一关系
author = relationship("User", back_populates="posts")
ForeignKey('users.id') 指明了外键约束,对应到 users 表的 id 字段。
relationship 则是 ORM 层面的关系声明,让我们可以直接通过 user.posts 获取该用户的所有文章,或通过 post.author 获取文章的作者。
4. 数据库基本操作(CRUD)
在定义好模型后,我们通常会在应用启动时执行一次建表操作:
# 创建所有继承自 Base 的表(如果表已存在会跳过)
Base.metadata.create_all(bind=engine)
接下来,让我们看看如何进行增删改查操作。为了保持代码清晰,每个操作都封装成了函数,并通过 try...finally 确保会话被正确关闭。
4.1 创建记录
def create_user(name: str, email: str):
session = SessionLocal()
try:
user = User(name=name, email=email)
session.add(user) # 添加到会话
session.commit() # 提交事务
session.refresh(user) # 刷新实例,获取数据库生成的 id 等字段
return user
finally:
session.close()
4.2 查询记录
SQLAlchemy 提供了非常灵活的查询构建方式:
# 获取所有用户
def get_all_users():
session = SessionLocal()
try:
return session.query(User).all()
finally:
session.close()
# 按主键查询单个用户
def get_user_by_id(user_id: int):
session = SessionLocal()
try:
return session.query(User).filter(User.id == user_id).first()
finally:
session.close()
# 条件查询(例如按邮箱查找)
def get_user_by_email(email: str):
session = SessionLocal()
try:
return session.query(User).filter(User.email == email).first()
finally:
session.close()
常用的查询方法:
all():返回所有结果的列表。
first():返回第一个结果,没有则返回 None。
filter():添加过滤条件,可链式调用多个。
filter_by():更简单,使用关键字参数进行过滤(如 filter_by(email=email))。
4.3 更新记录
更新一般需要先查找到要修改的记录,然后修改属性,再提交事务:
def update_user(user_id: int, name: str, email: str):
session = SessionLocal()
try:
user = session.query(User).filter(User.id == user_id).first()
if user:
user.name = name
user.email = email
session.commit()
session.refresh(user)
return user
finally:
session.close()
4.4 删除记录
删除操作类似,先查询再调用 delete() 方法:
def delete_user(user_id: int):
session = SessionLocal()
try:
user = session.query(User).filter(User.id == user_id).first()
if user:
session.delete(user)
session.commit()
return user
finally:
session.close()
5. 操作关联对象
ORM 的一大优势就是能自然地处理对象之间的关联关系。假设我们要创建用户的同时为其添加几篇文章,可以这样做:
def create_user_with_posts(name: str, email: str, post_data: list[dict]):
session = SessionLocal()
try:
user = User(name=name, email=email)
for data in post_data:
post = Post(**data)
user.posts.append(post) # 利用 relationship 自动处理外键
session.add(user)
session.commit()
session.refresh(user)
return user
finally:
session.close()
查询用户时,如果我们希望其所有文章也一并加载(避免 N+1 查询问题),可以使用 joinedload 进行预加载:
from sqlalchemy.orm import joinedload
def get_user_with_posts(user_id: int):
session = SessionLocal()
try:
user = session.query(User).filter(User.id == user_id).options(
joinedload(User.posts)
).first()
return user
finally:
session.close()
6. advanced-features
6.1 事务管理
对于涉及多条记录的更新操作,事务可以保证数据一致性。SQLAlchemy 默认开启了事务,我们需要手动提交或回滚:
def transfer_amount(from_user_id: int, to_user_id: int, amount: float):
session = SessionLocal()
try:
from_user = session.query(User).filter(User.id == from_user_id).first()
to_user = session.query(User).filter(User.id == to_user_id).first()
if not from_user or not to_user:
raise ValueError("用户不存在")
# 假设 User 有一个 balance 字段
if from_user.balance < amount:
raise ValueError("余额不足")
from_user.balance -= amount
to_user.balance += amount
session.commit() # 全部成功,提交事务
except Exception as e:
session.rollback() # 出现异常,回滚所有更改
raise e
finally:
session.close()
6.2 异步支持(适用于 SQLAlchemy 2.0+)
在高并发场景下,异步操作能显著提升性能。SQLAlchemy 2.0 提供了完整的异步支持:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import select
DATABASE_URL_ASYNC = "mysql+asyncmy://user:pass@localhost/your_database"
async_engine = create_async_engine(DATABASE_URL_ASYNC, echo=True)
AsyncSessionLocal = async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)
async def async_get_user(user_id: int):
async with AsyncSessionLocal() as session:
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalars().first()
return user
使用异步时,注意搭配 asyncmy 或 aiomysql 等异步驱动。
7. 一些实用建议
- 管理会话生命周期:推荐使用上下文管理器封装会话,或者利用依赖注入(如 FastAPI 的
Depends)自动处理会话的创建与关闭。
- 避免长事务:保持事务尽可能短小,减少锁定资源的时间,尤其在高并发写入场景。
- 批量操作:当需要插入或更新大量数据时,使用
session.bulk_insert_mappings() 或 session.bulk_save_objects() 能获得更好的性能。
- 合理选择加载策略:对于关联属性,根据业务场景选择
joinedload(预加载)、selectinload(二次查询预加载)或保持默认的惰性加载。
- 善用类型提示:配合现代 IDE,类型提示能让你在编写查询时获得更好的自动补全和错误检查。
8. 总结
SQLAlchemy 作为 Python 中最成熟的 ORM 框架,既保留了原始 SQL 的表达能力,又提供了面向对象的操作接口。从简单的单表 CRUD 到复杂的关系处理、事务控制和异步操作,它都能很好应对。通过本文的介绍,相信你已经掌握了 SQLAlchemy 的核心用法。接下来,不妨在你的项目中尝试用 SQLAlchemy 替换原始的 SQL 操作,体验它带来的开发效率提升。
如果你希望进一步深入学习,推荐阅读 SQLAlchemy 官方文档,其中包含了大量高级用法和最佳实践。