FastAPI与Alembic数据库迁移完全指南

📂 所属阶段:第三阶段 — 数据持久化(数据库篇)
🔗 相关章节:FastAPI SQLAlchemy 2.0实战 · FastAPI依赖注入系统

目录

为什么需要数据库迁移工具?

传统数据库管理的问题

在没有数据库迁移工具的情况下,我们通常面临以下挑战:

场景传统做法问题影响
添加字段ALTER TABLE users ADD COLUMN phone VARCHAR(20);手动执行,难以回滚生产环境风险高
修改表结构直接修改数据库没有版本控制团队协作困难
回滚变更手写回滚SQL容易出错数据安全风险
多环境同步手动复制SQL容易遗漏环境不一致
团队协作每人维护自己的SQL版本混乱项目维护困难

Alembic的核心价值

Alembic作为SQLAlchemy的官方迁移工具,提供了:

  1. 版本控制:数据库结构的Git-like管理
  2. 自动化:模型变更自动生成迁移脚本
  3. 安全性:可逆操作,支持回滚
  4. 一致性:确保多环境数据库结构同步
  5. 审计追踪:完整的变更历史记录

项目依赖安装

# 安装Alembic
pip install alembic

# 如果使用PostgreSQL
pip install alembic psycopg2-binary

# 如果使用MySQL
pip install alembic PyMySQL

# 如果使用SQLite(内置支持)
pip install alembic

完整依赖示例

# requirements.txt
fastapi==0.104.1
sqlalchemy==2.0.23
alembic==1.13.1
asyncpg==0.29.0  # PostgreSQL异步驱动
psycopg2-binary==2.9.9  # PostgreSQL同步驱动
pydantic==2.5.0
python-multipart==0.0.6
uvicorn==0.24.0

Alembic基础概念

Alembic核心组件

Alembic的主要组成部分包括:

  1. 环境配置 (env.py):连接数据库和模型的桥梁
  2. 配置文件 (alembic.ini):全局配置参数
  3. 迁移脚本 (versions/):版本化的数据库变更
  4. 模板文件 (script.py.mako):生成迁移脚本的模板

项目集成与配置

FastAPI项目结构

my_fastapi_project/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── base.py
│   │   └── user.py
│   ├── schemas/
│   └── database.py
├── alembic/
│   ├── env.py
│   ├── script.py.mako
│   └── versions/
├── alembic.ini
├── config.py
└── requirements.txt

修改env.py以适应FastAPI项目

# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context

# 导入FastAPI项目中的Base
from app.models.base import Base
from app.database import DATABASE_URL

# 从FastAPI配置中获取数据库URL
config = context.config

# 设置数据库URL
config.set_main_option("sqlalchemy.url", DATABASE_URL)

# 配置Logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# 目标元数据 - 指向我们的模型
target_metadata = Base.metadata

def run_migrations_offline() -> None:
    """在离线模式下运行迁移(不连接数据库)"""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online() -> None:
    """在在线模式下运行迁移(连接数据库)"""
    connectable = context.config.attributes.get('connection', None)
    
    if connectable is None:
        connectable = engine_from_config(
            context.config.get_section(context.config.config_ini_section),
            prefix="sqlalchemy.",
            poolclass=pool.NullPool,
        )

    with connectable.connect() as connection:
        context.configure(
            connection=connection, 
            target_metadata=target_metadata,
            compare_type=True,  # 比较列类型变更
            compare_server_default=True,  # 比较默认值变更
        )

        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

自定义配置文件 (alembic.ini)

# alembic.ini
[alembic]
# 设置源文件夹(包含models的文件夹)
script_location = alembic

# 模块路径,用于导入模型
prepend_sys_path = .

# 迁移文件名模板
file_template = %%(rev)s_%%(slug)s

# 表前缀(可选)
# table_prefix = myapp_

# timezone to use when rendering the date within the migration file
# as well as the filename. string value is passed to dateutil.tz.gettz()
# if left blank, dates will be recorded in UTC
# timezone =

# max length of characters to apply to the "slug" field
# max_length = 40

# set to 'true' to run the environment during the 'revision' command,
# regardless of autogenerate
# revision_environment = false

# set to 'true' to allow .pyc and .pyo files without a source .py file
# sourceless = false

# version_num separator; default is ``_`` (underscore), ``.`` is supported
# version_separator = .

