基于角色的权限控制 (RBAC):如何保护你的敏感接口

📂 所属阶段:第四阶段 — 安全与认证(安全篇)
🔗 相关章节:OAuth2 与 JWT 鉴权 · 依赖注入系统


1. RBAC 核心概念

1.1 什么是 RBAC?

用户 → 角色 → 权限

         资源 + 操作

例子:
Alice(用户)→ 管理员(角色)→ 删除用户、修改配置(权限)
Bob(用户)  → 编辑者(角色)→ 创建文章、编辑文章(权限)
Carol(用户)→ 普通用户(角色)→ 阅读文章、评论(权限)

1.2 权限模型设计

权限资源操作
user:read用户读取
user:create用户创建
user:update用户修改
user:delete用户删除
article:read文章读取
article:write文章读写
article:publish文章发布
admin:access管理后台访问

2. 权限定义

2.1 权限枚举

# models/permission.py
from enum import Enum

class Permission(str, Enum):
    # 用户相关
    USER_READ = "user:read"
    USER_CREATE = "user:create"
    USER_UPDATE = "user:update"
    USER_DELETE = "user:delete"

    # 文章相关
    ARTICLE_READ = "article:read"
    ARTICLE_WRITE = "article:write"
    ARTICLE_PUBLISH = "article:publish"
    ARTICLE_DELETE = "article:delete"

    # 系统
    ADMIN_ACCESS = "admin:access"
    SYSTEM_CONFIG = "system:config"


class Role(str, Enum):
    ADMIN = "admin"
    EDITOR = "editor"
    AUTHOR = "author"
    USER = "user"
    GUEST = "guest"

2.2 角色-权限映射

# 角色权限配置
ROLE_PERMISSIONS: dict[Role, set[Permission]] = {
    Role.ADMIN: {
        Permission.USER_READ, Permission.USER_CREATE,
        Permission.USER_UPDATE, Permission.USER_DELETE,
        Permission.ARTICLE_READ, Permission.ARTICLE_WRITE,
        Permission.ARTICLE_PUBLISH, Permission.ARTICLE_DELETE,
        Permission.ADMIN_ACCESS, Permission.SYSTEM_CONFIG,
    },
    Role.EDITOR: {
        Permission.USER_READ,
        Permission.ARTICLE_READ, Permission.ARTICLE_WRITE, Permission.ARTICLE_PUBLISH,
        Permission.ARTICLE_DELETE,  # 仅删除自己发布的
    },
    Role.AUTHOR: {
        Permission.USER_READ,
        Permission.ARTICLE_READ, Permission.ARTICLE_WRITE,
    },
    Role.USER: {
        Permission.USER_READ,
        Permission.ARTICLE_READ,
    },
    Role.GUEST: {
        Permission.ARTICLE_READ,
    },
}

3. 权限依赖注入

3.1 单权限检查

# dependencies/auth.py
from fastapi import Depends, HTTPException, status
from models.permission import Permission, ROLE_PERMISSIONS, Role
from models.user import User

def require_permission(permission: Permission):
    """检查用户是否拥有指定权限"""
    async def checker(current_user: User = Depends(get_current_user)) -> User:
        # 从 JWT 获取用户角色
        user_role = Role(current_user.role)

        # 获取角色拥有的权限集合
        user_permissions = ROLE_PERMISSIONS.get(user_role, set())

        if permission not in user_permissions:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"缺少权限: {permission.value}",
            )
        return current_user
    return checker

# 使用
@app.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    _: User = Depends(require_permission(Permission.USER_DELETE)),
):
    ...

3.2 多权限检查(AND / OR)

def require_any_permission(*permissions: Permission):
    """拥有任一权限即可通过"""
    async def checker(current_user: User = Depends(get_current_user)) -> User:
        user_permissions = ROLE_PERMISSIONS.get(Role(current_user.role), set())
        if not any(p in user_permissions for p in permissions):
            raise HTTPException(403, f"需要以下任一权限: {[p.value for p in permissions]}")
        return current_user
    return checker

def require_all_permissions(*permissions: Permission):
    """必须拥有所有权限才通过"""
    async def checker(current_user: User = Depends(get_current_user)) -> User:
        user_permissions = ROLE_PERMISSIONS.get(Role(current_user.role), set())
        missing = [p for p in permissions if p not in user_permissions]
        if missing:
            raise HTTPException(403, f"缺少以下权限: {[p.value for p in missing]}")
        return current_user
    return checker

