#SQLAlchemy ORM (Flask-SQLAlchemy):告别原始 SQL,用 Python 类定义数据库模型
#1. ORM 概念
#1.1 ORM 是什么?
原始 SQL: ORM(对象关系映射):
INSERT INTO users user = User(name="Alice", email="a@b.com")
VALUES ("Alice", "a@b.com") db.session.add(user)
db.session.commit()
↓
底层自动生成正确的 SQL#1.2 安装
pip install flask-sqlalchemy#2. 配置
#2.1 基础配置
# app/__init__.py
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
def create_app():
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///daoman.db"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db.init_app(app)
return app#2.2 支持的数据库
# SQLite(开发,默认)
SQLALCHEMY_DATABASE_URI = "sqlite:///app.db"
# PostgreSQL(生产推荐)
SQLALCHEMY_DATABASE_URI = "postgresql://user:password@localhost:5432/mydb"
# MySQL
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://user:password@localhost/mydb"
# 带连接池
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
"pool_size": 10,
"pool_recycle": 3600,
}#3. 定义模型
#3.1 用户模型
# app/models/user.py
from datetime import datetime
from app.extensions import db
from flask_login import UserMixin
class User(UserMixin, db.Model):
"""用户模型"""
__tablename__ = "users"
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(50), unique=True, nullable=False, index=True)
email = db.Column(db.String(120), unique=True, nullable=False, index=True)
password_hash = db.Column(db.String(256), nullable=False)
bio = db.Column(db.String(500), default="")
avatar = db.Column(db.String(200), default="")
is_active = db.Column(db.Boolean, default=True)
is_admin = db.Column(db.Boolean, default=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 关系
posts = db.relationship("Post", back_populates="author", lazy="dynamic")
def __repr__(self):
return f"<User {self.username}>"#3.2 文章模型
# app/models/post.py
from datetime import datetime
from app.extensions import db
class Post(db.Model):
__tablename__ = "posts"
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
slug = db.Column(db.String(200), unique=True, index=True) # URL 别名
content = db.Column(db.Text, nullable=False)
summary = db.Column(db.String(500))
cover_image = db.Column(db.String(200))
views = db.Column(db.Integer, default=0)
is_published = db.Column(db.Boolean, default=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 外键
author_id = db.Column(db.Integer, db.ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
category_id = db.Column(db.Integer, db.ForeignKey("categories.id"))
# 关系
author = db.relationship("User", back_populates="posts")
category = db.relationship("Category", back_populates="posts")
comments = db.relationship("Comment", back_populates="post", lazy="dynamic", cascade="all, delete-orphan")
def __repr__(self):
return f"<Post {self.title}>"#3.3 字段类型速查
| SQLAlchemy 类型 | Python 类型 | 说明 |
|---|---|---|
db.Integer | int | 整数 |
db.BigInteger | int | 大整数 |
db.String(255) | str | 可变长度字符串 |
db.Text | str | 长文本 |
db.Boolean | bool | 布尔 |
db.Date | date | 日期 |
db.DateTime | datetime | 日期时间 |
db.Time | time | 时间 |
db.Float | float | 浮点数 |
db.Numeric(10,2) | Decimal | 精确小数 |
db.LargeBinary | bytes | 二进制数据 |
db.JSON | dict/list | JSON 数据 |
#4. CRUD 操作
#4.1 创建(Create)
from app.extensions import db
from app.models.user import User
from app.models.post import Post
# 创建单条
user = User(username="alice", email="alice@example.com", password_hash="...")
db.session.add(user)
db.session.commit()
# 批量创建
users = [
User(username="alice", email="alice@ex.com", password_hash="..."),
User(username="bob", email="bob@ex.com", password_hash="..."),
]
db.session.add_all(users)
db.session.commit()
# 快捷方式
db.session.bulk_save_objects(users)
db.session.commit()#4.2 读取(Read)
# 按主键查
user = db.session.get(User, 1)
# 按字段查
user = User.query.filter_by(username="alice").first()
user = User.query.filter(User.username == "alice").first()
# 高级查询
users = User.query.filter(
User.is_active == True,
User.created_at > datetime(2025, 1, 1)
).order_by(User.created_at.desc()).limit(10).all()
# 原生 SQL
from sqlalchemy import text
result = db.session.execute(text("SELECT * FROM users WHERE id > :id"), {"id": 5})#4.3 更新(Update)
user = User.query.get(1)
user.bio = "这是我的新简介"
user.is_active = False
db.session.commit()
# 批量更新
User.query.filter_by(is_active=False).update({"is_admin": False})
db.session.commit()#4.4 删除(Delete)
user = User.query.get(1)
db.session.delete(user)
db.session.commit()
# 批量删除
Post.query.filter_by(is_published=False).delete()
db.session.commit()#5. 查询过滤器
# 相等/不等
User.query.filter(User.username == "alice")
User.query.filter(User.username != "alice")
# 模糊查询
User.query.filter(User.username.like("%li%"))
User.query.filter(User.username.ilike("%LI%")) # 不区分大小写
# IN
User.query.filter(User.id.in_([1, 2, 3]))
# AND / OR
from sqlalchemy import and_, or_
User.query.filter(
and_(User.is_active == True, User.is_admin == False)
)
User.query.filter(
or_(User.username == "alice", User.email == "alice@example.com")
)
# 空值
User.query.filter(User.bio == None) # 查 NULL
User.query.filter(User.bio != None) # 查非 NULL
# 数量和
from sqlalchemy import func
db.session.query(func.count(User.id)).scalar()
User.query.count()#6. 分页
# 每页 10 条,获取第 2 页
page = request.args.get("page", 1, type=int)
per_page = 10
pagination = User.query.order_by(User.created_at.desc()).paginate(
page=page,
per_page=per_page,
error_out=False
)
users = pagination.items # 当前页数据
total = pagination.total # 总数
pages = pagination.pages # 总页数
has_next = pagination.has_next # 是否有下一页
has_prev = pagination.has_prev # 是否有上一页
next_num = pagination.next_num # 下一页页码
prev_num = pagination.prev_num # 上一页页码模板中使用分页:
{% for user in pagination.items %}
<li>{{ user.username }}</li>
{% endfor %}
{% if pagination.pages > 1 %}
<div class="pagination">
{% if pagination.has_prev %}
<a href="{{ url_for('users', page=pagination.prev_num) }}">上一页</a>
{% endif %}
<span>第 {{ pagination.page }} / {{ pagination.pages }} 页</span>
{% if pagination.has_next %}
<a href="{{ url_for('users', page=pagination.next_num) }}">下一页</a>
{% endif %}
</div>
{% endif %}#7. 小结
# SQLAlchemy CRUD 速查
db.session.add(obj) # 添加
db.session.commit() # 提交
db.session.rollback() # 回滚
User.query.get(id) # 按主键查
User.query.filter_by(x=y) # 按字段查
User.query.filter(Xxx) # 高级过滤
User.query.order_by(x.desc()) # 排序
User.query.limit(n) # 限制数量
User.query.offset(n) # 跳过
User.query.paginate(...) # 分页
User.query.count() # 计数
User.query.all() # 全部
User.query.first() # 第一个
db.session.delete(obj) # 删除💡 最佳实践:在 Flask 应用上下文(
with app.app_context():)中执行数据库操作,或者在 Flask 请求中使用(Flask 自动管理上下文)。
🔗 扩展阅读

