FastAPI RBAC权限控制完全指南

📂 所属阶段:第四阶段 — 安全与认证(安全篇)
🔗 相关章节:FastAPI OAuth2与JWT鉴权 · FastAPI依赖注入系统

目录

RBAC核心概念

什么是RBAC(基于角色的访问控制)?

RBAC(Role-Based Access Control)是一种权限管理模型,它通过角色来控制用户对资源的访问权限。这种模型将权限分配给角色,再将角色分配给用户,从而实现更灵活、更易管理的权限系统。

用户 → 角色 → 权限

         资源 + 操作

例子:
Alice(用户)→ 管理员(角色)→ 删除用户、修改配置(权限)
Bob(用户)  → 编辑者(角色)→ 创建文章、编辑文章(权限)
Carol(用户)→ 普通用户(角色)→ 阅读文章、评论(权限)

RBAC三层模型

  1. 用户层(User Layer):用户与角色的分配关系
  2. 角色层(Role Layer):角色与权限的分配关系
  3. 权限层(Permission Layer):权限与资源操作的映射关系

RBAC的优势

  1. 简化管理:通过角色集中管理权限,减少用户-权限的直接关联
  2. 职责分离:不同角色承担不同职责,避免权限过度集中
  3. 灵活性:可以动态调整角色权限,适应业务变化
  4. 可扩展性:易于添加新的角色和权限
  5. 合规性:满足企业安全合规要求

权限模型设计

权限粒度设计原则

权限级别示例适用场景
功能级user:read, article:write控制API端点访问
数据级user:read:own, article:edit:department控制特定数据访问
字段级user:read:email, profile:edit:phone控制数据字段访问
操作级article:publish, user:delete:permanent控制特定操作

权限命名规范

# 推荐的权限命名格式:资源:操作[:范围]
PERMISSION_PATTERNS = {
    "basic": "resource:action",           # 基础格式
    "scoped": "resource:action:scope",    # 带范围格式
    "hierarchical": "module:resource:action",  # 层级格式
}

# 示例权限命名
EXAMPLE_PERMISSIONS = [
    "user:read",           # 读取用户
    "user:create",         # 创建用户
    "user:update",         # 更新用户
    "user:delete",         # 删除用户
    "user:read:own",       # 读取自己的用户信息
    "article:publish",     # 发布文章
    "admin:access",        # 访问管理后台
    "system:config",       # 系统配置
]

权限枚举与角色定义

完整权限枚举定义

# models/permission.py
from enum import Enum
from typing import Set, Dict, Optional
from dataclasses import dataclass

class Permission(str, Enum):
    """权限枚举 - 定义系统中的所有权限"""
    
    # 用户管理权限
    USER_READ = "user:read"
    USER_CREATE = "user:create"
    USER_UPDATE = "user:update"
    USER_DELETE = "user:delete"
    USER_MANAGE = "user:manage"  # 管理用户(包括其他操作)
    
    # 文章管理权限
    ARTICLE_READ = "article:read"
    ARTICLE_CREATE = "article:create"
    ARTICLE_UPDATE = "article:update"
    ARTICLE_DELETE = "article:delete"
    ARTICLE_PUBLISH = "article:publish"
    ARTICLE_MANAGE = "article:manage"
    
    # 评论管理权限
    COMMENT_READ = "comment:read"
    COMMENT_CREATE = "comment:create"
    COMMENT_UPDATE = "comment:update"
    COMMENT_DELETE = "comment:delete"
    
    # 系统管理权限
    ADMIN_ACCESS = "admin:access"
    SYSTEM_CONFIG = "system:config"
    SYSTEM_MONITOR = "system:monitor"
    SYSTEM_BACKUP = "system:backup"
    
    # 数据导出权限
    DATA_EXPORT = "data:export"
    DATA_IMPORT = "data:import"
    
    # 审计日志权限
    AUDIT_READ = "audit:read"
    AUDIT_MANAGE = "audit:manage"

@dataclass
class PermissionMetadata:
    """权限元数据 - 描述权限的详细信息"""
    name: str
    description: str
    category: str  # 如: "user", "article", "system"
    critical: bool = False  # 是否为关键权限

# 权限元数据映射
PERMISSION_METADATA: Dict[Permission, PermissionMetadata] = {
    Permission.USER_READ: PermissionMetadata(
        name="读取用户",
        description="允许查看用户信息",
        category="user",
        critical=False
    ),
    Permission.USER_CREATE: PermissionMetadata(
        name="创建用户",
        description="允许创建新用户",
        category="user",
        critical=True
    ),
    Permission.USER_DELETE: PermissionMetadata(
        name="删除用户",
        description="允许删除用户账号",
        category="user",
        critical=True
    ),
    Permission.ADMIN_ACCESS: PermissionMetadata(
        name="管理访问",
        description="允许访问管理后台",
        category="system",
        critical=True
    ),
    Permission.SYSTEM_CONFIG: PermissionMetadata(
        name="系统配置",
        description="允许修改系统配置",
        category="system",
        critical=True
    ),
}

class Role(str, Enum):
    """角色枚举 - 定义系统中的所有角色"""
    
    # 管理员角色
    SUPER_ADMIN = "super_admin"  # 超级管理员 - 拥有所有权限
    ADMIN = "admin"              # 管理员 - 拥有大部分权限
    
    # 内容管理角色
    EDITOR = "editor"            # 编辑 - 可编辑发布内容
    AUTHOR = "author"            # 作者 - 可创作内容
    
    # 普通用户角色
    MODERATOR = "moderator"      # 版主 - 管理社区内容
    USER = "user"                # 普通用户 - 基础功能
    GUEST = "guest"              # 访客 - 有限访问
}

@dataclass
class RoleMetadata:
    """角色元数据 - 描述角色的详细信息"""
    name: str
    description: str
    level: int  # 权限等级,数值越大权限越高
    inheritable: bool = True  # 是否可继承权限

# 角色元数据映射
ROLE_METADATA: Dict[Role, RoleMetadata] = {
    Role.SUPER_ADMIN: RoleMetadata(
        name="超级管理员",
        description="拥有系统最高权限,可执行所有操作",
        level=100,
        inheritable=True
    ),
    Role.ADMIN: RoleMetadata(
        name="管理员",
        description="拥有大部分管理权限",
        level=90,
        inheritable=True
    ),
    Role.EDITOR: RoleMetadata(
        name="编辑",
        description="可编辑和发布内容",
        level=70,
        inheritable=False
    ),
    Role.AUTHOR: RoleMetadata(
        name="作者",
        description="可创作和管理自己的内容",
        level=50,
        inheritable=False
    ),
    Role.MODERATOR: RoleMetadata(
        name="版主",
        description="可管理社区内容和用户行为",
        level=60,
        inheritable=False
    ),
    Role.USER: RoleMetadata(
        name="普通用户",
        description="可使用基础功能",
        level=30,
        inheritable=False
    ),
    Role.GUEST: RoleMetadata(
        name="访客",
        description="仅可查看公开内容",
        level=10,
        inheritable=False
    ),
}

