#FastAPI与Alembic数据库迁移完全指南
📂 所属阶段:第三阶段 — 数据持久化(数据库篇)
🔗 相关章节:FastAPI SQLAlchemy 2.0实战 · FastAPI依赖注入系统
#目录
- 为什么需要数据库迁移工具
- Alembic基础概念
- 项目集成与配置
- 自动生成迁移脚本
- 手动编写迁移脚本
- 版本控制与管理
- 多环境配置
- 生产环境部署
- 高级迁移技巧
- 常见陷阱与解决方案
- 最佳实践
- 与其他迁移工具对比
- 总结
#为什么需要数据库迁移工具?
#传统数据库管理的问题
在没有数据库迁移工具的情况下,我们通常面临以下挑战:
| 场景 | 传统做法 | 问题 | 影响 |
|---|---|---|---|
| 添加字段 | ALTER TABLE users ADD COLUMN phone VARCHAR(20); | 手动执行,难以回滚 | 生产环境风险高 |
| 修改表结构 | 直接修改数据库 | 没有版本控制 | 团队协作困难 |
| 回滚变更 | 手写回滚SQL | 容易出错 | 数据安全风险 |
| 多环境同步 | 手动复制SQL | 容易遗漏 | 环境不一致 |
| 团队协作 | 每人维护自己的SQL | 版本混乱 | 项目维护困难 |
#Alembic的核心价值
Alembic作为SQLAlchemy的官方迁移工具,提供了:
- 版本控制:数据库结构的Git-like管理
- 自动化:模型变更自动生成迁移脚本
- 安全性:可逆操作,支持回滚
- 一致性:确保多环境数据库结构同步
- 审计追踪:完整的变更历史记录
#项目依赖安装
# 安装Alembic
pip install alembic
# 如果使用PostgreSQL
pip install alembic psycopg2-binary
# 如果使用MySQL
pip install alembic PyMySQL
# 如果使用SQLite(内置支持)
pip install alembic#完整依赖示例
# requirements.txt
fastapi==0.104.1
sqlalchemy==2.0.23
alembic==1.13.1
asyncpg==0.29.0 # PostgreSQL异步驱动
psycopg2-binary==2.9.9 # PostgreSQL同步驱动
pydantic==2.5.0
python-multipart==0.0.6
uvicorn==0.24.0#Alembic基础概念
#Alembic核心组件
Alembic的主要组成部分包括:
- 环境配置 (env.py):连接数据库和模型的桥梁
- 配置文件 (alembic.ini):全局配置参数
- 迁移脚本 (versions/):版本化的数据库变更
- 模板文件 (script.py.mako):生成迁移脚本的模板
#项目集成与配置
#FastAPI项目结构
my_fastapi_project/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── user.py
│ ├── schemas/
│ └── database.py
├── alembic/
│ ├── env.py
│ ├── script.py.mako
│ └── versions/
├── alembic.ini
├── config.py
└── requirements.txt#修改env.py以适应FastAPI项目
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# 导入FastAPI项目中的Base
from app.models.base import Base
from app.database import DATABASE_URL
# 从FastAPI配置中获取数据库URL
config = context.config
# 设置数据库URL
config.set_main_option("sqlalchemy.url", DATABASE_URL)
# 配置Logging
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# 目标元数据 - 指向我们的模型
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""在离线模式下运行迁移(不连接数据库)"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""在在线模式下运行迁移(连接数据库)"""
connectable = context.config.attributes.get('connection', None)
if connectable is None:
connectable = engine_from_config(
context.config.get_section(context.config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True, # 比较列类型变更
compare_server_default=True, # 比较默认值变更
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()#自定义配置文件 (alembic.ini)
# alembic.ini
[alembic]
# 设置源文件夹(包含models的文件夹)
script_location = alembic
# 模块路径,用于导入模型
prepend_sys_path = .
# 迁移文件名模板
file_template = %%(rev)s_%%(slug)s
# 表前缀(可选)
# table_prefix = myapp_
# timezone to use when rendering the date within the migration file
# as well as the filename. string value is passed to dateutil.tz.gettz()
# if left blank, dates will be recorded in UTC
# timezone =
# max length of characters to apply to the "slug" field
# max_length = 40
# set to 'true' to run the environment during the 'revision' command,
# regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without a source .py file
# sourceless = false
# version_num separator; default is ``_`` (underscore), ``.`` is supported
# version_separator = .
# version path separator; default is ``os.sep`` (path separator)
# version_path_separator = /
# set to 'true' to search and use version locations from the environment
# version_locations = %(here)s/bar:%(here)s/bat
# version_path_separator = :
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stdout,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S#2. 配置 env.py
#2.1 基础配置
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
# 导入你的模型(重要!Alembic 据此生成迁移脚本)
from models import Base # SQLAlchemy Base
from config import get_settings
config = context.config
settings = get_settings()
# 设置数据库 URL
config.set_main_option("sqlalchemy.url", settings.database_url)
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""离线模式:生成 SQL 脚本,不连接数据库"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""在线模式:直接连接数据库执行迁移"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()#自动生成迁移脚本
#基础自动生成命令
# 对比模型与当前数据库,自动生成迁移脚本
alembic revision --autogenerate -m "add user avatar column"
# 参数说明:
# --autogenerate:自动对比模型差异
# -m:添加迁移说明#自动迁移的工作原理
- 模型扫描:Alembic扫描项目中的SQLAlchemy模型
- 数据库对比:连接当前数据库,获取现有表结构
- 差异分析:比较模型与数据库结构差异
- 脚本生成:根据差异生成upgrade/downgrade函数
#生成的迁移文件示例
# alembic/versions/20260326_add_user_avatar.py
"""add user avatar column
Revision ID: abc123
Revises: def456
Create Date: 2026-03-26 14:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers
revision = 'abc123'
down_revision = 'def456'
branch_labels = None
depends_on = None
def upgrade() -> None:
# 自动检测到模型中新增的avatar字段
op.add_column('users', sa.Column('avatar', sa.String(length=500), nullable=True))
def downgrade() -> None:
# 回滚操作:删除avatar字段
op.drop_column('users', 'avatar')#常见自动生成场景
# 1. 添加新表
def upgrade() -> None:
op.create_table('posts',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('title', sa.String(length=200), nullable=False),
sa.Column('content', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
def downgrade() -> None:
op.drop_table('posts')
# 2. 修改列类型
def upgrade() -> None:
op.alter_column('users', 'email',
existing_type=sa.VARCHAR(length=50),
type_=sa.String(length=100),
existing_nullable=False)
def downgrade() -> None:
op.alter_column('users', 'email',
existing_type=sa.String(length=100),
type_=sa.VARCHAR(length=50),
existing_nullable=False)
# 3. 添加外键约束
def upgrade() -> None:
op.create_foreign_key('fk_posts_user_id', 'posts', 'users', ['user_id'], ['id'])
def downgrade() -> None:
op.drop_constraint('fk_posts_user_id', 'posts', type_='foreignkey')#手动编写迁移脚本
#何时需要手动编写
- 数据迁移:需要在结构变更的同时迁移现有数据
- 复杂操作:自动生成无法处理的复杂SQL
- 条件判断:根据特定条件执行不同操作
- 性能优化:需要特定的执行顺序或优化
#数据迁移示例
# 迁移用户状态字段:从字符串改为整数枚举
def upgrade() -> None:
# 1. 添加新列
op.add_column('users', sa.Column('status_new', sa.Integer(), nullable=True))
# 2. 迁移数据
conn = op.get_bind()
conn.execute(sa.text("""
UPDATE users
SET status_new = CASE
WHEN status = 'active' THEN 1
WHEN status = 'inactive' THEN 0
ELSE 0
END
"""))
# 3. 删除旧列,重命名新列
op.drop_column('users', 'status')
op.alter_column('users', 'status_new', new_column_name='status', nullable=False)
# 4. 添加约束
op.create_check_constraint('status_check', 'users', 'status IN (0, 1)')
def downgrade() -> None:
# 回滚操作
op.add_column('users', sa.Column('status_old', sa.String(), nullable=True))
conn = op.get_bind()
conn.execute(sa.text("""
UPDATE users
SET status_old = CASE
WHEN status = 1 THEN 'active'
ELSE 'inactive'
END
"""))
op.drop_column('users', 'status')
op.alter_column('users', 'status_old', new_column_name='status', nullable=False)#复杂表结构变更
def upgrade() -> None:
# 1. 创建临时表
op.create_table('_temp_users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('username', sa.String(length=80), nullable=False),
sa.Column('email', sa.String(length=120), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('email'),
sa.UniqueConstraint('username')
)
# 2. 复制数据
op.execute("""
INSERT INTO _temp_users (id, username, email, created_at)
SELECT id, username, email, created_at FROM users
""")
# 3. 删除原表,重命名临时表
op.drop_table('users')
op.rename_table('_temp_users', 'users')
def downgrade() -> None:
# 回滚操作
op.drop_table('users')
# 恢复到之前的表结构#高级Alembic操作
from alembic import op
from sqlalchemy import text
def upgrade() -> None:
# 条件执行
conn = op.get_bind()
result = conn.execute(text("SELECT COUNT(*) FROM users")).scalar()
if result > 0:
# 只有当用户表不为空时才执行某些操作
op.add_column('users', sa.Column('last_login', sa.DateTime(), nullable=True))
# 批量操作
batch_op = op.batch_alter_table('users', schema=None)
batch_op.add_column(sa.Column('first_name', sa.String(length=50)))
batch_op.add_column(sa.Column('last_name', sa.String(length=50)))
batch_op.create_index('ix_users_first_name', ['first_name'])
batch_op.create_index('ix_users_last_name', ['last_name'])
def downgrade() -> None:
batch_op = op.batch_alter_table('users', schema=None)
batch_op.drop_index('ix_users_first_name')
batch_op.drop_index('ix_users_last_name')
batch_op.drop_column('last_name')
batch_op.drop_column('first_name')
op.drop_column('users', 'last_login')#迁移执行命令
# 查看当前迁移状态
alembic current
# 查看迁移历史
alembic history
alembic history --verbose # 详细信息
# 查看待执行的迁移
alembic heads
# 生成SQL而不执行(离线模式)
alembic upgrade head --sql
# 升级到最新版本
alembic upgrade head
# 升级到指定版本
alembic upgrade abc123
# 升一级
alembic upgrade +1
# 回滚一级
alembic downgrade -1
# 回滚到初始版本
alembic downgrade base
# 检查是否需要迁移
alembic check#版本控制与管理
#迁移文件依赖关系
base (初始)
├── 001_add_users.py
│ ↓
├── 002_add_posts.py
│ ↓
└── 003_add_comments.py
↓
head (最新)每个迁移文件的 down_revision 必须指向前一个版本:
# 003_add_comments.py
down_revision = "002_add_posts" # 指向 002
# 002_add_posts.py
down_revision = "001_add_users" # 指向 001
# 001_add_users.py
down_revision = None # 初始版本#合并分支迁移
当多个开发分支都有数据库变更时,需要合并迁移:
# 1. 生成合并迁移
alembic merge -m "merge feature branches"
# 2. 合并后的迁移文件
# alembic/versions/xxx_merge_feature_branches.py
"""merge feature branches
Revision ID: xxx
Revises: ('aaa', 'bbb') # 同时依赖两个分支的最新迁移
Create Date: ...
"""
down_revision = ('aaa', 'bbb') # 指向两个父迁移
branch_labels = None
depends_on = None
def upgrade():
pass # 通常为空,只是合并依赖
def downgrade():
pass # 通常为空#处理迁移冲突
当团队成员同时提交迁移时可能出现冲突:
# 冲突示例:两个迁移都试图修改同一张表
# 迁移A:add_user_phone.py
down_revision = 'old_version'
# 迁移B:add_user_email.py
down_revision = 'old_version' # 两者都基于相同的老版本
# 解决方法:手动修正后一个迁移的down_revision
# 迁移B(修正后):add_user_email.py
down_revision = 'add_user_phone' # 指向先合并的迁移#多环境配置
#环境配置文件
# config.py
import os
from typing import Dict
class DatabaseConfig:
ENV_DATABASES: Dict[str, str] = {
"development": "postgresql+asyncpg://localhost:5432/myapp_dev",
"testing": "postgresql+asyncpg://localhost:5432/myapp_test",
"staging": "postgresql+asyncpg://user:pass@staging-db:5432/myapp",
"production": "postgresql+asyncpg://user:pass@prod-db:5432/myapp",
}
@classmethod
def get_database_url(cls, env: str = None) -> str:
env = env or os.getenv("APP_ENV", "development")
return cls.ENV_DATABASES.get(env, cls.ENV_DATABASES["development"])
# 使用示例
database_url = DatabaseConfig.get_database_url()#不同环境的Alembic配置
# alembic/env.py (增强版)
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
import os
# 导入项目配置
from app.config import DatabaseConfig
# 获取环境变量
config = context.config
# 根据环境设置数据库URL
env = os.getenv("APP_ENV", "development")
database_url = DatabaseConfig.get_database_url(env)
config.set_main_option("sqlalchemy.url", database_url)
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# 导入模型
from app.models.base import Base
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""离线模式:生成 SQL 脚本,不连接数据库"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""在线模式:直接连接数据库执行迁移"""
connectable = engine_from_config(
context.config.get_section(context.config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
compare_server_default=True,
include_object=include_object, # 自定义过滤逻辑
)
with context.begin_transaction():
context.run_migrations()
def include_object(object, name, type_, reflected, compare_to):
"""自定义对象过滤逻辑"""
if type_ == "table":
# 排除某些表(如临时表、日志表)
excluded_tables = ["temp_*", "log_*"]
for pattern in excluded_tables:
if name.startswith(pattern.rstrip('*')):
return False
return True
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()#环境特定的迁移命令
# 开发环境
APP_ENV=development alembic upgrade head
# 测试环境
APP_ENV=testing alembic upgrade head
# 预发布环境
APP_ENV=staging alembic upgrade head
# 生产环境
APP_ENV=production alembic upgrade head
# 或使用不同的配置文件
alembic -x db_url=postgresql://user:pass@prod/db upgrade head#生产环境部署
#Docker部署配置
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
# 运行迁移作为启动的一部分
CMD ["sh", "-c", "alembic upgrade head && uvicorn app.main:app --host 0.0.0.0 --port 8000"]#Kubernetes部署
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: fastapi-app
spec:
template:
spec:
initContainers:
- name: db-migration
image: your-fastapi-app:latest
command: ['alembic', 'upgrade', 'head']
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: db-secret
key: url
containers:
- name: fastapi-app
image: your-fastapi-app:latest
ports:
- containerPort: 8000#CI/CD集成
# .github/workflows/deploy.yml
name: Deploy
on:
push:
branches: [main]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Test migrations
run: |
docker-compose -f docker-compose.test.yml up -d
sleep 10
alembic upgrade head
docker-compose -f docker-compose.test.yml down
- name: Deploy migrations to staging
run: |
# 部署到预发布环境
APP_ENV=staging alembic upgrade head
- name: Manual approval for production
uses: actions/manual-approval@latest
with:
secret: ${{ github.TOKEN }}
approvers: admin-user
- name: Deploy to production
run: |
# 生成SQL进行审核
alembic upgrade head --sql > migration.sql
# 审核SQL后执行
alembic upgrade head#安全的生产迁移流程
#!/bin/bash
# production_migration.sh
set -e # 遇到错误立即退出
echo "=== 开始生产环境数据库迁移 ==="
# 1. 备份当前数据库
echo "备份数据库..."
pg_dump -h $DB_HOST -U $DB_USER -d $DB_NAME > backup_$(date +%Y%m%d_%H%M%S).sql
# 2. 检查迁移状态
echo "检查当前迁移状态..."
alembic current
# 3. 生成迁移SQL(用于审核)
echo "生成迁移SQL..."
alembic upgrade head --sql > migration_plan.sql
# 4. 显示迁移计划供审核
echo "迁移计划:"
cat migration_plan.sql
read -p "确认执行迁移? (yes/no): " confirm
if [[ $confirm != "yes" ]]; then
echo "迁移取消"
exit 1
fi
# 5. 执行迁移
echo "执行迁移..."
alembic upgrade head
# 6. 验证迁移结果
echo "验证迁移结果..."
alembic current
# 7. 清理临时文件
rm migration_plan.sql
echo "=== 迁移完成 ==="#高级迁移技巧
#数据迁移与转换
# 复杂的数据转换迁移
def upgrade() -> None:
# 1. 添加临时列
op.add_column('users', sa.Column('full_name_temp', sa.String(200)))
# 2. 数据转换
conn = op.get_bind()
result = conn.execute(sa.text("""
UPDATE users
SET full_name_temp = CONCAT(first_name, ' ', last_name)
WHERE first_name IS NOT NULL AND last_name IS NOT NULL
"""))
# 3. 验证转换结果
count = conn.execute(sa.text("""
SELECT COUNT(*) FROM users
WHERE full_name_temp IS NULL AND first_name IS NOT NULL
""")).scalar()
if count > 0:
raise Exception(f"数据转换失败:{count} 条记录转换异常")
# 4. 交换列
op.drop_column('users', 'first_name')
op.drop_column('users', 'last_name')
op.alter_column('users', 'full_name_temp', new_column_name='full_name')
def downgrade() -> None:
# 回滚逻辑
op.add_column('users', sa.Column('first_name', sa.String(100)))
op.add_column('users', sa.Column('last_name', sa.String(100)))
conn = op.get_bind()
conn.execute(sa.text("""
UPDATE users
SET first_name = SPLIT_PART(full_name, ' ', 1),
last_name = SUBSTRING(full_name, LENGTH(SPLIT_PART(full_name, ' ', 1)) + 2)
"""))
op.drop_column('users', 'full_name')#条件迁移
def upgrade() -> None:
conn = op.get_bind()
# 检查表是否存在
result = conn.execute(sa.text("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = 'users'
);
""")).scalar()
if result:
# 表存在时的操作
op.add_column('users', sa.Column('migration_version', sa.Integer(), default=1))
else:
# 表不存在时的操作
op.create_table('users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('username', sa.String(80), nullable=False),
sa.PrimaryKeyConstraint('id')
)
def downgrade() -> None:
conn = op.get_bind()
# 检查列是否存在
result = conn.execute(sa.text("""
SELECT EXISTS (
SELECT FROM information_schema.columns
WHERE table_name = 'users' AND column_name = 'migration_version'
);
""")).scalar()
if result:
op.drop_column('users', 'migration_version')#常见陷阱与解决方案
#陷阱1:模型与数据库不一致
# ❌ 错误:模型已更改但未生成迁移
class User(Base):
__tablename__ = 'users'
id = sa.Column(sa.Integer, primary_key=True)
new_field = sa.Column(sa.String(50)) # 新字段
# 直接运行 alembic revision --autogenerate
# 可能生成不正确的迁移
# ✅ 正确:先确保数据库与模型一致
# 1. 如果是新项目:先创建初始迁移
alembic revision --autogenerate -m "initial migration"
alembic upgrade head
# 2. 再添加新字段并生成迁移
alembic revision --autogenerate -m "add new_field to users"#陷阱2:downgrade函数不完整
# ❌ 错误:downgrade函数不完整
def upgrade():
op.create_table('new_table',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(50), nullable=False),
sa.PrimaryKeyConstraint('id')
)
# 添加索引
op.create_index('ix_new_table_name', 'new_table', ['name'])
def downgrade():
# 忘记删除索引!
op.drop_table('new_table') # 这会导致错误
# ✅ 正确:完整的upgrade/downgrade对
def downgrade():
op.drop_index('ix_new_table_name', table_name='new_table') # 先删除索引
op.drop_table('new_table') # 再删除表#陷阱3:数据迁移时的性能问题
# ❌ 错误:一次性处理大量数据
def upgrade():
# 对百万级表进行逐行更新 - 性能极差
conn = op.get_bind()
for row in conn.execute(sa.text("SELECT id FROM large_table")):
conn.execute(sa.text(f"UPDATE large_table SET new_col = 'value' WHERE id = {row.id}"))
# ✅ 正确:批量处理
def upgrade():
conn = op.get_bind()
# 批量更新
conn.execute(sa.text("UPDATE large_table SET new_col = 'value' WHERE new_col IS NULL"))#最佳实践
#1. 迁移文件组织
alembic/
├── env.py
├── script.py.mako
└── versions/
├── 001_initial_tables.py # 初始表结构
├── 002_add_user_constraints.py # 添加约束
├── 003_migrate_user_data.py # 数据迁移
└── 20240115_add_user_profile.py # 功能特性#2. 迁移编写规范
# 推荐的迁移文件模板
"""添加用户个人资料字段
修订ID: abc123456789
前一修订: def987654321
创建日期: 2024-01-15 10:30:00.000000
迁移目的:
- 添加用户个人资料相关字段
- 迁移现有用户数据
- 创建相关索引
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column
# 修订标识符
revision = 'abc123456789'
down_revision = 'def987654321'
branch_labels = None
depends_on = None
def upgrade() -> None:
"""升级操作"""
# 1. 添加列
op.add_column('users', sa.Column('bio', sa.Text(), nullable=True))
op.add_column('users', sa.Column('location', sa.String(100), nullable=True))
op.add_column('users', sa.Column('website', sa.String(200), nullable=True))
# 2. 创建索引
op.create_index('ix_users_location', 'users', ['location'])
# 3. 添加约束
op.create_check_constraint(
'ck_website_format',
'users',
"website IS NULL OR website LIKE 'http://%'" # 网站URL格式验证
)
def downgrade() -> None:
"""降级操作(必须与upgrade完全相反)"""
# 按相反顺序执行相反操作
op.drop_constraint('ck_website_format', 'users', type_='check')
op.drop_index('ix_users_location', table_name='users')
op.drop_column('users', 'website')
op.drop_column('users', 'location')
op.drop_column('users', 'bio')#3. 测试策略
# test_migrations.py
import pytest
from alembic.command import upgrade, downgrade
from alembic.config import Config
from sqlalchemy import create_engine, text
def test_migration_downgrade_cycle():
"""测试迁移-回滚循环"""
alembic_cfg = Config("alembic.ini")
# 升级到最新
upgrade(alembic_cfg, "head")
# 验证表结构
engine = create_engine(alembic_cfg.get_main_option("sqlalchemy.url"))
with engine.connect() as conn:
result = conn.execute(text("SELECT * FROM users LIMIT 1")).fetchone()
assert result is not None
# 回滚到上一版本
downgrade(alembic_cfg, "-1")
# 验证回滚效果
with engine.connect() as conn:
# 检查是否成功回滚(新字段应该不存在)
pass#与其他迁移工具对比
| 特性 | Alembic | Django Migrations | Flyway | Liquibase |
|---|---|---|---|---|
| 生态系统 | SQLAlchemy生态 | Django生态 | Java生态 | 多语言 |
| Python支持 | ✅ 原生支持 | ❌ 仅Django | ⚠️ 通过JDBC | ⚠️ 通过JDBC |
| SQLAlchemy集成 | ✅ 无缝集成 | ❌ 不适用 | ⚠️ 需额外配置 | ⚠️ 需额外配置 |
| 多数据库支持 | ✅ 广泛支持 | ✅ 广泛支持 | ✅ 广泛支持 | ✅ 广泛支持 |
| 复杂数据迁移 | ✅ 高度灵活 | ✅ 良好支持 | ✅ 脚本支持 | ✅ XML/YAML支持 |
| 学习曲线 | ⚠️ 中等 | ✅ 简单(Django用户) | ⚠️ 中等 | ❌ 较陡峭 |
| FastAPI兼容性 | ✅ 完美兼容 | ❌ 不适用 | ✅ 可用 | ✅ 可用 |
#总结
Alembic作为SQLAlchemy的官方迁移工具,为FastAPI项目提供了强大的数据库版本控制能力:
- 版本控制:像Git一样管理数据库结构变更
- 自动化:自动生成大部分迁移脚本
- 安全性:支持回滚,降低生产风险
- 灵活性:支持复杂的数据迁移和转换
- 集成性:与SQLAlchemy完美配合
通过遵循最佳实践和避免常见陷阱,Alembic能够帮助您安全、高效地管理数据库结构演进。
💡 关键要点:始终在生产环境之前在测试环境中验证迁移,确保downgrade函数与upgrade函数完全对称。
🔗 扩展阅读
</final_file_content>

