#FastAPI RBAC权限控制完全指南
📂 所属阶段:第四阶段 — 安全与认证(安全篇)
🔗 相关章节:FastAPI OAuth2与JWT鉴权 · FastAPI依赖注入系统
#目录
- RBAC核心概念
- 权限模型设计
- 权限枚举与角色定义
- 角色权限映射系统
- 权限检查依赖注入
- 细粒度权限控制
- 数据库级权限存储
- 高级权限管理API
- 权限缓存与性能优化
- 权限安全最佳实践
- 与其他权限框架对比
- 总结
#RBAC核心概念
#什么是RBAC(基于角色的访问控制)?
RBAC(Role-Based Access Control)是一种权限管理模型,它通过角色来控制用户对资源的访问权限。这种模型将权限分配给角色,再将角色分配给用户,从而实现更灵活、更易管理的权限系统。
用户 → 角色 → 权限
↓
资源 + 操作
例子:
Alice(用户)→ 管理员(角色)→ 删除用户、修改配置(权限)
Bob(用户) → 编辑者(角色)→ 创建文章、编辑文章(权限)
Carol(用户)→ 普通用户(角色)→ 阅读文章、评论(权限)#RBAC三层模型
- 用户层(User Layer):用户与角色的分配关系
- 角色层(Role Layer):角色与权限的分配关系
- 权限层(Permission Layer):权限与资源操作的映射关系
#RBAC的优势
- 简化管理:通过角色集中管理权限,减少用户-权限的直接关联
- 职责分离:不同角色承担不同职责,避免权限过度集中
- 灵活性:可以动态调整角色权限,适应业务变化
- 可扩展性:易于添加新的角色和权限
- 合规性:满足企业安全合规要求
#权限模型设计
#权限粒度设计原则
| 权限级别 | 示例 | 适用场景 |
|---|---|---|
| 功能级 | user:read, article:write | 控制API端点访问 |
| 数据级 | user:read:own, article:edit:department | 控制特定数据访问 |
| 字段级 | user:read:email, profile:edit:phone | 控制数据字段访问 |
| 操作级 | article:publish, user:delete:permanent | 控制特定操作 |
#权限命名规范
# 推荐的权限命名格式:资源:操作[:范围]
PERMISSION_PATTERNS = {
"basic": "resource:action", # 基础格式
"scoped": "resource:action:scope", # 带范围格式
"hierarchical": "module:resource:action", # 层级格式
}
# 示例权限命名
EXAMPLE_PERMISSIONS = [
"user:read", # 读取用户
"user:create", # 创建用户
"user:update", # 更新用户
"user:delete", # 删除用户
"user:read:own", # 读取自己的用户信息
"article:publish", # 发布文章
"admin:access", # 访问管理后台
"system:config", # 系统配置
]#权限枚举与角色定义
#完整权限枚举定义
# models/permission.py
from enum import Enum
from typing import Set, Dict, Optional
from dataclasses import dataclass
class Permission(str, Enum):
"""权限枚举 - 定义系统中的所有权限"""
# 用户管理权限
USER_READ = "user:read"
USER_CREATE = "user:create"
USER_UPDATE = "user:update"
USER_DELETE = "user:delete"
USER_MANAGE = "user:manage" # 管理用户(包括其他操作)
# 文章管理权限
ARTICLE_READ = "article:read"
ARTICLE_CREATE = "article:create"
ARTICLE_UPDATE = "article:update"
ARTICLE_DELETE = "article:delete"
ARTICLE_PUBLISH = "article:publish"
ARTICLE_MANAGE = "article:manage"
# 评论管理权限
COMMENT_READ = "comment:read"
COMMENT_CREATE = "comment:create"
COMMENT_UPDATE = "comment:update"
COMMENT_DELETE = "comment:delete"
# 系统管理权限
ADMIN_ACCESS = "admin:access"
SYSTEM_CONFIG = "system:config"
SYSTEM_MONITOR = "system:monitor"
SYSTEM_BACKUP = "system:backup"
# 数据导出权限
DATA_EXPORT = "data:export"
DATA_IMPORT = "data:import"
# 审计日志权限
AUDIT_READ = "audit:read"
AUDIT_MANAGE = "audit:manage"
@dataclass
class PermissionMetadata:
"""权限元数据 - 描述权限的详细信息"""
name: str
description: str
category: str # 如: "user", "article", "system"
critical: bool = False # 是否为关键权限
# 权限元数据映射
PERMISSION_METADATA: Dict[Permission, PermissionMetadata] = {
Permission.USER_READ: PermissionMetadata(
name="读取用户",
description="允许查看用户信息",
category="user",
critical=False
),
Permission.USER_CREATE: PermissionMetadata(
name="创建用户",
description="允许创建新用户",
category="user",
critical=True
),
Permission.USER_DELETE: PermissionMetadata(
name="删除用户",
description="允许删除用户账号",
category="user",
critical=True
),
Permission.ADMIN_ACCESS: PermissionMetadata(
name="管理访问",
description="允许访问管理后台",
category="system",
critical=True
),
Permission.SYSTEM_CONFIG: PermissionMetadata(
name="系统配置",
description="允许修改系统配置",
category="system",
critical=True
),
}
class Role(str, Enum):
"""角色枚举 - 定义系统中的所有角色"""
# 管理员角色
SUPER_ADMIN = "super_admin" # 超级管理员 - 拥有所有权限
ADMIN = "admin" # 管理员 - 拥有大部分权限
# 内容管理角色
EDITOR = "editor" # 编辑 - 可编辑发布内容
AUTHOR = "author" # 作者 - 可创作内容
# 普通用户角色
MODERATOR = "moderator" # 版主 - 管理社区内容
USER = "user" # 普通用户 - 基础功能
GUEST = "guest" # 访客 - 有限访问
}
@dataclass
class RoleMetadata:
"""角色元数据 - 描述角色的详细信息"""
name: str
description: str
level: int # 权限等级,数值越大权限越高
inheritable: bool = True # 是否可继承权限
# 角色元数据映射
ROLE_METADATA: Dict[Role, RoleMetadata] = {
Role.SUPER_ADMIN: RoleMetadata(
name="超级管理员",
description="拥有系统最高权限,可执行所有操作",
level=100,
inheritable=True
),
Role.ADMIN: RoleMetadata(
name="管理员",
description="拥有大部分管理权限",
level=90,
inheritable=True
),
Role.EDITOR: RoleMetadata(
name="编辑",
description="可编辑和发布内容",
level=70,
inheritable=False
),
Role.AUTHOR: RoleMetadata(
name="作者",
description="可创作和管理自己的内容",
level=50,
inheritable=False
),
Role.MODERATOR: RoleMetadata(
name="版主",
description="可管理社区内容和用户行为",
level=60,
inheritable=False
),
Role.USER: RoleMetadata(
name="普通用户",
description="可使用基础功能",
level=30,
inheritable=False
),
Role.GUEST: RoleMetadata(
name="访客",
description="仅可查看公开内容",
level=10,
inheritable=False
),
}#权限验证工具函数
from typing import Union, List
def validate_permission_format(permission: str) -> bool:
"""验证权限字符串格式"""
import re
# 格式: resource:action 或 resource:action:scope
pattern = r'^[a-z][a-z0-9_]*:[a-z][a-z0-9_]*(?::[a-z][a-z0-9_]*)?$'
return bool(re.match(pattern, permission))
def normalize_permission(permission: Union[str, Permission]) -> str:
"""标准化权限表示"""
if isinstance(permission, Permission):
return permission.value
return permission.lower()
def parse_permission(permission: str) -> tuple[str, str, Optional[str]]:
"""解析权限字符串为组件"""
parts = permission.split(':')
if len(parts) < 2:
raise ValueError(f"无效的权限格式: {permission}")
resource = parts[0]
action = parts[1]
scope = parts[2] if len(parts) > 2 else None
return resource, action, scope#角色权限映射系统
#静态角色权限映射
# 静态角色权限映射配置
ROLE_PERMISSIONS: Dict[Role, Set[Permission]] = {
Role.SUPER_ADMIN: {
# 拥有所有权限
*list(Permission.__members__.values())
},
Role.ADMIN: {
# 管理员权限
Permission.USER_READ,
Permission.USER_CREATE,
Permission.USER_UPDATE,
Permission.USER_DELETE,
Permission.ARTICLE_READ,
Permission.ARTICLE_CREATE,
Permission.ARTICLE_UPDATE,
Permission.ARTICLE_DELETE,
Permission.ARTICLE_PUBLISH,
Permission.COMMENT_READ,
Permission.COMMENT_DELETE,
Permission.ADMIN_ACCESS,
Permission.SYSTEM_CONFIG,
Permission.AUDIT_READ,
},
Role.EDITOR: {
# 编辑权限
Permission.ARTICLE_READ,
Permission.ARTICLE_CREATE,
Permission.ARTICLE_UPDATE,
Permission.ARTICLE_DELETE,
Permission.ARTICLE_PUBLISH,
Permission.COMMENT_READ,
Permission.USER_READ,
},
Role.AUTHOR: {
# 作者权限
Permission.ARTICLE_READ,
Permission.ARTICLE_CREATE,
Permission.ARTICLE_UPDATE, # 仅自己的文章
Permission.ARTICLE_DELETE, # 仅自己的文章
Permission.COMMENT_READ,
},
Role.MODERATOR: {
# 版主权限
Permission.COMMENT_READ,
Permission.COMMENT_UPDATE,
Permission.COMMENT_DELETE,
Permission.ARTICLE_READ,
Permission.AUDIT_READ,
},
Role.USER: {
# 普通用户权限
Permission.ARTICLE_READ,
Permission.COMMENT_CREATE,
},
Role.GUEST: {
# 访客权限
Permission.ARTICLE_READ,
},
}
def get_role_permissions(role: Role) -> Set[Permission]:
"""获取角色的权限集合"""
return ROLE_PERMISSIONS.get(role, set())
def has_permission(role: Role, permission: Permission) -> bool:
"""检查角色是否拥有指定权限"""
return permission in get_role_permissions(role)
def get_permission_hierarchy() -> Dict[str, Set[Permission]]:
"""获取权限层级映射"""
hierarchy = {}
for role, permissions in ROLE_PERMISSIONS.items():
role_level = ROLE_METADATA[role].level
for perm in permissions:
if perm not in hierarchy:
hierarchy[perm] = set()
hierarchy[perm].add(role)
return hierarchy#动态权限系统
from typing import Optional, Dict, Any
from datetime import datetime
import asyncio
from dataclasses import dataclass
@dataclass
class DynamicPermission:
"""动态权限 - 带有条件和有效期的权限"""
permission: Permission
condition: Optional[str] = None # 条件表达式
valid_from: Optional[datetime] = None
valid_until: Optional[datetime] = None
metadata: Optional[Dict[str, Any]] = None
class DynamicPermissionManager:
"""动态权限管理器"""
def __init__(self):
self.dynamic_permissions: Dict[int, List[DynamicPermission]] = {} # user_id -> permissions
self.temp_permissions: Dict[str, DynamicPermission] = {} # token -> permission
async def grant_temp_permission(
self,
user_id: int,
permission: Permission,
duration_minutes: int = 5
) -> str:
"""授予临时权限"""
import uuid
from datetime import timedelta
temp_token = str(uuid.uuid4())
expiry_time = datetime.utcnow() + timedelta(minutes=duration_minutes)
temp_perm = DynamicPermission(
permission=permission,
valid_until=expiry_time
)
self.temp_permissions[temp_token] = temp_perm
return temp_token
async def check_dynamic_permission(
self,
user_id: int,
permission: Permission,
context: Optional[Dict[str, Any]] = None
) -> bool:
"""检查动态权限"""
now = datetime.utcnow()
# 检查用户特定的动态权限
user_perms = self.dynamic_permissions.get(user_id, [])
for dyn_perm in user_perms:
if (dyn_perm.permission == permission and
self._is_permission_valid(dyn_perm, now) and
self._check_condition(dyn_perm, context)):
return True
# 检查临时权限
for temp_token, temp_perm in self.temp_permissions.items():
if (temp_perm.permission == permission and
self._is_permission_valid(temp_perm, now)):
# 临时权限使用后立即删除
del self.temp_permissions[temp_token]
return True
return False
def _is_permission_valid(self, perm: DynamicPermission, now: datetime) -> bool:
"""检查权限是否在有效期内"""
if perm.valid_from and now < perm.valid_from:
return False
if perm.valid_until and now > perm.valid_until:
return False
return True
def _check_condition(self, perm: DynamicPermission, context: Optional[Dict]) -> bool:
"""检查权限条件"""
if not perm.condition or not context:
return True
# 简单的条件检查(实际项目中应使用安全的表达式引擎)
try:
# 这里只是一个示例,实际应用中需要使用安全的条件评估
return eval(perm.condition, {"context": context, "__builtins__": {}})
except:
return False
# 全局动态权限管理器实例
dynamic_perm_manager = DynamicPermissionManager()#权限检查依赖注入
#基础权限检查依赖
# dependencies/auth.py
from fastapi import Depends, HTTPException, status
from typing import Union, List, Optional
from functools import wraps
from models.permission import (
Permission, Role, ROLE_PERMISSIONS,
get_role_permissions, has_permission,
dynamic_perm_manager
)
from models.user import User
from auth.jwt import get_current_user
def require_permission(permission: Permission):
"""检查用户是否拥有指定权限"""
async def checker(current_user: User = Depends(get_current_user)) -> User:
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户账户已被禁用"
)
# 检查静态权限
user_role = Role(current_user.role)
user_permissions = get_role_permissions(user_role)
if permission in user_permissions:
return current_user
# 检查动态权限
has_dyn_perm = await dynamic_perm_manager.check_dynamic_permission(
current_user.id, permission
)
if has_dyn_perm:
return current_user
# 检查用户特定权限(数据库中存储的额外权限)
user_specific_perms = await get_user_specific_permissions(current_user.id)
if permission.value in user_specific_perms:
return current_user
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"缺少权限: {permission.value}",
)
return checker
def require_any_permission(*permissions: Permission):
"""拥有任一权限即可通过"""
async def checker(current_user: User = Depends(get_current_user)) -> User:
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户账户已被禁用"
)
user_role = Role(current_user.role)
user_permissions = get_role_permissions(user_role)
# 检查是否有任一权限
for perm in permissions:
if perm in user_permissions:
return current_user
# 检查动态权限
for perm in permissions:
has_dyn_perm = await dynamic_perm_manager.check_dynamic_permission(
current_user.id, perm
)
if has_dyn_perm:
return current_user
# 检查用户特定权限
user_specific_perms = await get_user_specific_permissions(current_user.id)
for perm in permissions:
if perm.value in user_specific_perms:
return current_user
required_perms = [p.value for p in permissions]
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"需要以下任一权限: {required_perms}"
)
return checker
def require_all_permissions(*permissions: Permission):
"""必须拥有所有权限才通过"""
async def checker(current_user: User = Depends(get_current_user)) -> User:
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户账户已被禁用"
)
user_role = Role(current_user.role)
user_permissions = get_role_permissions(user_role)
# 检查是否拥有所有权限
missing_perms = []
for perm in permissions:
if perm not in user_permissions:
# 检查动态权限
has_dyn_perm = await dynamic_perm_manager.check_dynamic_permission(
current_user.id, perm
)
if not has_dyn_perm:
# 检查用户特定权限
user_specific_perms = await get_user_specific_permissions(current_user.id)
if perm.value not in user_specific_perms:
missing_perms.append(perm.value)
if missing_perms:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"缺少以下权限: {missing_perms}"
)
return current_user
return checker
def require_role(*roles: Role):
"""检查用户角色"""
async def checker(current_user: User = Depends(get_current_user)) -> User:
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户账户已被禁用"
)
user_role = Role(current_user.role)
if user_role not in roles:
allowed_roles = [r.value for r in roles]
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"需要以下角色之一: {allowed_roles}, 当前角色: {user_role.value}"
)
return current_user
return checker
def require_highest_role():
"""检查用户是否拥有最高权限角色"""
async def checker(current_user: User = Depends(get_current_user)) -> User:
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户账户已被禁用"
)
user_role = Role(current_user.role)
highest_level = max(ROLE_METADATA[r].level for r in Role)
user_level = ROLE_METADATA[user_role].level
if user_level != highest_level:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"需要最高权限角色"
)
return current_user
return checker#高级权限检查依赖
from typing import Callable, Any
from pydantic import BaseModel
class PermissionContext(BaseModel):
"""权限检查上下文"""
resource_id: Optional[int] = None
resource_owner_id: Optional[int] = None
action: Optional[str] = None
additional_data: Optional[dict] = None
def require_resource_permission(
permission: Permission,
owner_field: str = "owner_id",
resource_param: str = "resource_id"
):
"""资源级权限检查 - 检查用户是否有权访问特定资源"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
# 获取当前用户
current_user = kwargs.get('current_user') or next(
(arg for arg in args if isinstance(arg, User)), None
)
if not current_user or not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户未认证或账户已被禁用"
)
# 获取资源ID
resource_id = kwargs.get(resource_param)
if not resource_id:
# 尝试从路径参数获取
import inspect
sig = inspect.signature(func)
for param_name, param in sig.parameters.items():
if param_name.endswith('_id') and param_name != 'current_user':
resource_id = kwargs.get(param_name)
break
# 获取资源(这里假设有一个获取资源的函数)
resource = await get_resource_by_id(resource_id)
if not resource:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="资源不存在"
)
# 检查权限
user_role = Role(current_user.role)
user_permissions = get_role_permissions(user_role)
# 超级管理员有所有权限
if user_role == Role.SUPER_ADMIN:
return await func(*args, **kwargs)
# 检查基础权限
if permission not in user_permissions:
# 检查动态权限
has_dyn_perm = await dynamic_perm_manager.check_dynamic_permission(
current_user.id, permission,
context={"resource_id": resource_id, "action": "access"}
)
if not has_dyn_perm:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"无权访问此资源: {permission.value}"
)
# 检查资源所有权(对于需要所有权的权限)
if permission in [Permission.ARTICLE_UPDATE, Permission.ARTICLE_DELETE]:
resource_owner_id = getattr(resource, owner_field, None)
if resource_owner_id != current_user.id:
# 检查是否有管理权限
if Permission.ARTICLE_MANAGE not in user_permissions:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="无权操作他人资源"
)
return await func(*args, **kwargs)
return wrapper
return decorator
def permission_required_with_context(permission: Permission, context_func: Callable):
"""带上下文的权限检查"""
async def checker(current_user: User = Depends(get_current_user)) -> User:
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户账户已被禁用"
)
# 获取权限检查上下文
context = await context_func(current_user)
user_role = Role(current_user.role)
user_permissions = get_role_permissions(user_role)
# 基础权限检查
if permission in user_permissions:
return current_user
# 动态权限检查
has_dyn_perm = await dynamic_perm_manager.check_dynamic_permission(
current_user.id, permission, context=context
)
if has_dyn_perm:
return current_user
# 用户特定权限检查
user_specific_perms = await get_user_specific_permissions(current_user.id)
if permission.value in user_specific_perms:
return current_user
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"缺少权限: {permission.value}"
)
return checker#细粒度权限控制
#基于资源所有权的权限控制
async def require_resource_ownership(
resource_owner_id: int,
required_permission: Permission,
current_user: User = Depends(get_current_user)
) -> User:
"""检查资源所有权或管理权限"""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户账户已被禁用"
)
user_role = Role(current_user.role)
user_permissions = get_role_permissions(user_role)
# 超级管理员有所有权限
if user_role == Role.SUPER_ADMIN:
return current_user
# 用户拥有资源或有管理权限
if current_user.id == resource_owner_id:
# 检查用户对自己资源的基本权限
basic_perms = {
Permission.ARTICLE_UPDATE, Permission.ARTICLE_DELETE,
Permission.USER_UPDATE, Permission.USER_DELETE
}
if required_permission in basic_perms:
return current_user
# 检查管理权限
management_perms = {
Permission.USER_MANAGE, Permission.ARTICLE_MANAGE,
Permission.COMMENT_MANAGE
}
for manage_perm in management_perms:
if manage_perm in user_permissions:
# 检查具体权限
if required_permission in user_permissions:
return current_user
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="无权操作此资源"
)
# 使用示例
@app.delete("/articles/{article_id}")
async def delete_article(
article_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
article = await db.get(Article, article_id)
if not article:
raise HTTPException(status_code=404, detail="文章不存在")
# 检查资源所有权或管理权限
await require_resource_ownership(article.author_id, Permission.ARTICLE_DELETE, current_user)
await db.delete(article)
await db.commit()
return {"message": "文章已删除"}#基于部门/团队的权限控制
class DepartmentPermissionChecker:
"""部门权限检查器"""
def __init__(self, db: AsyncSession):
self.db = db
async def user_has_department_access(
self,
user_id: int,
department_id: int,
permission: Permission
) -> bool:
"""检查用户是否有部门访问权限"""
# 获取用户部门关系
stmt = select(UserDepartment).where(
UserDepartment.user_id == user_id,
UserDepartment.department_id == department_id
)
user_dept = await self.db.execute(stmt)
user_dept_rel = user_dept.scalar()
if not user_dept_rel:
return False
# 检查部门级别权限
if user_dept_rel.role == "admin":
# 部门管理员有所有权限
return True
elif user_dept_rel.role == "member":
# 普通成员只有基本权限
basic_perms = {
Permission.ARTICLE_READ, Permission.ARTICLE_CREATE,
Permission.COMMENT_READ, Permission.COMMENT_CREATE
}
return permission in basic_perms
return False
def require_department_permission(
department_id_param: str = "department_id",
permission: Permission = Permission.ARTICLE_READ
):
"""部门权限检查依赖"""
async def checker(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
department_id: int = None # 通过路径参数或其他方式传入
) -> User:
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户账户已被禁用"
)
# 如果department_id未通过参数传入,尝试从请求上下文获取
if department_id is None:
# 这里可以从请求路径、查询参数等获取
pass
checker = DepartmentPermissionChecker(db)
has_access = await checker.user_has_department_access(
current_user.id, department_id, permission
)
if not has_access:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"无权访问部门资源: {permission.value}"
)
return current_user
return checker#时间窗口权限控制
class TimeBasedPermissionChecker:
"""基于时间的权限检查器"""
def __init__(self):
self.work_hours_start = 9 # 9 AM
self.work_hours_end = 18 # 6 PM
def is_work_hours(self) -> bool:
"""检查是否在工作时间内"""
from datetime import datetime
now = datetime.now()
return self.work_hours_start <= now.hour < self.work_hours_end
def is_weekday(self) -> bool:
"""检查是否为工作日"""
from datetime import datetime
now = datetime.now()
return now.weekday() < 5 # Monday to Friday
async def check_time_based_permission(
self,
user_id: int,
permission: Permission
) -> bool:
"""检查基于时间的权限"""
# 某些敏感权限只能在工作时间执行
sensitive_perms = {
Permission.USER_DELETE, Permission.SYSTEM_CONFIG,
Permission.DATA_EXPORT, Permission.SYSTEM_BACKUP
}
if permission in sensitive_perms:
return self.is_work_hours() and self.is_weekday()
return True
time_perm_checker = TimeBasedPermissionChecker()
def require_time_based_permission(permission: Permission):
"""时间基权限检查依赖"""
async def checker(current_user: User = Depends(get_current_user)) -> User:
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户账户已被禁用"
)
# 检查时间限制
if not await time_perm_checker.check_time_based_permission(
current_user.id, permission
):
from datetime import datetime
now = datetime.now()
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"权限 {permission.value} 只能在工作时间(周一至周五 9:00-18:00)使用。当前时间: {now.strftime('%Y-%m-%d %H:%M:%S')}"
)
# 执行正常的权限检查
user_role = Role(current_user.role)
user_permissions = get_role_permissions(user_role)
if permission in user_permissions:
return current_user
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"缺少权限: {permission.value}"
)
return checker#数据库级权限存储
#权限相关数据库模型
# models/permission_models.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text, ForeignKey
from sqlalchemy.orm import relationship, Mapped, mapped_column
from sqlalchemy.sql import func
from typing import List
from datetime import datetime
# 用户-角色关联表
user_roles = Table(
"user_roles",
Base.metadata,
Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
)
# 角色-权限关联表
role_permissions = Table(
"role_permissions",
Base.metadata,
Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
Column("permission_id", Integer, ForeignKey("permissions.id"), primary_key=True),
)
# 用户-权限关联表(用于用户特定权限)
user_permissions = Table(
"user_permissions",
Base.metadata,
Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
Column("permission_id", Integer, ForeignKey("permissions.id"), primary_key=True),
)
class RoleModel(Base):
"""角色模型 - 数据库存储的角色信息"""
__tablename__ = "roles"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True, index=True)
description: Mapped[str] = mapped_column(Text, nullable=True)
level: Mapped[int] = mapped_column(default=0) # 权限等级
is_active: Mapped[bool] = mapped_column(default=True)
created_at: Mapped[datetime] = mapped_column(default=func.now())
updated_at: Mapped[datetime] = mapped_column(default=func.now(), onupdate=func.now())
# 关系
permissions: Mapped[List["PermissionModel"]] = relationship(
"PermissionModel",
secondary=role_permissions,
back_populates="roles"
)
users: Mapped[List["User"]] = relationship(
"User",
secondary=user_roles,
back_populates="roles"
)
class PermissionModel(Base):
"""权限模型 - 数据库存储的权限信息"""
__tablename__ = "permissions"
id: Mapped[int] = mapped_column(primary_key=True)
code: Mapped[str] = mapped_column(String(100), unique=True, index=True) # 如: "user:read"
name: Mapped[str] = mapped_column(String(100)) # 权限名称
description: Mapped[str] = mapped_column(Text, nullable=True) # 权限描述
category: Mapped[str] = mapped_column(String(50), default="general") # 权限类别
is_active: Mapped[bool] = mapped_column(default=True)
critical: Mapped[bool] = mapped_column(default=False) # 是否为关键权限
created_at: Mapped[datetime] = mapped_column(default=func.now())
updated_at: Mapped[datetime] = mapped_column(default=func.now(), onupdate=func.now())
# 关系
roles: Mapped[List[RoleModel]] = relationship(
"RoleModel",
secondary=role_permissions,
back_populates="permissions"
)
users: Mapped[List["User"]] = relationship(
"User",
secondary=user_permissions,
back_populates="specific_permissions"
)
class UserRole(Base):
"""用户角色关系 - 记录用户的角色分配"""
__tablename__ = "user_roles"
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), primary_key=True)
role_id: Mapped[int] = mapped_column(ForeignKey("roles.id"), primary_key=True)
assigned_by: Mapped[int] = mapped_column(ForeignKey("users.id")) # 分配人
assigned_at: Mapped[datetime] = mapped_column(default=func.now())
expires_at: Mapped[datetime] = mapped_column(nullable=True) # 过期时间
reason: Mapped[str] = mapped_column(Text, nullable=True) # 分配原因
class UserPermission(Base):
"""用户特定权限 - 为用户单独分配的权限"""
__tablename__ = "user_permissions"
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), primary_key=True)
permission_id: Mapped[int] = mapped_column(ForeignKey("permissions.id"), primary_key=True)
granted_by: Mapped[int] = mapped_column(ForeignKey("users.id")) # 授予权限的人
granted_at: Mapped[datetime] = mapped_column(default=func.now())
expires_at: Mapped[datetime] = mapped_column(nullable=True) # 过期时间
reason: Mapped[str] = mapped_column(Text, nullable=True) # 授予原因#权限服务层
# services/permission_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, insert, delete
from typing import List, Set, Optional
from datetime import datetime
class PermissionService:
"""权限服务 - 处理权限相关的业务逻辑"""
def __init__(self, db: AsyncSession):
self.db = db
async def get_user_permissions(self, user_id: int) -> Set[str]:
"""获取用户的所有权限(角色权限 + 个人权限)"""
permissions = set()
# 获取用户角色
user_roles_stmt = select(UserRole).where(UserRole.user_id == user_id)
user_roles_result = await self.db.execute(user_roles_stmt)
user_roles = user_roles_result.scalars().all()
# 获取角色权限
for user_role in user_roles:
role_perms_stmt = select(PermissionModel).join(
role_permissions
).where(role_permissions.c.role_id == user_role.role_id)
role_perms_result = await self.db.execute(role_perms_stmt)
role_perms = role_perms_result.scalars().all()
for perm in role_perms:
permissions.add(perm.code)
# 获取用户特定权限
user_perms_stmt = select(PermissionModel).join(
user_permissions
).where(user_permissions.c.user_id == user_id)
user_perms_result = await self.db.execute(user_perms_stmt)
user_perms = user_perms_result.scalars().all()
for perm in user_perms:
permissions.add(perm.code)
return permissions
async def user_has_permission(self, user_id: int, permission_code: str) -> bool:
"""检查用户是否有特定权限"""
user_perms = await self.get_user_permissions(user_id)
return permission_code in user_perms
async def assign_role_to_user(
self,
user_id: int,
role_id: int,
assigned_by: int,
reason: str = None,
expires_at: datetime = None
) -> bool:
"""为用户分配角色"""
try:
stmt = insert(UserRole).values(
user_id=user_id,
role_id=role_id,
assigned_by=assigned_by,
reason=reason,
expires_at=expires_at
)
await self.db.execute(stmt)
await self.db.commit()
return True
except Exception:
await self.db.rollback()
return False
async def remove_role_from_user(self, user_id: int, role_id: int) -> bool:
"""从用户移除角色"""
try:
stmt = delete(UserRole).where(
UserRole.user_id == user_id,
UserRole.role_id == role_id
)
await self.db.execute(stmt)
await self.db.commit()
return True
except Exception:
await self.db.rollback()
return False
async def grant_permission_to_user(
self,
user_id: int,
permission_id: int,
granted_by: int,
reason: str = None,
expires_at: datetime = None
) -> bool:
"""为用户授予特定权限"""
try:
stmt = insert(UserPermission).values(
user_id=user_id,
permission_id=permission_id,
granted_by=granted_by,
reason=reason,
expires_at=expires_at
)
await self.db.execute(stmt)
await self.db.commit()
return True
except Exception:
await self.db.rollback()
return False
async def revoke_permission_from_user(self, user_id: int, permission_id: int) -> bool:
"""从用户撤销特定权限"""
try:
stmt = delete(UserPermission).where(
UserPermission.user_id == user_id,
UserPermission.permission_id == permission_id
)
await self.db.execute(stmt)
await self.db.commit()
return True
except Exception:
await self.db.rollback()
return False
async def get_role_permissions(self, role_id: int) -> List[str]:
"""获取角色的权限列表"""
stmt = select(PermissionModel.code).join(
role_permissions
).where(role_permissions.c.role_id == role_id)
result = await self.db.execute(stmt)
permissions = [row[0] for row in result.all()]
return permissions
async def create_role(
self,
name: str,
description: str,
level: int = 0
) -> Optional[RoleModel]:
"""创建新角色"""
try:
role = RoleModel(
name=name,
description=description,
level=level
)
self.db.add(role)
await self.db.commit()
await self.db.refresh(role)
return role
except Exception:
await self.db.rollback()
return None
async def create_permission(
self,
code: str,
name: str,
description: str,
category: str = "general"
) -> Optional[PermissionModel]:
"""创建新权限"""
try:
permission = PermissionModel(
code=code,
name=name,
description=description,
category=category
)
self.db.add(permission)
await self.db.commit()
await self.db.refresh(permission)
return permission
except Exception:
await self.db.rollback()
return None
# 权限服务实例
def get_permission_service(db: AsyncSession = Depends(get_db)) -> PermissionService:
return PermissionService(db)#高级权限管理API
#权限管理路由
# routers/permissions.py
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, Optional
from datetime import datetime
from services.permission_service import PermissionService, get_permission_service
from models.permission import Permission, Role, require_permission
from models.user import User
router = APIRouter(prefix="/permissions", tags=["权限管理"])
@router.get("/user/{user_id}/permissions", response_model=List[str])
async def get_user_permissions(
user_id: int,
current_user: User = Depends(require_permission(Permission.USER_READ)),
perm_service: PermissionService = Depends(get_permission_service)
):
"""获取用户的权限列表"""
permissions = await perm_service.get_user_permissions(user_id)
return list(permissions)
@router.get("/user/{user_id}/has-permission/{permission_code}")
async def check_user_permission(
user_id: int,
permission_code: str,
current_user: User = Depends(require_permission(Permission.USER_READ)),
perm_service: PermissionService = Depends(get_permission_service)
):
"""检查用户是否有特定权限"""
has_perm = await perm_service.user_has_permission(user_id, permission_code)
return {"user_id": user_id, "permission": permission_code, "has_permission": has_perm}
@router.post("/user/{user_id}/roles/{role_id}")
async def assign_role_to_user(
user_id: int,
role_id: int,
reason: str = Query(None, description="分配原因"),
expires_at: str = Query(None, description="过期时间 (YYYY-MM-DDTHH:MM:SS)"),
current_user: User = Depends(require_permission(Permission.USER_MANAGE)),
perm_service: PermissionService = Depends(get_permission_service)
):
"""为用户分配角色"""
expires_datetime = None
if expires_at:
try:
expires_datetime = datetime.fromisoformat(expires_at.replace('Z', '+00:00'))
except ValueError:
raise HTTPException(status_code=400, detail="无效的时间格式")
success = await perm_service.assign_role_to_user(
user_id, role_id, current_user.id, reason, expires_datetime
)
if not success:
raise HTTPException(status_code=400, detail="角色分配失败")
return {"message": "角色分配成功", "user_id": user_id, "role_id": role_id}
@router.delete("/user/{user_id}/roles/{role_id}")
async def remove_role_from_user(
user_id: int,
role_id: int,
current_user: User = Depends(require_permission(Permission.USER_MANAGE)),
perm_service: PermissionService = Depends(get_permission_service)
):
"""从用户移除角色"""
success = await perm_service.remove_role_from_user(user_id, role_id)
if not success:
raise HTTPException(status_code=400, detail="角色移除失败")
return {"message": "角色移除成功", "user_id": user_id, "role_id": role_id}
@router.post("/user/{user_id}/permissions/{permission_id}")
async def grant_user_permission(
user_id: int,
permission_id: int,
reason: str = Query(None, description="授予权限原因"),
expires_at: str = Query(None, description="过期时间 (YYYY-MM-DDTHH:MM:SS)"),
current_user: User = Depends(require_permission(Permission.USER_MANAGE)),
perm_service: PermissionService = Depends(get_permission_service)
):
"""为用户授予特定权限"""
expires_datetime = None
if expires_at:
try:
expires_datetime = datetime.fromisoformat(expires_at.replace('Z', '+00:00'))
except ValueError:
raise HTTPException(status_code=400, detail="无效的时间格式")
success = await perm_service.grant_permission_to_user(
user_id, permission_id, current_user.id, reason, expires_datetime
)
if not success:
raise HTTPException(status_code=400, detail="权限授予失败")
return {"message": "权限授予成功", "user_id": user_id, "permission_id": permission_id}
@router.delete("/user/{user_id}/permissions/{permission_id}")
async def revoke_user_permission(
user_id: int,
permission_id: int,
current_user: User = Depends(require_permission(Permission.USER_MANAGE)),
perm_service: PermissionService = Depends(get_permission_service)
):
"""从用户撤销特定权限"""
success = await perm_service.revoke_permission_from_user(user_id, permission_id)
if not success:
raise HTTPException(status_code=400, detail="权限撤销失败")
return {"message": "权限撤销成功", "user_id": user_id, "permission_id": permission_id}
@router.get("/roles/{role_id}/permissions", response_model=List[str])
async def get_role_permissions(
role_id: int,
current_user: User = Depends(require_permission(Permission.AUDIT_READ)),
perm_service: PermissionService = Depends(get_permission_service)
):
"""获取角色的权限列表"""
permissions = await perm_service.get_role_permissions(role_id)
return permissions
@router.post("/roles/", response_model=dict)
async def create_new_role(
name: str = Query(..., description="角色名称"),
description: str = Query(..., description="角色描述"),
level: int = Query(0, description="权限等级"),
current_user: User = Depends(require_permission(Permission.SYSTEM_CONFIG)),
perm_service: PermissionService = Depends(get_permission_service)
):
"""创建新角色"""
role = await perm_service.create_role(name, description, level)
if not role:
raise HTTPException(status_code=400, detail="角色创建失败")
return {"message": "角色创建成功", "role_id": role.id, "role_name": role.name}
@router.post("/permissions/", response_model=dict)
async def create_new_permission(
code: str = Query(..., description="权限代码,如: user:read"),
name: str = Query(..., description="权限名称"),
description: str = Query(..., description="权限描述"),
category: str = Query("general", description="权限类别"),
current_user: User = Depends(require_permission(Permission.SYSTEM_CONFIG)),
perm_service: PermissionService = Depends(get_permission_service)
):
"""创建新权限"""
# 验证权限代码格式
if not validate_permission_format(code):
raise HTTPException(status_code=400, detail="无效的权限代码格式")
permission = await perm_service.create_permission(code, name, description, category)
if not permission:
raise HTTPException(status_code=400, detail="权限创建失败")
return {
"message": "权限创建成功",
"permission_id": permission.id,
"permission_code": permission.code
}#权限审计API
# routers/permission_audit.py
from fastapi import APIRouter, Depends, Query
from typing import List
from datetime import datetime, timedelta
router = APIRouter(prefix="/permissions/audit", tags=["权限审计"])
class PermissionAuditLog(Base):
"""权限变更审计日志"""
__tablename__ = "permission_audit_logs"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
action: Mapped[str] = mapped_column(String(50)) # "assign_role", "grant_permission", etc.
target_user_id: Mapped[int] = mapped_column(ForeignKey("users.id")) # 被操作的用户
role_id: Mapped[Optional[int]] = mapped_column(ForeignKey("roles.id"), nullable=True)
permission_id: Mapped[Optional[int]] = mapped_column(ForeignKey("permissions.id"), nullable=True)
reason: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
performed_by: Mapped[int] = mapped_column(ForeignKey("users.id")) # 执行操作的用户
timestamp: Mapped[datetime] = mapped_column(default=func.now())
ip_address: Mapped[Optional[str]] = mapped_column(String(45), nullable=True)
@router.get("/logs")
async def get_permission_audit_logs(
current_user: User = Depends(require_permission(Permission.AUDIT_READ)),
limit: int = Query(100, le=1000),
offset: int = Query(0),
start_date: str = Query(None),
end_date: str = Query(None),
action: str = Query(None),
user_id: int = Query(None),
db: AsyncSession = Depends(get_db)
):
"""获取权限变更审计日志"""
query = select(PermissionAuditLog).order_by(PermissionAuditLog.timestamp.desc())
if start_date:
start_dt = datetime.fromisoformat(start_date)
query = query.where(PermissionAuditLog.timestamp >= start_dt)
if end_date:
end_dt = datetime.fromisoformat(end_date)
query = query.where(PermissionAuditLog.timestamp <= end_dt)
if action:
query = query.where(PermissionAuditLog.action == action)
if user_id:
query = query.where(PermissionAuditLog.target_user_id == user_id)
query = query.limit(limit).offset(offset)
result = await db.execute(query)
logs = result.scalars().all()
return {
"logs": [
{
"id": log.id,
"user_id": log.target_user_id,
"action": log.action,
"performed_by": log.performed_by,
"reason": log.reason,
"timestamp": log.timestamp.isoformat(),
"ip_address": log.ip_address
}
for log in logs
],
"total": len(logs)
}#权限缓存与性能优化
#权限缓存系统
import aioredis
from typing import Optional, Set
import json
from datetime import timedelta
class PermissionCache:
"""权限缓存系统"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis_url = redis_url
self.redis = None
async def init_redis(self):
"""初始化Redis连接"""
self.redis = await aioredis.from_url(self.redis_url)
async def get_user_permissions_cached(self, user_id: int) -> Optional[Set[str]]:
"""从缓存获取用户权限"""
if not self.redis:
return None
cache_key = f"user_permissions:{user_id}"
cached = await self.redis.get(cache_key)
if cached:
return set(json.loads(cached))
return None
async def set_user_permissions_cache(self, user_id: int, permissions: Set[str]):
"""设置用户权限缓存(缓存24小时)"""
if not self.redis:
return
cache_key = f"user_permissions:{user_id}"
await self.redis.setex(
cache_key,
timedelta(hours=24),
json.dumps(list(permissions))
)
async def invalidate_user_permissions_cache(self, user_id: int):
"""清除用户权限缓存"""
if not self.redis:
return
cache_key = f"user_permissions:{user_id}"
await self.redis.delete(cache_key)
async def invalidate_role_permissions_cache(self, role_id: int):
"""清除角色权限缓存"""
if not self.redis:
return
# 找到所有拥有此角色的用户并清除其缓存
# 这里需要查询数据库获取用户列表
pass
# 全局权限缓存实例
permission_cache = PermissionCache()
# 修改权限服务以使用缓存
class CachedPermissionService(PermissionService):
"""带缓存的权限服务"""
async def get_user_permissions(self, user_id: int) -> Set[str]:
# 先从缓存获取
cached_perms = await permission_cache.get_user_permissions_cached(user_id)
if cached_perms is not None:
return cached_perms
# 从数据库获取
permissions = await super().get_user_permissions(user_id)
# 存入缓存
await permission_cache.set_user_permissions_cache(user_id, permissions)
return permissions
async def assign_role_to_user(
self,
user_id: int,
role_id: int,
assigned_by: int,
reason: str = None,
expires_at: datetime = None
) -> bool:
success = await super().assign_role_to_user(user_id, role_id, assigned_by, reason, expires_at)
if success:
# 清除用户权限缓存
await permission_cache.invalidate_user_permissions_cache(user_id)
return success
async def grant_permission_to_user(
self,
user_id: int,
permission_id: int,
granted_by: int,
reason: str = None,
expires_at: datetime = None
) -> bool:
success = await super().grant_permission_to_user(user_id, permission_id, granted_by, reason, expires_at)
if success:
# 清除用户权限缓存
await permission_cache.invalidate_user_permissions_cache(user_id)
return success
def get_cached_permission_service(db: AsyncSession = Depends(get_db)) -> CachedPermissionService:
return CachedPermissionService(db)#权限预加载优化
from typing import Dict
import asyncio
class PermissionPreloader:
"""权限预加载器 - 预加载常用权限数据"""
def __init__(self):
self.role_permissions_cache: Dict[int, Set[str]] = {}
self.permission_metadata_cache: Dict[str, PermissionMetadata] = {}
self.role_metadata_cache: Dict[str, RoleMetadata] = {}
async def preload_common_data(self):
"""预加载常用权限数据"""
# 预加载所有角色权限映射
for role in Role:
perms = get_role_permissions(role)
self.role_permissions_cache[role] = {p.value for p in perms}
# 预加载权限元数据
self.permission_metadata_cache = PERMISSION_METADATA.copy()
# 预加载角色元数据
self.role_metadata_cache = ROLE_METADATA.copy()
def get_cached_role_permissions(self, role: Role) -> Set[str]:
"""获取缓存的角色权限"""
return self.role_permissions_cache.get(role, set())
def get_cached_permission_metadata(self, permission: Permission) -> Optional[PermissionMetadata]:
"""获取缓存的权限元数据"""
return self.permission_metadata_cache.get(permission)
def get_cached_role_metadata(self, role: Role) -> Optional[RoleMetadata]:
"""获取缓存的角色元数据"""
return self.role_metadata_cache.get(role)
# 全局预加载器实例
permission_preloader = PermissionPreloader()
# 在应用启动时预加载数据
async def startup_event():
await permission_cache.init_redis()
await permission_preloader.preload_common_data()
# 在FastAPI应用中注册启动事件
# app.add_event_handler("startup", startup_event)#权限安全最佳实践
#权限安全检查清单
"""
权限安全最佳实践清单:
✅ 数据库层面
- [ ] 使用参数化查询防止SQL注入
- [ ] 敏感权限操作记录审计日志
- [ ] 权限变更需要二次确认
- [ ] 实施行级安全控制
✅ 应用层面
- [ ] 默认拒绝所有未明确授权的访问
- [ ] 实施最小权限原则
- [ ] 定期审查权限分配
- [ ] 实施权限分级管理
✅ 网络层面
- [ ] 敏感权限操作使用HTTPS
- [ ] 实施适当的速率限制
- [ ] 记录IP地址用于安全审计
- [ ] 实施会话管理
✅ 监控层面
- [ ] 实时监控权限变更
- [ ] 设置权限异常告警
- [ ] 定期生成权限报告
- [ ] 实施权限使用分析
"""
# 权限安全中间件
class PermissionSecurityMiddleware:
"""权限安全中间件 - 实施安全检查"""
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
return await self.app(scope, receive, send)
request = Request(scope)
# 检查是否为敏感权限操作
if self.is_sensitive_operation(request):
# 记录操作日志
await self.log_sensitive_operation(request)
# 检查额外的安全要求
if not await self.check_additional_security(request):
response = JSONResponse(
status_code=403,
content={"detail": "安全检查失败"}
)
return await response(scope, receive, send)
return await self.app(scope, receive, send)
def is_sensitive_operation(self, request: Request) -> bool:
"""检查是否为敏感操作"""
sensitive_paths = [
"/permissions/user/*/roles/*", # 角色分配
"/permissions/user/*/permissions/*", # 权限授予
"/permissions/roles/", # 角色创建
"/permissions/permissions/", # 权限创建
]
path = request.url.path
return any(
(pattern == path or (pattern.endswith('*') and path.startswith(pattern[:-1])))
for pattern in sensitive_paths
)
async def log_sensitive_operation(self, request: Request):
"""记录敏感操作日志"""
# 实现日志记录逻辑
pass
async def check_additional_security(self, request: Request) -> bool:
"""检查额外安全要求"""
# 实现额外安全检查
# 如:二次验证、IP白名单等
return True#权限最小化原则实现
class PrincipleOfLeastPrivilege:
"""最小权限原则实施"""
def __init__(self, perm_service: PermissionService):
self.perm_service = perm_service
async def validate_user_permissions(self, user_id: int, requested_permissions: List[str]) -> List[str]:
"""验证用户请求的权限是否合理"""
user_permissions = await self.perm_service.get_user_permissions(user_id)
# 检查是否有过度权限请求
excessive_perms = []
justified_perms = []
for perm in requested_permissions:
if perm in user_permissions:
# 检查权限是否与当前操作相关
if await self.is_permission_justified(user_id, perm):
justified_perms.append(perm)
else:
excessive_perms.append(perm)
else:
excessive_perms.append(perm)
if excessive_perms:
# 记录过度权限请求
await self.log_excessive_permission_request(user_id, excessive_perms)
return justified_perms
async def is_permission_justified(self, user_id: int, permission: str) -> bool:
"""检查权限请求是否合理"""
# 实现合理性检查逻辑
# 如:检查操作上下文、时间、频率等
return True
async def log_excessive_permission_request(self, user_id: int, permissions: List[str]):
"""记录过度权限请求"""
# 实现日志记录
pass
# 权限使用监控
class PermissionUsageMonitor:
"""权限使用监控器"""
def __init__(self):
self.usage_stats = {}
self.alert_thresholds = {
Permission.USER_DELETE: 10, # 1小时内删除用户超过10次
Permission.SYSTEM_CONFIG: 5, # 1小时内修改系统配置超过5次
}
async def record_permission_usage(self, user_id: int, permission: str, resource_id: int = None):
"""记录权限使用"""
from datetime import datetime
now = datetime.utcnow()
key = f"{user_id}:{permission}"
if key not in self.usage_stats:
self.usage_stats[key] = []
# 清理过期记录(保留最近1小时)
cutoff = now - timedelta(hours=1)
self.usage_stats[key] = [
record for record in self.usage_stats[key]
if record['timestamp'] > cutoff
]
self.usage_stats[key].append({
'timestamp': now,
'resource_id': resource_id,
'ip_address': None # 从请求中获取
})
# 检查是否超过阈值
if permission in self.alert_thresholds:
count = len(self.usage_stats[key])
if count > self.alert_thresholds[permission]:
await self.trigger_alert(user_id, permission, count)
async def trigger_alert(self, user_id: int, permission: str, count: int):
"""触发权限使用告警"""
# 实现告警逻辑
print(f"ALERT: User {user_id} used permission {permission} {count} times in the last hour")#与其他权限框架对比
| 特性 | FastAPI RBAC | Django Guardian | Flask-Principal | Casbin |
|---|---|---|---|---|
| 集成难度 | 中等 | 高(Django专属) | 中等 | 低 |
| 性能 | 高 | 中等 | 中等 | 高 |
| 灵活性 | 高 | 中等 | 高 | 极高 |
| 学习曲线 | 中等 | 低(对Django用户) | 中等 | 高 |
| 功能完整性 | 完整 | 完整 | 基础 | 完整 |
| 细粒度控制 | 支持 | 支持 | 支持 | 高度支持 |
| 跨框架支持 | 是 | 否 | 是 | 是 |
| ACM模型支持 | 部分 | 是 | 是 | 是 |
#权限框架选择建议
"""
选择权限框架的决策树:
1. 如果使用Django → Django Guardian
2. 如果需要ACM模型 → Casbin
3. 如果需要最大灵活性 → 自定义RBAC + 权限钩子
4. 如果需要快速实现 → Flask-Principal变体
5. 如果使用FastAPI → 本教程的RBAC实现
FastAPI RBAC优势:
- 与FastAPI深度集成
- 依赖注入友好
- 异步支持
- 类型安全
- 易于测试
- 灵活的扩展性
"""#总结
FastAPI中的RBAC权限控制系统提供了完整的权限管理解决方案:
- 权限建模:通过枚举定义清晰的权限结构
- 角色管理:灵活的角色-权限映射系统
- 依赖注入:优雅的权限检查机制
- 细粒度控制:支持资源级权限控制
- 数据库集成:完整的权限持久化方案
- 性能优化:缓存和预加载机制
- 安全实践:最小权限原则和安全监控
正确实施RBAC系统能够:
- 提高系统安全性
- 简化权限管理
- 满足合规要求
- 支持系统扩展
💡 关键要点:权限逻辑应集中在依赖函数中,避免散落在业务代码中。实施最小权限原则,定期审查权限分配,记录安全审计日志。
🔗 扩展阅读