# 使用
@app.get("/dashboard")
async def dashboard(
    _: User = Depends(require_any_permission(
        Permission.ADMIN_ACCESS, Permission.ARTICLE_PUBLISH
    ))
):
    ...

3.3 资源级权限检查

async def require_ownership(resource_owner_id: int, current_user: User = Depends(get_current_user)):
    """检查资源是否属于当前用户,或用户是管理员"""
    if current_user.role == Role.ADMIN:
        return current_user
    if current_user.id != resource_owner_id:
        raise HTTPException(403, "无权操作此资源")
    return current_user

@app.delete("/articles/{article_id}")
async def delete_article(
    article_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    article = await db.get_article(article_id)

    # 资源级权限:文章作者或管理员可删除
    if article.author_id != current_user.id and current_user.role != Role.ADMIN:
        raise HTTPException(403, "无权删除此文章")

    await db.delete(article)
    return {"deleted": article_id}

4. 数据库级权限存储

4.1 用户-角色关联模型

# models/user.py 扩展
class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(unique=True)
    hashed_password: Mapped[str] = mapped_column()
    name: Mapped[str] = mapped_column()
    role: Mapped[str] = mapped_column(String(20), default=Role.USER.value)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)

    # 多对多:用户自定义权限(扩展角色权限)
    extra_permissions: Mapped[list["Permission"]] = relationship(
        secondary=user_permissions, back_populates="users"
    )


# 权限表
class PermissionModel(Base):
    __tablename__ = "permissions"
    id: Mapped[int] = mapped_column(primary_key=True)
    code: Mapped[str] = mapped_column(unique=True)
    name: Mapped[str] = mapped_column()
    description: Mapped[str | None]


# 用户-权限关联表
user_permissions = Table(
    "user_permissions", Base.metadata,
    Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
    Column("permission_id", Integer, ForeignKey("permissions.id"), primary_key=True),
)

4.2 获取用户完整权限

async def get_user_permissions(user: User) -> set[str]:
    """获取用户的所有权限 = 角色权限 + 个人额外权限"""
    role_perms = {p.value for p in ROLE_PERMISSIONS.get(Role(user.role), set())}
    extra_perms = {p.code for p in user.extra_permissions}
    return role_perms | extra_perms

5. 实战:权限管理 API

# routers/admin.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from dependencies import get_db, require_permission
from models.permission import Permission
from models.user import User

router = APIRouter(prefix="/admin", tags=["管理后台"])

@router.get("/users/{user_id}/role")
async def get_user_role(
    user_id: int,
    _: User = Depends(require_permission(Permission.USER_READ)),
):
    user = await db.get_user(user_id)
    return {"id": user.id, "role": user.role}

@router.put("/users/{user_id}/role")
async def update_user_role(
    user_id: int,
    new_role: str,
    current_user: User = Depends(require_permission(Permission.USER_UPDATE)),
    db: AsyncSession = Depends(get_db),
):
    # 防止越权:普通管理员不能把用户升为管理员
    if new_role == Role.ADMIN.value and current_user.role != Role.ADMIN:
        raise HTTPException(403, "无权授予管理员角色")

    await db.update_user_role(user_id, new_role)
    return {"message": "角色已更新"}

6. 小结

# RBAC 权限检查速查

# 1. 定义权限
class Permission(str, Enum):
    USER_DELETE = "user:delete"

# 2. 角色-权限映射
ROLE_PERMISSIONS = {Role.ADMIN: {Permission.USER_DELETE, ...}}

# 3. 权限检查依赖
def require_permission(permission: Permission):
    async def checker(current_user: User = Depends(get_current_user)):
        if permission not in user_permissions:
            raise HTTPException(403)
        return current_user
    return checker

# 4. 使用
@app.delete("/users/{id}")
async def delete_user(_: User = Depends(require_permission(Permission.USER_DELETE))):
    ...

# 5. 资源级检查
if article.author_id != current_user.id and current_user.role != Role.ADMIN:
    raise HTTPException(403, "无权操作")

💡 最佳实践:权限逻辑尽量放在依赖函数中,不要散落在各个路由里。这样修改权限逻辑只需改一处,且易于测试。


🔗 扩展阅读