权限验证工具函数

from typing import Union, List

def validate_permission_format(permission: str) -> bool:
    """验证权限字符串格式"""
    import re
    # 格式: resource:action 或 resource:action:scope
    pattern = r'^[a-z][a-z0-9_]*:[a-z][a-z0-9_]*(?::[a-z][a-z0-9_]*)?$'
    return bool(re.match(pattern, permission))

def normalize_permission(permission: Union[str, Permission]) -> str:
    """标准化权限表示"""
    if isinstance(permission, Permission):
        return permission.value
    return permission.lower()

def parse_permission(permission: str) -> tuple[str, str, Optional[str]]:
    """解析权限字符串为组件"""
    parts = permission.split(':')
    if len(parts) < 2:
        raise ValueError(f"无效的权限格式: {permission}")
    
    resource = parts[0]
    action = parts[1]
    scope = parts[2] if len(parts) > 2 else None
    
    return resource, action, scope

角色权限映射系统

静态角色权限映射

# 静态角色权限映射配置
ROLE_PERMISSIONS: Dict[Role, Set[Permission]] = {
    Role.SUPER_ADMIN: {
        # 拥有所有权限
        *list(Permission.__members__.values())
    },
    Role.ADMIN: {
        # 管理员权限
        Permission.USER_READ,
        Permission.USER_CREATE,
        Permission.USER_UPDATE,
        Permission.USER_DELETE,
        Permission.ARTICLE_READ,
        Permission.ARTICLE_CREATE,
        Permission.ARTICLE_UPDATE,
        Permission.ARTICLE_DELETE,
        Permission.ARTICLE_PUBLISH,
        Permission.COMMENT_READ,
        Permission.COMMENT_DELETE,
        Permission.ADMIN_ACCESS,
        Permission.SYSTEM_CONFIG,
        Permission.AUDIT_READ,
    },
    Role.EDITOR: {
        # 编辑权限
        Permission.ARTICLE_READ,
        Permission.ARTICLE_CREATE,
        Permission.ARTICLE_UPDATE,
        Permission.ARTICLE_DELETE,
        Permission.ARTICLE_PUBLISH,
        Permission.COMMENT_READ,
        Permission.USER_READ,
    },
    Role.AUTHOR: {
        # 作者权限
        Permission.ARTICLE_READ,
        Permission.ARTICLE_CREATE,
        Permission.ARTICLE_UPDATE,  # 仅自己的文章
        Permission.ARTICLE_DELETE,  # 仅自己的文章
        Permission.COMMENT_READ,
    },
    Role.MODERATOR: {
        # 版主权限
        Permission.COMMENT_READ,
        Permission.COMMENT_UPDATE,
        Permission.COMMENT_DELETE,
        Permission.ARTICLE_READ,
        Permission.AUDIT_READ,
    },
    Role.USER: {
        # 普通用户权限
        Permission.ARTICLE_READ,
        Permission.COMMENT_CREATE,
    },
    Role.GUEST: {
        # 访客权限
        Permission.ARTICLE_READ,
    },
}

def get_role_permissions(role: Role) -> Set[Permission]:
    """获取角色的权限集合"""
    return ROLE_PERMISSIONS.get(role, set())

def has_permission(role: Role, permission: Permission) -> bool:
    """检查角色是否拥有指定权限"""
    return permission in get_role_permissions(role)

def get_permission_hierarchy() -> Dict[str, Set[Permission]]:
    """获取权限层级映射"""
    hierarchy = {}
    for role, permissions in ROLE_PERMISSIONS.items():
        role_level = ROLE_METADATA[role].level
        for perm in permissions:
            if perm not in hierarchy:
                hierarchy[perm] = set()
            hierarchy[perm].add(role)
    return hierarchy

动态权限系统

from typing import Optional, Dict, Any
from datetime import datetime
import asyncio
from dataclasses import dataclass

@dataclass
class DynamicPermission:
    """动态权限 - 带有条件和有效期的权限"""
    permission: Permission
    condition: Optional[str] = None  # 条件表达式
    valid_from: Optional[datetime] = None
    valid_until: Optional[datetime] = None
    metadata: Optional[Dict[str, Any]] = None

class DynamicPermissionManager:
    """动态权限管理器"""
    
    def __init__(self):
        self.dynamic_permissions: Dict[int, List[DynamicPermission]] = {}  # user_id -> permissions
        self.temp_permissions: Dict[str, DynamicPermission] = {}  # token -> permission
    
    async def grant_temp_permission(
        self, 
        user_id: int, 
        permission: Permission, 
        duration_minutes: int = 5
    ) -> str:
        """授予临时权限"""
        import uuid
        from datetime import timedelta
        
        temp_token = str(uuid.uuid4())
        expiry_time = datetime.utcnow() + timedelta(minutes=duration_minutes)
        
        temp_perm = DynamicPermission(
            permission=permission,
            valid_until=expiry_time
        )
        
        self.temp_permissions[temp_token] = temp_perm
        return temp_token
    
    async def check_dynamic_permission(
        self, 
        user_id: int, 
        permission: Permission, 
        context: Optional[Dict[str, Any]] = None
    ) -> bool:
        """检查动态权限"""
        now = datetime.utcnow()
        
        # 检查用户特定的动态权限
        user_perms = self.dynamic_permissions.get(user_id, [])
        for dyn_perm in user_perms:
            if (dyn_perm.permission == permission and 
                self._is_permission_valid(dyn_perm, now) and
                self._check_condition(dyn_perm, context)):
                return True
        
        # 检查临时权限
        for temp_token, temp_perm in self.temp_permissions.items():
            if (temp_perm.permission == permission and 
                self._is_permission_valid(temp_perm, now)):
                # 临时权限使用后立即删除
                del self.temp_permissions[temp_token]
                return True
        
        return False
    
    def _is_permission_valid(self, perm: DynamicPermission, now: datetime) -> bool:
        """检查权限是否在有效期内"""
        if perm.valid_from and now < perm.valid_from:
            return False
        if perm.valid_until and now > perm.valid_until:
            return False
        return True
    
    def _check_condition(self, perm: DynamicPermission, context: Optional[Dict]) -> bool:
        """检查权限条件"""
        if not perm.condition or not context:
            return True
        
        # 简单的条件检查(实际项目中应使用安全的表达式引擎)
        try:
            # 这里只是一个示例,实际应用中需要使用安全的条件评估
            return eval(perm.condition, {"context": context, "__builtins__": {}})
        except:
            return False

# 全局动态权限管理器实例
dynamic_perm_manager = DynamicPermissionManager()

权限检查依赖注入

基础权限检查依赖

# dependencies/auth.py
from fastapi import Depends, HTTPException, status
from typing import Union, List, Optional
from functools import wraps