# version path separator; default is ``os.sep`` (path separator)
# version_path_separator = /

# set to 'true' to search and use version locations from the environment
# version_locations = %(here)s/bar:%(here)s/bat
# version_path_separator = :

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stdout,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

2. 配置 env.py

2.1 基础配置

# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context

# 导入你的模型(重要!Alembic 据此生成迁移脚本)
from models import Base  # SQLAlchemy Base
from config import get_settings

config = context.config
settings = get_settings()

# 设置数据库 URL
config.set_main_option("sqlalchemy.url", settings.database_url)

if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """离线模式:生成 SQL 脚本,不连接数据库"""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """在线模式:直接连接数据库执行迁移"""
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

自动生成迁移脚本

基础自动生成命令

# 对比模型与当前数据库,自动生成迁移脚本
alembic revision --autogenerate -m "add user avatar column"

# 参数说明:
# --autogenerate:自动对比模型差异
# -m:添加迁移说明

自动迁移的工作原理

  1. 模型扫描:Alembic扫描项目中的SQLAlchemy模型
  2. 数据库对比:连接当前数据库,获取现有表结构
  3. 差异分析:比较模型与数据库结构差异
  4. 脚本生成:根据差异生成upgrade/downgrade函数

生成的迁移文件示例

# alembic/versions/20260326_add_user_avatar.py
"""add user avatar column

Revision ID: abc123
Revises: def456
Create Date: 2026-03-26 14:00:00.000000

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers
revision = 'abc123'
down_revision = 'def456'
branch_labels = None
depends_on = None

def upgrade() -> None:
    # 自动检测到模型中新增的avatar字段
    op.add_column('users', sa.Column('avatar', sa.String(length=500), nullable=True))

def downgrade() -> None:
    # 回滚操作:删除avatar字段
    op.drop_column('users', 'avatar')

常见自动生成场景

# 1. 添加新表
def upgrade() -> None:
    op.create_table('posts',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('title', sa.String(length=200), nullable=False),
        sa.Column('content', sa.Text(), nullable=True),
        sa.Column('created_at', sa.DateTime(), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )

def downgrade() -> None:
    op.drop_table('posts')

# 2. 修改列类型
def upgrade() -> None:
    op.alter_column('users', 'email',
               existing_type=sa.VARCHAR(length=50),
               type_=sa.String(length=100),
               existing_nullable=False)

def downgrade() -> None:
    op.alter_column('users', 'email',
               existing_type=sa.String(length=100),
               type_=sa.VARCHAR(length=50),
               existing_nullable=False)

# 3. 添加外键约束
def upgrade() -> None:
    op.create_foreign_key('fk_posts_user_id', 'posts', 'users', ['user_id'], ['id'])

def downgrade() -> None:
    op.drop_constraint('fk_posts_user_id', 'posts', type_='foreignkey')

手动编写迁移脚本

何时需要手动编写

  1. 数据迁移:需要在结构变更的同时迁移现有数据
  2. 复杂操作:自动生成无法处理的复杂SQL
  3. 条件判断:根据特定条件执行不同操作
  4. 性能优化:需要特定的执行顺序或优化

数据迁移示例

# 迁移用户状态字段:从字符串改为整数枚举
def upgrade() -> None:
    # 1. 添加新列
    op.add_column('users', sa.Column('status_new', sa.Integer(), nullable=True))
    
    # 2. 迁移数据
    conn = op.get_bind()
    conn.execute(sa.text("""
        UPDATE users 
        SET status_new = CASE 
            WHEN status = 'active' THEN 1
            WHEN status = 'inactive' THEN 0
            ELSE 0
        END
    """))
    
    # 3. 删除旧列,重命名新列
    op.drop_column('users', 'status')
    op.alter_column('users', 'status_new', new_column_name='status', nullable=False)
    
    # 4. 添加约束
    op.create_check_constraint('status_check', 'users', 'status IN (0, 1)')

def downgrade() -> None:
    # 回滚操作
    op.add_column('users', sa.Column('status_old', sa.String(), nullable=True))
    
    conn = op.get_bind()
    conn.execute(sa.text("""
        UPDATE users 
        SET status_old = CASE 
            WHEN status = 1 THEN 'active'
            ELSE 'inactive'
        END
    """))
    
    op.drop_column('users', 'status')
    op.alter_column('users', 'status_old', new_column_name='status', nullable=False)

复杂表结构变更

def upgrade() -> None:
    # 1. 创建临时表
    op.create_table('_temp_users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('username', sa.String(length=80), nullable=False),
        sa.Column('email', sa.String(length=120), nullable=False),
        sa.Column('created_at', sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('email'),
        sa.UniqueConstraint('username')
    )
    
    # 2. 复制数据
    op.execute("""
        INSERT INTO _temp_users (id, username, email, created_at)
        SELECT id, username, email, created_at FROM users
    """)
    
    # 3. 删除原表,重命名临时表
    op.drop_table('users')
    op.rename_table('_temp_users', 'users')

def downgrade() -> None:
    # 回滚操作
    op.drop_table('users')
    # 恢复到之前的表结构

高级Alembic操作

from alembic import op
from sqlalchemy import text

def upgrade() -> None:
    # 条件执行
    conn = op.get_bind()
    result = conn.execute(text("SELECT COUNT(*) FROM users")).scalar()
    
    if result > 0:
        # 只有当用户表不为空时才执行某些操作
        op.add_column('users', sa.Column('last_login', sa.DateTime(), nullable=True))
    
    # 批量操作
    batch_op = op.batch_alter_table('users', schema=None)
    batch_op.add_column(sa.Column('first_name', sa.String(length=50)))
    batch_op.add_column(sa.Column('last_name', sa.String(length=50)))
    batch_op.create_index('ix_users_first_name', ['first_name'])
    batch_op.create_index('ix_users_last_name', ['last_name'])

def downgrade() -> None:
    batch_op = op.batch_alter_table('users', schema=None)
    batch_op.drop_index('ix_users_first_name')
    batch_op.drop_index('ix_users_last_name')
    batch_op.drop_column('last_name')
    batch_op.drop_column('first_name')
    
    op.drop_column('users', 'last_login')

迁移执行命令

# 查看当前迁移状态
alembic current

# 查看迁移历史
alembic history
alembic history --verbose  # 详细信息

# 查看待执行的迁移
alembic heads

# 生成SQL而不执行(离线模式)
alembic upgrade head --sql

# 升级到最新版本
alembic upgrade head

# 升级到指定版本
alembic upgrade abc123

# 升一级
alembic upgrade +1

# 回滚一级
alembic downgrade -1

# 回滚到初始版本
alembic downgrade base

# 检查是否需要迁移
alembic check

版本控制与管理

迁移文件依赖关系

base (初始)
  ├── 001_add_users.py
  │        ↓
  ├── 002_add_posts.py
  │        ↓
  └── 003_add_comments.py

         head (最新)

每个迁移文件的 down_revision 必须指向前一个版本:

# 003_add_comments.py
down_revision = "002_add_posts"  # 指向 002

# 002_add_posts.py
down_revision = "001_add_users"  # 指向 001

# 001_add_users.py
down_revision = None  # 初始版本

合并分支迁移

当多个开发分支都有数据库变更时,需要合并迁移:

# 1. 生成合并迁移
alembic merge -m "merge feature branches"

# 2. 合并后的迁移文件
# alembic/versions/xxx_merge_feature_branches.py
"""merge feature branches