from models.permission import (
    Permission, Role, ROLE_PERMISSIONS, 
    get_role_permissions, has_permission,
    dynamic_perm_manager
)
from models.user import User
from auth.jwt import get_current_user

def require_permission(permission: Permission):
    """检查用户是否拥有指定权限"""
    async def checker(current_user: User = Depends(get_current_user)) -> User:
        if not current_user.is_active:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户账户已被禁用"
            )
        
        # 检查静态权限
        user_role = Role(current_user.role)
        user_permissions = get_role_permissions(user_role)
        
        if permission in user_permissions:
            return current_user
        
        # 检查动态权限
        has_dyn_perm = await dynamic_perm_manager.check_dynamic_permission(
            current_user.id, permission
        )
        if has_dyn_perm:
            return current_user
        
        # 检查用户特定权限(数据库中存储的额外权限)
        user_specific_perms = await get_user_specific_permissions(current_user.id)
        if permission.value in user_specific_perms:
            return current_user
        
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"缺少权限: {permission.value}",
        )
    
    return checker

def require_any_permission(*permissions: Permission):
    """拥有任一权限即可通过"""
    async def checker(current_user: User = Depends(get_current_user)) -> User:
        if not current_user.is_active:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户账户已被禁用"
            )
        
        user_role = Role(current_user.role)
        user_permissions = get_role_permissions(user_role)
        
        # 检查是否有任一权限
        for perm in permissions:
            if perm in user_permissions:
                return current_user
        
        # 检查动态权限
        for perm in permissions:
            has_dyn_perm = await dynamic_perm_manager.check_dynamic_permission(
                current_user.id, perm
            )
            if has_dyn_perm:
                return current_user
        
        # 检查用户特定权限
        user_specific_perms = await get_user_specific_permissions(current_user.id)
        for perm in permissions:
            if perm.value in user_specific_perms:
                return current_user
        
        required_perms = [p.value for p in permissions]
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"需要以下任一权限: {required_perms}"
        )
    
    return checker

def require_all_permissions(*permissions: Permission):
    """必须拥有所有权限才通过"""
    async def checker(current_user: User = Depends(get_current_user)) -> User:
        if not current_user.is_active:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户账户已被禁用"
            )
        
        user_role = Role(current_user.role)
        user_permissions = get_role_permissions(user_role)
        
        # 检查是否拥有所有权限
        missing_perms = []
        for perm in permissions:
            if perm not in user_permissions:
                # 检查动态权限
                has_dyn_perm = await dynamic_perm_manager.check_dynamic_permission(
                    current_user.id, perm
                )
                if not has_dyn_perm:
                    # 检查用户特定权限
                    user_specific_perms = await get_user_specific_permissions(current_user.id)
                    if perm.value not in user_specific_perms:
                        missing_perms.append(perm.value)
        
        if missing_perms:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"缺少以下权限: {missing_perms}"
            )
        
        return current_user
    
    return checker

def require_role(*roles: Role):
    """检查用户角色"""
    async def checker(current_user: User = Depends(get_current_user)) -> User:
        if not current_user.is_active:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户账户已被禁用"
            )
        
        user_role = Role(current_user.role)
        if user_role not in roles:
            allowed_roles = [r.value for r in roles]
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"需要以下角色之一: {allowed_roles}, 当前角色: {user_role.value}"
            )
        
        return current_user
    
    return checker

def require_highest_role():
    """检查用户是否拥有最高权限角色"""
    async def checker(current_user: User = Depends(get_current_user)) -> User:
        if not current_user.is_active:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户账户已被禁用"
            )
        
        user_role = Role(current_user.role)
        highest_level = max(ROLE_METADATA[r].level for r in Role)
        user_level = ROLE_METADATA[user_role].level
        
        if user_level != highest_level:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"需要最高权限角色"
            )
        
        return current_user
    
    return checker

高级权限检查依赖

from typing import Callable, Any
from pydantic import BaseModel

class PermissionContext(BaseModel):
    """权限检查上下文"""
    resource_id: Optional[int] = None
    resource_owner_id: Optional[int] = None
    action: Optional[str] = None
    additional_data: Optional[dict] = None

def require_resource_permission(
    permission: Permission,
    owner_field: str = "owner_id",
    resource_param: str = "resource_id"
):
    """资源级权限检查 - 检查用户是否有权访问特定资源"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 获取当前用户
            current_user = kwargs.get('current_user') or next(
                (arg for arg in args if isinstance(arg, User)), None
            )
            
            if not current_user or not current_user.is_active:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="用户未认证或账户已被禁用"
                )
            
            # 获取资源ID
            resource_id = kwargs.get(resource_param)
            if not resource_id:
                # 尝试从路径参数获取
                import inspect
                sig = inspect.signature(func)
                for param_name, param in sig.parameters.items():
                    if param_name.endswith('_id') and param_name != 'current_user':
                        resource_id = kwargs.get(param_name)
                        break
            
            # 获取资源(这里假设有一个获取资源的函数)
            resource = await get_resource_by_id(resource_id)
            if not resource:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="资源不存在"
                )
            
            # 检查权限
            user_role = Role(current_user.role)
            user_permissions = get_role_permissions(user_role)
            
            # 超级管理员有所有权限
            if user_role == Role.SUPER_ADMIN:
                return await func(*args, **kwargs)
            
            # 检查基础权限
            if permission not in user_permissions:
                # 检查动态权限
                has_dyn_perm = await dynamic_perm_manager.check_dynamic_permission(
                    current_user.id, permission, 
                    context={"resource_id": resource_id, "action": "access"}
                )
                if not has_dyn_perm:
                    raise HTTPException(
                        status_code=status.HTTP_403_FORBIDDEN,
                        detail=f"无权访问此资源: {permission.value}"
                    )
            
            # 检查资源所有权(对于需要所有权的权限)
            if permission in [Permission.ARTICLE_UPDATE, Permission.ARTICLE_DELETE]:
                resource_owner_id = getattr(resource, owner_field, None)
                if resource_owner_id != current_user.id:
                    # 检查是否有管理权限
                    if Permission.ARTICLE_MANAGE not in user_permissions:
                        raise HTTPException(
                            status_code=status.HTTP_403_FORBIDDEN,
                            detail="无权操作他人资源"
                        )
            
            return await func(*args, **kwargs)
        return wrapper
    return decorator

def permission_required_with_context(permission: Permission, context_func: Callable):
    """带上下文的权限检查"""
    async def checker(current_user: User = Depends(get_current_user)) -> User:
        if not current_user.is_active:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户账户已被禁用"
            )
        
        # 获取权限检查上下文
        context = await context_func(current_user)
        
        user_role = Role(current_user.role)
        user_permissions = get_role_permissions(user_role)
        
        # 基础权限检查
        if permission in user_permissions:
            return current_user
        
        # 动态权限检查
        has_dyn_perm = await dynamic_perm_manager.check_dynamic_permission(
            current_user.id, permission, context=context
        )
        if has_dyn_perm:
            return current_user
        
        # 用户特定权限检查
        user_specific_perms = await get_user_specific_permissions(current_user.id)
        if permission.value in user_specific_perms:
            return current_user
        
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"缺少权限: {permission.value}"
        )
    
    return checker

细粒度权限控制

基于资源所有权的权限控制

async def require_resource_ownership(
    resource_owner_id: int, 
    required_permission: Permission,
    current_user: User = Depends(get_current_user)
) -> User:
    """检查资源所有权或管理权限"""
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户账户已被禁用"
        )
    
    user_role = Role(current_user.role)
    user_permissions = get_role_permissions(user_role)
    
    # 超级管理员有所有权限
    if user_role == Role.SUPER_ADMIN:
        return current_user
    
    # 用户拥有资源或有管理权限
    if current_user.id == resource_owner_id:
        # 检查用户对自己资源的基本权限
        basic_perms = {
            Permission.ARTICLE_UPDATE, Permission.ARTICLE_DELETE,
            Permission.USER_UPDATE, Permission.USER_DELETE
        }
        if required_permission in basic_perms:
            return current_user
    
    # 检查管理权限
    management_perms = {
        Permission.USER_MANAGE, Permission.ARTICLE_MANAGE,
        Permission.COMMENT_MANAGE
    }
    for manage_perm in management_perms:
        if manage_perm in user_permissions:
            # 检查具体权限
            if required_permission in user_permissions:
                return current_user
    
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="无权操作此资源"
    )

# 使用示例
@app.delete("/articles/{article_id}")
async def delete_article(
    article_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    article = await db.get(Article, article_id)
    if not article:
        raise HTTPException(status_code=404, detail="文章不存在")
    
    # 检查资源所有权或管理权限
    await require_resource_ownership(article.author_id, Permission.ARTICLE_DELETE, current_user)
    
    await db.delete(article)
    await db.commit()
    return {"message": "文章已删除"}

基于部门/团队的权限控制

class DepartmentPermissionChecker:
    """部门权限检查器"""
    
    def __init__(self, db: AsyncSession):
        self.db = db
    
    async def user_has_department_access(
        self, 
        user_id: int, 
        department_id: int, 
        permission: Permission
    ) -> bool:
        """检查用户是否有部门访问权限"""
        # 获取用户部门关系
        stmt = select(UserDepartment).where(
            UserDepartment.user_id == user_id,
            UserDepartment.department_id == department_id
        )
        user_dept = await self.db.execute(stmt)
        user_dept_rel = user_dept.scalar()
        
        if not user_dept_rel:
            return False
        
        # 检查部门级别权限
        if user_dept_rel.role == "admin":
            # 部门管理员有所有权限
            return True
        elif user_dept_rel.role == "member":
            # 普通成员只有基本权限
            basic_perms = {
                Permission.ARTICLE_READ, Permission.ARTICLE_CREATE,
                Permission.COMMENT_READ, Permission.COMMENT_CREATE
            }
            return permission in basic_perms
        
        return False

def require_department_permission(
    department_id_param: str = "department_id",
    permission: Permission = Permission.ARTICLE_READ
):
    """部门权限检查依赖"""
    async def checker(
        current_user: User = Depends(get_current_user),
        db: AsyncSession = Depends(get_db),
        department_id: int = None  # 通过路径参数或其他方式传入
    ) -> User:
        if not current_user.is_active:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户账户已被禁用"
            )
        
        # 如果department_id未通过参数传入,尝试从请求上下文获取
        if department_id is None:
            # 这里可以从请求路径、查询参数等获取
            pass
        
        checker = DepartmentPermissionChecker(db)
        has_access = await checker.user_has_department_access(
            current_user.id, department_id, permission
        )
        
        if not has_access:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"无权访问部门资源: {permission.value}"
            )
        
        return current_user
    
    return checker

时间窗口权限控制

class TimeBasedPermissionChecker:
    """基于时间的权限检查器"""
    
    def __init__(self):
        self.work_hours_start = 9  # 9 AM
        self.work_hours_end = 18   # 6 PM
    
    def is_work_hours(self) -> bool:
        """检查是否在工作时间内"""
        from datetime import datetime
        now = datetime.now()
        return self.work_hours_start <= now.hour < self.work_hours_end
    
    def is_weekday(self) -> bool:
        """检查是否为工作日"""
        from datetime import datetime
        now = datetime.now()
        return now.weekday() < 5  # Monday to Friday
    
    async def check_time_based_permission(
        self, 
        user_id: int, 
        permission: Permission
    ) -> bool:
        """检查基于时间的权限"""
        # 某些敏感权限只能在工作时间执行
        sensitive_perms = {
            Permission.USER_DELETE, Permission.SYSTEM_CONFIG,
            Permission.DATA_EXPORT, Permission.SYSTEM_BACKUP
        }
        
        if permission in sensitive_perms:
            return self.is_work_hours() and self.is_weekday()
        
        return True

time_perm_checker = TimeBasedPermissionChecker()

def require_time_based_permission(permission: Permission):
    """时间基权限检查依赖"""
    async def checker(current_user: User = Depends(get_current_user)) -> User:
        if not current_user.is_active:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户账户已被禁用"
            )
        
        # 检查时间限制
        if not await time_perm_checker.check_time_based_permission(
            current_user.id, permission
        ):
            from datetime import datetime
            now = datetime.now()
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"权限 {permission.value} 只能在工作时间(周一至周五 9:00-18:00)使用。当前时间: {now.strftime('%Y-%m-%d %H:%M:%S')}"
            )
        
        # 执行正常的权限检查
        user_role = Role(current_user.role)
        user_permissions = get_role_permissions(user_role)
        
        if permission in user_permissions:
            return current_user
        
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"缺少权限: {permission.value}"
        )
    
    return checker

数据库级权限存储

权限相关数据库模型

# models/permission_models.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text, ForeignKey
from sqlalchemy.orm import relationship, Mapped, mapped_column
from sqlalchemy.sql import func
from typing import List
from datetime import datetime

# 用户-角色关联表
user_roles = Table(
    "user_roles",
    Base.metadata,
    Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
    Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
)

# 角色-权限关联表
role_permissions = Table(
    "role_permissions",
    Base.metadata,
    Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
    Column("permission_id", Integer, ForeignKey("permissions.id"), primary_key=True),
)

# 用户-权限关联表(用于用户特定权限)
user_permissions = Table(
    "user_permissions",
    Base.metadata,
    Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
    Column("permission_id", Integer, ForeignKey("permissions.id"), primary_key=True),
)

class RoleModel(Base):
    """角色模型 - 数据库存储的角色信息"""
    __tablename__ = "roles"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    description: Mapped[str] = mapped_column(Text, nullable=True)
    level: Mapped[int] = mapped_column(default=0)  # 权限等级
    is_active: Mapped[bool] = mapped_column(default=True)
    created_at: Mapped[datetime] = mapped_column(default=func.now())
    updated_at: Mapped[datetime] = mapped_column(default=func.now(), onupdate=func.now())
    
    # 关系
    permissions: Mapped[List["PermissionModel"]] = relationship(
        "PermissionModel",
        secondary=role_permissions,
        back_populates="roles"
    )
    users: Mapped[List["User"]] = relationship(
        "User",
        secondary=user_roles,
        back_populates="roles"
    )

class PermissionModel(Base):
    """权限模型 - 数据库存储的权限信息"""
    __tablename__ = "permissions"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    code: Mapped[str] = mapped_column(String(100), unique=True, index=True)  # 如: "user:read"
    name: Mapped[str] = mapped_column(String(100))  # 权限名称
    description: Mapped[str] = mapped_column(Text, nullable=True)  # 权限描述
    category: Mapped[str] = mapped_column(String(50), default="general")  # 权限类别
    is_active: Mapped[bool] = mapped_column(default=True)
    critical: Mapped[bool] = mapped_column(default=False)  # 是否为关键权限
    created_at: Mapped[datetime] = mapped_column(default=func.now())
    updated_at: Mapped[datetime] = mapped_column(default=func.now(), onupdate=func.now())
    
    # 关系
    roles: Mapped[List[RoleModel]] = relationship(
        "RoleModel", 
        secondary=role_permissions, 
        back_populates="permissions"
    )
    users: Mapped[List["User"]] = relationship(
        "User",
        secondary=user_permissions,
        back_populates="specific_permissions"
    )

class UserRole(Base):
    """用户角色关系 - 记录用户的角色分配"""
    __tablename__ = "user_roles"
    
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), primary_key=True)
    role_id: Mapped[int] = mapped_column(ForeignKey("roles.id"), primary_key=True)
    assigned_by: Mapped[int] = mapped_column(ForeignKey("users.id"))  # 分配人
    assigned_at: Mapped[datetime] = mapped_column(default=func.now())
    expires_at: Mapped[datetime] = mapped_column(nullable=True)  # 过期时间
    reason: Mapped[str] = mapped_column(Text, nullable=True)  # 分配原因

class UserPermission(Base):
    """用户特定权限 - 为用户单独分配的权限"""
    __tablename__ = "user_permissions"
    
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), primary_key=True)
    permission_id: Mapped[int] = mapped_column(ForeignKey("permissions.id"), primary_key=True)
    granted_by: Mapped[int] = mapped_column(ForeignKey("users.id"))  # 授予权限的人
    granted_at: Mapped[datetime] = mapped_column(default=func.now())
    expires_at: Mapped[datetime] = mapped_column(nullable=True)  # 过期时间
    reason: Mapped[str] = mapped_column(Text, nullable=True)  # 授予原因

权限服务层

# services/permission_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, insert, delete
from typing import List, Set, Optional
from datetime import datetime

class PermissionService:
    """权限服务 - 处理权限相关的业务逻辑"""
    
    def __init__(self, db: AsyncSession):
        self.db = db
    
    async def get_user_permissions(self, user_id: int) -> Set[str]:
        """获取用户的所有权限(角色权限 + 个人权限)"""
        permissions = set()
        
        # 获取用户角色
        user_roles_stmt = select(UserRole).where(UserRole.user_id == user_id)
        user_roles_result = await self.db.execute(user_roles_stmt)
        user_roles = user_roles_result.scalars().all()
        
        # 获取角色权限
        for user_role in user_roles:
            role_perms_stmt = select(PermissionModel).join(
                role_permissions
            ).where(role_permissions.c.role_id == user_role.role_id)
            role_perms_result = await self.db.execute(role_perms_stmt)
            role_perms = role_perms_result.scalars().all()
            
            for perm in role_perms:
                permissions.add(perm.code)
        
        # 获取用户特定权限
        user_perms_stmt = select(PermissionModel).join(
            user_permissions
        ).where(user_permissions.c.user_id == user_id)
        user_perms_result = await self.db.execute(user_perms_stmt)
        user_perms = user_perms_result.scalars().all()
        
        for perm in user_perms:
            permissions.add(perm.code)
        
        return permissions
    
    async def user_has_permission(self, user_id: int, permission_code: str) -> bool:
        """检查用户是否有特定权限"""
        user_perms = await self.get_user_permissions(user_id)
        return permission_code in user_perms
    
    async def assign_role_to_user(
        self, 
        user_id: int, 
        role_id: int, 
        assigned_by: int, 
        reason: str = None,
        expires_at: datetime = None
    ) -> bool:
        """为用户分配角色"""
        try:
            stmt = insert(UserRole).values(
                user_id=user_id,
                role_id=role_id,
                assigned_by=assigned_by,
                reason=reason,
                expires_at=expires_at
            )
            await self.db.execute(stmt)
            await self.db.commit()
            return True
        except Exception:
            await self.db.rollback()
            return False
    
    async def remove_role_from_user(self, user_id: int, role_id: int) -> bool:
        """从用户移除角色"""
        try:
            stmt = delete(UserRole).where(
                UserRole.user_id == user_id,
                UserRole.role_id == role_id
            )
            await self.db.execute(stmt)
            await self.db.commit()
            return True
        except Exception:
            await self.db.rollback()
            return False
    
    async def grant_permission_to_user(
        self, 
        user_id: int, 
        permission_id: int, 
        granted_by: int, 
        reason: str = None,
        expires_at: datetime = None
    ) -> bool:
        """为用户授予特定权限"""
        try:
            stmt = insert(UserPermission).values(
                user_id=user_id,
                permission_id=permission_id,
                granted_by=granted_by,
                reason=reason,
                expires_at=expires_at
            )
            await self.db.execute(stmt)
            await self.db.commit()
            return True
        except Exception:
            await self.db.rollback()
            return False
    
    async def revoke_permission_from_user(self, user_id: int, permission_id: int) -> bool:
        """从用户撤销特定权限"""
        try:
            stmt = delete(UserPermission).where(
                UserPermission.user_id == user_id,
                UserPermission.permission_id == permission_id
            )
            await self.db.execute(stmt)
            await self.db.commit()
            return True
        except Exception:
            await self.db.rollback()
            return False
    
    async def get_role_permissions(self, role_id: int) -> List[str]:
        """获取角色的权限列表"""
        stmt = select(PermissionModel.code).join(
            role_permissions
        ).where(role_permissions.c.role_id == role_id)
        result = await self.db.execute(stmt)
        permissions = [row[0] for row in result.all()]
        return permissions
    
    async def create_role(
        self, 
        name: str, 
        description: str, 
        level: int = 0
    ) -> Optional[RoleModel]:
        """创建新角色"""
        try:
            role = RoleModel(
                name=name,
                description=description,
                level=level
            )
            self.db.add(role)
            await self.db.commit()
            await self.db.refresh(role)
            return role
        except Exception:
            await self.db.rollback()
            return None
    
    async def create_permission(
        self, 
        code: str, 
        name: str, 
        description: str, 
        category: str = "general"
    ) -> Optional[PermissionModel]:
        """创建新权限"""
        try:
            permission = PermissionModel(
                code=code,
                name=name,
                description=description,
                category=category
            )
            self.db.add(permission)
            await self.db.commit()
            await self.db.refresh(permission)
            return permission
        except Exception:
            await self.db.rollback()
            return None

# 权限服务实例
def get_permission_service(db: AsyncSession = Depends(get_db)) -> PermissionService:
    return PermissionService(db)

高级权限管理API

权限管理路由

# routers/permissions.py
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, Optional
from datetime import datetime

from services.permission_service import PermissionService, get_permission_service
from models.permission import Permission, Role, require_permission
from models.user import User

router = APIRouter(prefix="/permissions", tags=["权限管理"])

@router.get("/user/{user_id}/permissions", response_model=List[str])
async def get_user_permissions(
    user_id: int,
    current_user: User = Depends(require_permission(Permission.USER_READ)),
    perm_service: PermissionService = Depends(get_permission_service)
):
    """获取用户的权限列表"""
    permissions = await perm_service.get_user_permissions(user_id)
    return list(permissions)

@router.get("/user/{user_id}/has-permission/{permission_code}")
async def check_user_permission(
    user_id: int,
    permission_code: str,
    current_user: User = Depends(require_permission(Permission.USER_READ)),
    perm_service: PermissionService = Depends(get_permission_service)
):
    """检查用户是否有特定权限"""
    has_perm = await perm_service.user_has_permission(user_id, permission_code)
    return {"user_id": user_id, "permission": permission_code, "has_permission": has_perm}

@router.post("/user/{user_id}/roles/{role_id}")
async def assign_role_to_user(
    user_id: int,
    role_id: int,
    reason: str = Query(None, description="分配原因"),
    expires_at: str = Query(None, description="过期时间 (YYYY-MM-DDTHH:MM:SS)"),
    current_user: User = Depends(require_permission(Permission.USER_MANAGE)),
    perm_service: PermissionService = Depends(get_permission_service)
):
    """为用户分配角色"""
    expires_datetime = None
    if expires_at:
        try:
            expires_datetime = datetime.fromisoformat(expires_at.replace('Z', '+00:00'))
        except ValueError:
            raise HTTPException(status_code=400, detail="无效的时间格式")
    
    success = await perm_service.assign_role_to_user(
        user_id, role_id, current_user.id, reason, expires_datetime
    )
    
    if not success:
        raise HTTPException(status_code=400, detail="角色分配失败")
    
    return {"message": "角色分配成功", "user_id": user_id, "role_id": role_id}

@router.delete("/user/{user_id}/roles/{role_id}")
async def remove_role_from_user(
    user_id: int,
    role_id: int,
    current_user: User = Depends(require_permission(Permission.USER_MANAGE)),
    perm_service: PermissionService = Depends(get_permission_service)
):
    """从用户移除角色"""
    success = await perm_service.remove_role_from_user(user_id, role_id)
    
    if not success:
        raise HTTPException(status_code=400, detail="角色移除失败")
    
    return {"message": "角色移除成功", "user_id": user_id, "role_id": role_id}

@router.post("/user/{user_id}/permissions/{permission_id}")
async def grant_user_permission(
    user_id: int,
    permission_id: int,
    reason: str = Query(None, description="授予权限原因"),
    expires_at: str = Query(None, description="过期时间 (YYYY-MM-DDTHH:MM:SS)"),
    current_user: User = Depends(require_permission(Permission.USER_MANAGE)),
    perm_service: PermissionService = Depends(get_permission_service)
):
    """为用户授予特定权限"""
    expires_datetime = None
    if expires_at:
        try:
            expires_datetime = datetime.fromisoformat(expires_at.replace('Z', '+00:00'))
        except ValueError:
            raise HTTPException(status_code=400, detail="无效的时间格式")
    
    success = await perm_service.grant_permission_to_user(
        user_id, permission_id, current_user.id, reason, expires_datetime
    )
    
    if not success:
        raise HTTPException(status_code=400, detail="权限授予失败")
    
    return {"message": "权限授予成功", "user_id": user_id, "permission_id": permission_id}

@router.delete("/user/{user_id}/permissions/{permission_id}")
async def revoke_user_permission(
    user_id: int,
    permission_id: int,
    current_user: User = Depends(require_permission(Permission.USER_MANAGE)),
    perm_service: PermissionService = Depends(get_permission_service)
):
    """从用户撤销特定权限"""
    success = await perm_service.revoke_permission_from_user(user_id, permission_id)
    
    if not success:
        raise HTTPException(status_code=400, detail="权限撤销失败")
    
    return {"message": "权限撤销成功", "user_id": user_id, "permission_id": permission_id}

@router.get("/roles/{role_id}/permissions", response_model=List[str])
async def get_role_permissions(
    role_id: int,
    current_user: User = Depends(require_permission(Permission.AUDIT_READ)),
    perm_service: PermissionService = Depends(get_permission_service)
):
    """获取角色的权限列表"""
    permissions = await perm_service.get_role_permissions(role_id)
    return permissions

@router.post("/roles/", response_model=dict)
async def create_new_role(
    name: str = Query(..., description="角色名称"),
    description: str = Query(..., description="角色描述"),
    level: int = Query(0, description="权限等级"),
    current_user: User = Depends(require_permission(Permission.SYSTEM_CONFIG)),
    perm_service: PermissionService = Depends(get_permission_service)
):
    """创建新角色"""
    role = await perm_service.create_role(name, description, level)
    if not role:
        raise HTTPException(status_code=400, detail="角色创建失败")
    
    return {"message": "角色创建成功", "role_id": role.id, "role_name": role.name}

@router.post("/permissions/", response_model=dict)
async def create_new_permission(
    code: str = Query(..., description="权限代码,如: user:read"),
    name: str = Query(..., description="权限名称"),
    description: str = Query(..., description="权限描述"),
    category: str = Query("general", description="权限类别"),
    current_user: User = Depends(require_permission(Permission.SYSTEM_CONFIG)),
    perm_service: PermissionService = Depends(get_permission_service)
):
    """创建新权限"""
    # 验证权限代码格式
    if not validate_permission_format(code):
        raise HTTPException(status_code=400, detail="无效的权限代码格式")
    
    permission = await perm_service.create_permission(code, name, description, category)
    if not permission:
        raise HTTPException(status_code=400, detail="权限创建失败")
    
    return {
        "message": "权限创建成功", 
        "permission_id": permission.id, 
        "permission_code": permission.code
    }

权限审计API

# routers/permission_audit.py
from fastapi import APIRouter, Depends, Query
from typing import List
from datetime import datetime, timedelta

router = APIRouter(prefix="/permissions/audit", tags=["权限审计"])

class PermissionAuditLog(Base):
    """权限变更审计日志"""
    __tablename__ = "permission_audit_logs"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    action: Mapped[str] = mapped_column(String(50))  # "assign_role", "grant_permission", etc.
    target_user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))  # 被操作的用户
    role_id: Mapped[Optional[int]] = mapped_column(ForeignKey("roles.id"), nullable=True)
    permission_id: Mapped[Optional[int]] = mapped_column(ForeignKey("permissions.id"), nullable=True)
    reason: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    performed_by: Mapped[int] = mapped_column(ForeignKey("users.id"))  # 执行操作的用户
    timestamp: Mapped[datetime] = mapped_column(default=func.now())
    ip_address: Mapped[Optional[str]] = mapped_column(String(45), nullable=True)

@router.get("/logs")
async def get_permission_audit_logs(
    current_user: User = Depends(require_permission(Permission.AUDIT_READ)),
    limit: int = Query(100, le=1000),
    offset: int = Query(0),
    start_date: str = Query(None),
    end_date: str = Query(None),
    action: str = Query(None),
    user_id: int = Query(None),
    db: AsyncSession = Depends(get_db)
):
    """获取权限变更审计日志"""
    query = select(PermissionAuditLog).order_by(PermissionAuditLog.timestamp.desc())
    
    if start_date:
        start_dt = datetime.fromisoformat(start_date)
        query = query.where(PermissionAuditLog.timestamp >= start_dt)
    
    if end_date:
        end_dt = datetime.fromisoformat(end_date)
        query = query.where(PermissionAuditLog.timestamp <= end_dt)
    
    if action:
        query = query.where(PermissionAuditLog.action == action)
    
    if user_id:
        query = query.where(PermissionAuditLog.target_user_id == user_id)
    
    query = query.limit(limit).offset(offset)
    
    result = await db.execute(query)
    logs = result.scalars().all()
    
    return {
        "logs": [
            {
                "id": log.id,
                "user_id": log.target_user_id,
                "action": log.action,
                "performed_by": log.performed_by,
                "reason": log.reason,
                "timestamp": log.timestamp.isoformat(),
                "ip_address": log.ip_address
            }
            for log in logs
        ],
        "total": len(logs)
    }

权限缓存与性能优化

权限缓存系统

import aioredis
from typing import Optional, Set
import json
from datetime import timedelta

class PermissionCache:
    """权限缓存系统"""
    
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_url = redis_url
        self.redis = None
    
    async def init_redis(self):
        """初始化Redis连接"""
        self.redis = await aioredis.from_url(self.redis_url)
    
    async def get_user_permissions_cached(self, user_id: int) -> Optional[Set[str]]:
        """从缓存获取用户权限"""
        if not self.redis:
            return None
        
        cache_key = f"user_permissions:{user_id}"
        cached = await self.redis.get(cache_key)
        if cached:
            return set(json.loads(cached))
        return None
    
    async def set_user_permissions_cache(self, user_id: int, permissions: Set[str]):
        """设置用户权限缓存(缓存24小时)"""
        if not self.redis:
            return
        
        cache_key = f"user_permissions:{user_id}"
        await self.redis.setex(
            cache_key, 
            timedelta(hours=24), 
            json.dumps(list(permissions))
        )
    
    async def invalidate_user_permissions_cache(self, user_id: int):
        """清除用户权限缓存"""
        if not self.redis:
            return
        
        cache_key = f"user_permissions:{user_id}"
        await self.redis.delete(cache_key)
    
    async def invalidate_role_permissions_cache(self, role_id: int):
        """清除角色权限缓存"""
        if not self.redis:
            return
        
        # 找到所有拥有此角色的用户并清除其缓存
        # 这里需要查询数据库获取用户列表
        pass

# 全局权限缓存实例
permission_cache = PermissionCache()

# 修改权限服务以使用缓存
class CachedPermissionService(PermissionService):
    """带缓存的权限服务"""
    
    async def get_user_permissions(self, user_id: int) -> Set[str]:
        # 先从缓存获取
        cached_perms = await permission_cache.get_user_permissions_cached(user_id)
        if cached_perms is not None:
            return cached_perms
        
        # 从数据库获取
        permissions = await super().get_user_permissions(user_id)
        
        # 存入缓存
        await permission_cache.set_user_permissions_cache(user_id, permissions)
        
        return permissions
    
    async def assign_role_to_user(
        self, 
        user_id: int, 
        role_id: int, 
        assigned_by: int, 
        reason: str = None,
        expires_at: datetime = None
    ) -> bool:
        success = await super().assign_role_to_user(user_id, role_id, assigned_by, reason, expires_at)
        if success:
            # 清除用户权限缓存
            await permission_cache.invalidate_user_permissions_cache(user_id)
        return success
    
    async def grant_permission_to_user(
        self, 
        user_id: int, 
        permission_id: int, 
        granted_by: int, 
        reason: str = None,
        expires_at: datetime = None
    ) -> bool:
        success = await super().grant_permission_to_user(user_id, permission_id, granted_by, reason, expires_at)
        if success:
            # 清除用户权限缓存
            await permission_cache.invalidate_user_permissions_cache(user_id)
        return success

def get_cached_permission_service(db: AsyncSession = Depends(get_db)) -> CachedPermissionService:
    return CachedPermissionService(db)

权限预加载优化

from typing import Dict
import asyncio

class PermissionPreloader:
    """权限预加载器 - 预加载常用权限数据"""
    
    def __init__(self):
        self.role_permissions_cache: Dict[int, Set[str]] = {}
        self.permission_metadata_cache: Dict[str, PermissionMetadata] = {}
        self.role_metadata_cache: Dict[str, RoleMetadata] = {}
    
    async def preload_common_data(self):
        """预加载常用权限数据"""
        # 预加载所有角色权限映射
        for role in Role:
            perms = get_role_permissions(role)
            self.role_permissions_cache[role] = {p.value for p in perms}
        
        # 预加载权限元数据
        self.permission_metadata_cache = PERMISSION_METADATA.copy()
        
        # 预加载角色元数据
        self.role_metadata_cache = ROLE_METADATA.copy()
    
    def get_cached_role_permissions(self, role: Role) -> Set[str]:
        """获取缓存的角色权限"""
        return self.role_permissions_cache.get(role, set())
    
    def get_cached_permission_metadata(self, permission: Permission) -> Optional[PermissionMetadata]:
        """获取缓存的权限元数据"""
        return self.permission_metadata_cache.get(permission)
    
    def get_cached_role_metadata(self, role: Role) -> Optional[RoleMetadata]:
        """获取缓存的角色元数据"""
        return self.role_metadata_cache.get(role)

# 全局预加载器实例
permission_preloader = PermissionPreloader()

# 在应用启动时预加载数据
async def startup_event():
    await permission_cache.init_redis()
    await permission_preloader.preload_common_data()

# 在FastAPI应用中注册启动事件
# app.add_event_handler("startup", startup_event)

权限安全最佳实践

权限安全检查清单

"""
权限安全最佳实践清单:

✅ 数据库层面
- [ ] 使用参数化查询防止SQL注入
- [ ] 敏感权限操作记录审计日志
- [ ] 权限变更需要二次确认
- [ ] 实施行级安全控制

✅ 应用层面
- [ ] 默认拒绝所有未明确授权的访问
- [ ] 实施最小权限原则
- [ ] 定期审查权限分配
- [ ] 实施权限分级管理

✅ 网络层面
- [ ] 敏感权限操作使用HTTPS
- [ ] 实施适当的速率限制
- [ ] 记录IP地址用于安全审计
- [ ] 实施会话管理

✅ 监控层面
- [ ] 实时监控权限变更
- [ ] 设置权限异常告警
- [ ] 定期生成权限报告
- [ ] 实施权限使用分析
"""

# 权限安全中间件
class PermissionSecurityMiddleware:
    """权限安全中间件 - 实施安全检查"""
    
    def __init__(self, app):
        self.app = app
    
    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            return await self.app(scope, receive, send)
        
        request = Request(scope)
        
        # 检查是否为敏感权限操作
        if self.is_sensitive_operation(request):
            # 记录操作日志
            await self.log_sensitive_operation(request)
            
            # 检查额外的安全要求
            if not await self.check_additional_security(request):
                response = JSONResponse(
                    status_code=403,
                    content={"detail": "安全检查失败"}
                )
                return await response(scope, receive, send)
        
        return await self.app(scope, receive, send)
    
    def is_sensitive_operation(self, request: Request) -> bool:
        """检查是否为敏感操作"""
        sensitive_paths = [
            "/permissions/user/*/roles/*",      # 角色分配
            "/permissions/user/*/permissions/*", # 权限授予
            "/permissions/roles/",              # 角色创建
            "/permissions/permissions/",        # 权限创建
        ]
        
        path = request.url.path
        return any(
            (pattern == path or (pattern.endswith('*') and path.startswith(pattern[:-1])))
            for pattern in sensitive_paths
        )
    
    async def log_sensitive_operation(self, request: Request):
        """记录敏感操作日志"""
        # 实现日志记录逻辑
        pass
    
    async def check_additional_security(self, request: Request) -> bool:
        """检查额外安全要求"""
        # 实现额外安全检查
        # 如:二次验证、IP白名单等
        return True

权限最小化原则实现

class PrincipleOfLeastPrivilege:
    """最小权限原则实施"""
    
    def __init__(self, perm_service: PermissionService):
        self.perm_service = perm_service
    
    async def validate_user_permissions(self, user_id: int, requested_permissions: List[str]) -> List[str]:
        """验证用户请求的权限是否合理"""
        user_permissions = await self.perm_service.get_user_permissions(user_id)
        
        # 检查是否有过度权限请求
        excessive_perms = []
        justified_perms = []
        
        for perm in requested_permissions:
            if perm in user_permissions:
                # 检查权限是否与当前操作相关
                if await self.is_permission_justified(user_id, perm):
                    justified_perms.append(perm)
                else:
                    excessive_perms.append(perm)
            else:
                excessive_perms.append(perm)
        
        if excessive_perms:
            # 记录过度权限请求
            await self.log_excessive_permission_request(user_id, excessive_perms)
        
        return justified_perms
    
    async def is_permission_justified(self, user_id: int, permission: str) -> bool:
        """检查权限请求是否合理"""
        # 实现合理性检查逻辑
        # 如:检查操作上下文、时间、频率等
        return True
    
    async def log_excessive_permission_request(self, user_id: int, permissions: List[str]):
        """记录过度权限请求"""
        # 实现日志记录
        pass

# 权限使用监控
class PermissionUsageMonitor:
    """权限使用监控器"""
    
    def __init__(self):
        self.usage_stats = {}
        self.alert_thresholds = {
            Permission.USER_DELETE: 10,  # 1小时内删除用户超过10次
            Permission.SYSTEM_CONFIG: 5, # 1小时内修改系统配置超过5次
        }
    
    async def record_permission_usage(self, user_id: int, permission: str, resource_id: int = None):
        """记录权限使用"""
        from datetime import datetime
        now = datetime.utcnow()
        
        key = f"{user_id}:{permission}"
        if key not in self.usage_stats:
            self.usage_stats[key] = []
        
        # 清理过期记录(保留最近1小时)
        cutoff = now - timedelta(hours=1)
        self.usage_stats[key] = [
            record for record in self.usage_stats[key]
            if record['timestamp'] > cutoff
        ]
        
        self.usage_stats[key].append({
            'timestamp': now,
            'resource_id': resource_id,
            'ip_address': None  # 从请求中获取
        })
        
        # 检查是否超过阈值
        if permission in self.alert_thresholds:
            count = len(self.usage_stats[key])
            if count > self.alert_thresholds[permission]:
                await self.trigger_alert(user_id, permission, count)
    
    async def trigger_alert(self, user_id: int, permission: str, count: int):
        """触发权限使用告警"""
        # 实现告警逻辑
        print(f"ALERT: User {user_id} used permission {permission} {count} times in the last hour")

与其他权限框架对比

特性FastAPI RBACDjango GuardianFlask-PrincipalCasbin
集成难度中等高(Django专属)中等
性能中等中等
灵活性中等极高
学习曲线中等低(对Django用户)中等
功能完整性完整完整基础完整
细粒度控制支持支持支持高度支持
跨框架支持
ACM模型支持部分

权限框架选择建议

"""
选择权限框架的决策树:

1. 如果使用Django → Django Guardian
2. 如果需要ACM模型 → Casbin
3. 如果需要最大灵活性 → 自定义RBAC + 权限钩子
4. 如果需要快速实现 → Flask-Principal变体
5. 如果使用FastAPI → 本教程的RBAC实现

FastAPI RBAC优势:
- 与FastAPI深度集成
- 依赖注入友好
- 异步支持
- 类型安全
- 易于测试
- 灵活的扩展性
"""

总结

FastAPI中的RBAC权限控制系统提供了完整的权限管理解决方案:

  1. 权限建模:通过枚举定义清晰的权限结构
  2. 角色管理:灵活的角色-权限映射系统
  3. 依赖注入:优雅的权限检查机制
  4. 细粒度控制:支持资源级权限控制
  5. 数据库集成:完整的权限持久化方案
  6. 性能优化:缓存和预加载机制
  7. 安全实践:最小权限原则和安全监控

正确实施RBAC系统能够:

  • 提高系统安全性
  • 简化权限管理
  • 满足合规要求
  • 支持系统扩展

💡 关键要点:权限逻辑应集中在依赖函数中,避免散落在业务代码中。实施最小权限原则,定期审查权限分配,记录安全审计日志。


🔗 扩展阅读