Revision ID: xxx
Revises: ('aaa', 'bbb')  # 同时依赖两个分支的最新迁移
Create Date: ...
"""

down_revision = ('aaa', 'bbb')  # 指向两个父迁移
branch_labels = None
depends_on = None

def upgrade():
    pass  # 通常为空,只是合并依赖

def downgrade():
    pass  # 通常为空

处理迁移冲突

当团队成员同时提交迁移时可能出现冲突:

# 冲突示例:两个迁移都试图修改同一张表
# 迁移A:add_user_phone.py
down_revision = 'old_version'

# 迁移B:add_user_email.py  
down_revision = 'old_version'  # 两者都基于相同的老版本

# 解决方法:手动修正后一个迁移的down_revision
# 迁移B(修正后):add_user_email.py
down_revision = 'add_user_phone'  # 指向先合并的迁移

多环境配置

环境配置文件

# config.py
import os
from typing import Dict

class DatabaseConfig:
    ENV_DATABASES: Dict[str, str] = {
        "development": "postgresql+asyncpg://localhost:5432/myapp_dev",
        "testing": "postgresql+asyncpg://localhost:5432/myapp_test", 
        "staging": "postgresql+asyncpg://user:pass@staging-db:5432/myapp",
        "production": "postgresql+asyncpg://user:pass@prod-db:5432/myapp",
    }
    
    @classmethod
    def get_database_url(cls, env: str = None) -> str:
        env = env or os.getenv("APP_ENV", "development")
        return cls.ENV_DATABASES.get(env, cls.ENV_DATABASES["development"])

# 使用示例
database_url = DatabaseConfig.get_database_url()

不同环境的Alembic配置

# alembic/env.py (增强版)
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
import os

# 导入项目配置
from app.config import DatabaseConfig

# 获取环境变量
config = context.config

# 根据环境设置数据库URL
env = os.getenv("APP_ENV", "development")
database_url = DatabaseConfig.get_database_url(env)
config.set_main_option("sqlalchemy.url", database_url)

if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# 导入模型
from app.models.base import Base
target_metadata = Base.metadata

def run_migrations_offline() -> None:
    """离线模式:生成 SQL 脚本,不连接数据库"""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online() -> None:
    """在线模式:直接连接数据库执行迁移"""
    connectable = engine_from_config(
        context.config.get_section(context.config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection, 
            target_metadata=target_metadata,
            compare_type=True,
            compare_server_default=True,
            include_object=include_object,  # 自定义过滤逻辑
        )

        with context.begin_transaction():
            context.run_migrations()

def include_object(object, name, type_, reflected, compare_to):
    """自定义对象过滤逻辑"""
    if type_ == "table":
        # 排除某些表(如临时表、日志表)
        excluded_tables = ["temp_*", "log_*"]
        for pattern in excluded_tables:
            if name.startswith(pattern.rstrip('*')):
                return False
    return True

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

环境特定的迁移命令

# 开发环境
APP_ENV=development alembic upgrade head

# 测试环境
APP_ENV=testing alembic upgrade head

# 预发布环境
APP_ENV=staging alembic upgrade head

# 生产环境
APP_ENV=production alembic upgrade head

# 或使用不同的配置文件
alembic -x db_url=postgresql://user:pass@prod/db upgrade head

生产环境部署

Docker部署配置

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

# 运行迁移作为启动的一部分
CMD ["sh", "-c", "alembic upgrade head && uvicorn app.main:app --host 0.0.0.0 --port 8000"]

Kubernetes部署

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastapi-app
spec:
  template:
    spec:
      initContainers:
      - name: db-migration
        image: your-fastapi-app:latest
        command: ['alembic', 'upgrade', 'head']
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-secret
              key: url
      containers:
      - name: fastapi-app
        image: your-fastapi-app:latest
        ports:
        - containerPort: 8000

CI/CD集成

# .github/workflows/deploy.yml
name: Deploy

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    
    - name: Setup Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.11'
    
    - name: Install dependencies
      run: pip install -r requirements.txt
    
    - name: Test migrations
      run: |
        docker-compose -f docker-compose.test.yml up -d
        sleep 10
        alembic upgrade head
        docker-compose -f docker-compose.test.yml down
    
    - name: Deploy migrations to staging
      run: |
        # 部署到预发布环境
        APP_ENV=staging alembic upgrade head
    
    - name: Manual approval for production
      uses: actions/manual-approval@latest
      with:
        secret: ${{ github.TOKEN }}
        approvers: admin-user
    
    - name: Deploy to production
      run: |
        # 生成SQL进行审核
        alembic upgrade head --sql > migration.sql
        # 审核SQL后执行
        alembic upgrade head

安全的生产迁移流程

#!/bin/bash
# production_migration.sh

set -e  # 遇到错误立即退出

echo "=== 开始生产环境数据库迁移 ==="

# 1. 备份当前数据库
echo "备份数据库..."
pg_dump -h $DB_HOST -U $DB_USER -d $DB_NAME > backup_$(date +%Y%m%d_%H%M%S).sql

# 2. 检查迁移状态
echo "检查当前迁移状态..."
alembic current

# 3. 生成迁移SQL(用于审核)
echo "生成迁移SQL..."
alembic upgrade head --sql > migration_plan.sql

# 4. 显示迁移计划供审核
echo "迁移计划:"
cat migration_plan.sql

read -p "确认执行迁移? (yes/no): " confirm
if [[ $confirm != "yes" ]]; then
    echo "迁移取消"
    exit 1
fi

# 5. 执行迁移
echo "执行迁移..."
alembic upgrade head

# 6. 验证迁移结果
echo "验证迁移结果..."
alembic current

# 7. 清理临时文件
rm migration_plan.sql

echo "=== 迁移完成 ==="

高级迁移技巧

数据迁移与转换

# 复杂的数据转换迁移
def upgrade() -> None:
    # 1. 添加临时列
    op.add_column('users', sa.Column('full_name_temp', sa.String(200)))
    
    # 2. 数据转换
    conn = op.get_bind()
    result = conn.execute(sa.text("""
        UPDATE users 
        SET full_name_temp = CONCAT(first_name, ' ', last_name)
        WHERE first_name IS NOT NULL AND last_name IS NOT NULL
    """))
    
    # 3. 验证转换结果
    count = conn.execute(sa.text("""
        SELECT COUNT(*) FROM users 
        WHERE full_name_temp IS NULL AND first_name IS NOT NULL
    """)).scalar()
    
    if count > 0:
        raise Exception(f"数据转换失败:{count} 条记录转换异常")
    
    # 4. 交换列
    op.drop_column('users', 'first_name')
    op.drop_column('users', 'last_name')
    op.alter_column('users', 'full_name_temp', new_column_name='full_name')

def downgrade() -> None:
    # 回滚逻辑
    op.add_column('users', sa.Column('first_name', sa.String(100)))
    op.add_column('users', sa.Column('last_name', sa.String(100)))
    
    conn = op.get_bind()
    conn.execute(sa.text("""
        UPDATE users 
        SET first_name = SPLIT_PART(full_name, ' ', 1),
            last_name = SUBSTRING(full_name, LENGTH(SPLIT_PART(full_name, ' ', 1)) + 2)
    """))
    
    op.drop_column('users', 'full_name')

条件迁移

def upgrade() -> None:
    conn = op.get_bind()
    
    # 检查表是否存在
    result = conn.execute(sa.text("""
        SELECT EXISTS (
            SELECT FROM information_schema.tables 
            WHERE table_name = 'users'
        );
    """)).scalar()
    
    if result:
        # 表存在时的操作
        op.add_column('users', sa.Column('migration_version', sa.Integer(), default=1))
    else:
        # 表不存在时的操作
        op.create_table('users',
            sa.Column('id', sa.Integer(), nullable=False),
            sa.Column('username', sa.String(80), nullable=False),
            sa.PrimaryKeyConstraint('id')
        )

def downgrade() -> None:
    conn = op.get_bind()
    
    # 检查列是否存在
    result = conn.execute(sa.text("""
        SELECT EXISTS (
            SELECT FROM information_schema.columns 
            WHERE table_name = 'users' AND column_name = 'migration_version'
        );
    """)).scalar()
    
    if result:
        op.drop_column('users', 'migration_version')

常见陷阱与解决方案

陷阱1:模型与数据库不一致

# ❌ 错误:模型已更改但未生成迁移
class User(Base):
    __tablename__ = 'users'
    id = sa.Column(sa.Integer, primary_key=True)
    new_field = sa.Column(sa.String(50))  # 新字段

# 直接运行 alembic revision --autogenerate
# 可能生成不正确的迁移

# ✅ 正确:先确保数据库与模型一致
# 1. 如果是新项目:先创建初始迁移
alembic revision --autogenerate -m "initial migration"
alembic upgrade head

# 2. 再添加新字段并生成迁移
alembic revision --autogenerate -m "add new_field to users"

陷阱2:downgrade函数不完整

# ❌ 错误:downgrade函数不完整
def upgrade():
    op.create_table('new_table',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(50), nullable=False),
        sa.PrimaryKeyConstraint('id')
    )
    # 添加索引
    op.create_index('ix_new_table_name', 'new_table', ['name'])

def downgrade():
    # 忘记删除索引!
    op.drop_table('new_table')  # 这会导致错误

# ✅ 正确:完整的upgrade/downgrade对
def downgrade():
    op.drop_index('ix_new_table_name', table_name='new_table')  # 先删除索引
    op.drop_table('new_table')  # 再删除表

陷阱3:数据迁移时的性能问题

# ❌ 错误:一次性处理大量数据
def upgrade():
    # 对百万级表进行逐行更新 - 性能极差
    conn = op.get_bind()
    for row in conn.execute(sa.text("SELECT id FROM large_table")):
        conn.execute(sa.text(f"UPDATE large_table SET new_col = 'value' WHERE id = {row.id}"))

# ✅ 正确:批量处理
def upgrade():
    conn = op.get_bind()
    # 批量更新
    conn.execute(sa.text("UPDATE large_table SET new_col = 'value' WHERE new_col IS NULL"))

最佳实践

1. 迁移文件组织

alembic/
├── env.py
├── script.py.mako
└── versions/
    ├── 001_initial_tables.py      # 初始表结构
    ├── 002_add_user_constraints.py # 添加约束
    ├── 003_migrate_user_data.py    # 数据迁移
    └── 20240115_add_user_profile.py # 功能特性

2. 迁移编写规范

# 推荐的迁移文件模板
"""添加用户个人资料字段

修订ID: abc123456789
前一修订: def987654321
创建日期: 2024-01-15 10:30:00.000000

迁移目的:
- 添加用户个人资料相关字段
- 迁移现有用户数据
- 创建相关索引
"""

from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column

# 修订标识符
revision = 'abc123456789'
down_revision = 'def987654321'
branch_labels = None
depends_on = None

def upgrade() -> None:
    """升级操作"""
    # 1. 添加列
    op.add_column('users', sa.Column('bio', sa.Text(), nullable=True))
    op.add_column('users', sa.Column('location', sa.String(100), nullable=True))
    op.add_column('users', sa.Column('website', sa.String(200), nullable=True))
    
    # 2. 创建索引
    op.create_index('ix_users_location', 'users', ['location'])
    
    # 3. 添加约束
    op.create_check_constraint(
        'ck_website_format', 
        'users', 
        "website IS NULL OR website LIKE 'http://%'"  # 网站URL格式验证
    )

def downgrade() -> None:
    """降级操作(必须与upgrade完全相反)"""
    # 按相反顺序执行相反操作
    op.drop_constraint('ck_website_format', 'users', type_='check')
    op.drop_index('ix_users_location', table_name='users')
    op.drop_column('users', 'website')
    op.drop_column('users', 'location') 
    op.drop_column('users', 'bio')

3. 测试策略

# test_migrations.py
import pytest
from alembic.command import upgrade, downgrade
from alembic.config import Config
from sqlalchemy import create_engine, text

def test_migration_downgrade_cycle():
    """测试迁移-回滚循环"""
    alembic_cfg = Config("alembic.ini")
    
    # 升级到最新
    upgrade(alembic_cfg, "head")
    
    # 验证表结构
    engine = create_engine(alembic_cfg.get_main_option("sqlalchemy.url"))
    with engine.connect() as conn:
        result = conn.execute(text("SELECT * FROM users LIMIT 1")).fetchone()
        assert result is not None
    
    # 回滚到上一版本
    downgrade(alembic_cfg, "-1")
    
    # 验证回滚效果
    with engine.connect() as conn:
        # 检查是否成功回滚(新字段应该不存在)
        pass

与其他迁移工具对比

特性AlembicDjango MigrationsFlywayLiquibase
生态系统SQLAlchemy生态Django生态Java生态多语言
Python支持✅ 原生支持❌ 仅Django⚠️ 通过JDBC⚠️ 通过JDBC
SQLAlchemy集成✅ 无缝集成❌ 不适用⚠️ 需额外配置⚠️ 需额外配置
多数据库支持✅ 广泛支持✅ 广泛支持✅ 广泛支持✅ 广泛支持
复杂数据迁移✅ 高度灵活✅ 良好支持✅ 脚本支持✅ XML/YAML支持
学习曲线⚠️ 中等✅ 简单(Django用户)⚠️ 中等❌ 较陡峭
FastAPI兼容性✅ 完美兼容❌ 不适用✅ 可用✅ 可用

总结

Alembic作为SQLAlchemy的官方迁移工具,为FastAPI项目提供了强大的数据库版本控制能力:

  1. 版本控制:像Git一样管理数据库结构变更
  2. 自动化:自动生成大部分迁移脚本
  3. 安全性:支持回滚,降低生产风险
  4. 灵活性:支持复杂的数据迁移和转换
  5. 集成性:与SQLAlchemy完美配合

通过遵循最佳实践和避免常见陷阱,Alembic能够帮助您安全、高效地管理数据库结构演进。

💡 关键要点:始终在生产环境之前在测试环境中验证迁移,确保downgrade函数与upgrade函数完全对称。


🔗 扩展阅读

</final_file_content>