#基于角色的权限控制 (RBAC):如何保护你的敏感接口
📂 所属阶段:第四阶段 — 安全与认证(安全篇)
🔗 相关章节:OAuth2 与 JWT 鉴权 · 依赖注入系统
#1. RBAC 核心概念
#1.1 什么是 RBAC?
用户 → 角色 → 权限
↓
资源 + 操作
例子:
Alice(用户)→ 管理员(角色)→ 删除用户、修改配置(权限)
Bob(用户) → 编辑者(角色)→ 创建文章、编辑文章(权限)
Carol(用户)→ 普通用户(角色)→ 阅读文章、评论(权限)#1.2 权限模型设计
| 权限 | 资源 | 操作 |
|---|---|---|
user:read | 用户 | 读取 |
user:create | 用户 | 创建 |
user:update | 用户 | 修改 |
user:delete | 用户 | 删除 |
article:read | 文章 | 读取 |
article:write | 文章 | 读写 |
article:publish | 文章 | 发布 |
admin:access | 管理后台 | 访问 |
#2. 权限定义
#2.1 权限枚举
# models/permission.py
from enum import Enum
class Permission(str, Enum):
# 用户相关
USER_READ = "user:read"
USER_CREATE = "user:create"
USER_UPDATE = "user:update"
USER_DELETE = "user:delete"
# 文章相关
ARTICLE_READ = "article:read"
ARTICLE_WRITE = "article:write"
ARTICLE_PUBLISH = "article:publish"
ARTICLE_DELETE = "article:delete"
# 系统
ADMIN_ACCESS = "admin:access"
SYSTEM_CONFIG = "system:config"
class Role(str, Enum):
ADMIN = "admin"
EDITOR = "editor"
AUTHOR = "author"
USER = "user"
GUEST = "guest"#2.2 角色-权限映射
# 角色权限配置
ROLE_PERMISSIONS: dict[Role, set[Permission]] = {
Role.ADMIN: {
Permission.USER_READ, Permission.USER_CREATE,
Permission.USER_UPDATE, Permission.USER_DELETE,
Permission.ARTICLE_READ, Permission.ARTICLE_WRITE,
Permission.ARTICLE_PUBLISH, Permission.ARTICLE_DELETE,
Permission.ADMIN_ACCESS, Permission.SYSTEM_CONFIG,
},
Role.EDITOR: {
Permission.USER_READ,
Permission.ARTICLE_READ, Permission.ARTICLE_WRITE, Permission.ARTICLE_PUBLISH,
Permission.ARTICLE_DELETE, # 仅删除自己发布的
},
Role.AUTHOR: {
Permission.USER_READ,
Permission.ARTICLE_READ, Permission.ARTICLE_WRITE,
},
Role.USER: {
Permission.USER_READ,
Permission.ARTICLE_READ,
},
Role.GUEST: {
Permission.ARTICLE_READ,
},
}#3. 权限依赖注入
#3.1 单权限检查
# dependencies/auth.py
from fastapi import Depends, HTTPException, status
from models.permission import Permission, ROLE_PERMISSIONS, Role
from models.user import User
def require_permission(permission: Permission):
"""检查用户是否拥有指定权限"""
async def checker(current_user: User = Depends(get_current_user)) -> User:
# 从 JWT 获取用户角色
user_role = Role(current_user.role)
# 获取角色拥有的权限集合
user_permissions = ROLE_PERMISSIONS.get(user_role, set())
if permission not in user_permissions:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"缺少权限: {permission.value}",
)
return current_user
return checker
# 使用
@app.delete("/users/{user_id}")
async def delete_user(
user_id: int,
_: User = Depends(require_permission(Permission.USER_DELETE)),
):
...#3.2 多权限检查(AND / OR)
def require_any_permission(*permissions: Permission):
"""拥有任一权限即可通过"""
async def checker(current_user: User = Depends(get_current_user)) -> User:
user_permissions = ROLE_PERMISSIONS.get(Role(current_user.role), set())
if not any(p in user_permissions for p in permissions):
raise HTTPException(403, f"需要以下任一权限: {[p.value for p in permissions]}")
return current_user
return checker
def require_all_permissions(*permissions: Permission):
"""必须拥有所有权限才通过"""
async def checker(current_user: User = Depends(get_current_user)) -> User:
user_permissions = ROLE_PERMISSIONS.get(Role(current_user.role), set())
missing = [p for p in permissions if p not in user_permissions]
if missing:
raise HTTPException(403, f"缺少以下权限: {[p.value for p in missing]}")
return current_user
return checker
# 使用
@app.get("/dashboard")
async def dashboard(
_: User = Depends(require_any_permission(
Permission.ADMIN_ACCESS, Permission.ARTICLE_PUBLISH
))
):
...#3.3 资源级权限检查
async def require_ownership(resource_owner_id: int, current_user: User = Depends(get_current_user)):
"""检查资源是否属于当前用户,或用户是管理员"""
if current_user.role == Role.ADMIN:
return current_user
if current_user.id != resource_owner_id:
raise HTTPException(403, "无权操作此资源")
return current_user
@app.delete("/articles/{article_id}")
async def delete_article(
article_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
article = await db.get_article(article_id)
# 资源级权限:文章作者或管理员可删除
if article.author_id != current_user.id and current_user.role != Role.ADMIN:
raise HTTPException(403, "无权删除此文章")
await db.delete(article)
return {"deleted": article_id}#4. 数据库级权限存储
#4.1 用户-角色关联模型
# models/user.py 扩展
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(unique=True)
hashed_password: Mapped[str] = mapped_column()
name: Mapped[str] = mapped_column()
role: Mapped[str] = mapped_column(String(20), default=Role.USER.value)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
# 多对多:用户自定义权限(扩展角色权限)
extra_permissions: Mapped[list["Permission"]] = relationship(
secondary=user_permissions, back_populates="users"
)
# 权限表
class PermissionModel(Base):
__tablename__ = "permissions"
id: Mapped[int] = mapped_column(primary_key=True)
code: Mapped[str] = mapped_column(unique=True)
name: Mapped[str] = mapped_column()
description: Mapped[str | None]
# 用户-权限关联表
user_permissions = Table(
"user_permissions", Base.metadata,
Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
Column("permission_id", Integer, ForeignKey("permissions.id"), primary_key=True),
)#4.2 获取用户完整权限
async def get_user_permissions(user: User) -> set[str]:
"""获取用户的所有权限 = 角色权限 + 个人额外权限"""
role_perms = {p.value for p in ROLE_PERMISSIONS.get(Role(user.role), set())}
extra_perms = {p.code for p in user.extra_permissions}
return role_perms | extra_perms#5. 实战:权限管理 API
# routers/admin.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from dependencies import get_db, require_permission
from models.permission import Permission
from models.user import User
router = APIRouter(prefix="/admin", tags=["管理后台"])
@router.get("/users/{user_id}/role")
async def get_user_role(
user_id: int,
_: User = Depends(require_permission(Permission.USER_READ)),
):
user = await db.get_user(user_id)
return {"id": user.id, "role": user.role}
@router.put("/users/{user_id}/role")
async def update_user_role(
user_id: int,
new_role: str,
current_user: User = Depends(require_permission(Permission.USER_UPDATE)),
db: AsyncSession = Depends(get_db),
):
# 防止越权:普通管理员不能把用户升为管理员
if new_role == Role.ADMIN.value and current_user.role != Role.ADMIN:
raise HTTPException(403, "无权授予管理员角色")
await db.update_user_role(user_id, new_role)
return {"message": "角色已更新"}#6. 小结
# RBAC 权限检查速查
# 1. 定义权限
class Permission(str, Enum):
USER_DELETE = "user:delete"
# 2. 角色-权限映射
ROLE_PERMISSIONS = {Role.ADMIN: {Permission.USER_DELETE, ...}}
# 3. 权限检查依赖
def require_permission(permission: Permission):
async def checker(current_user: User = Depends(get_current_user)):
if permission not in user_permissions:
raise HTTPException(403)
return current_user
return checker
# 4. 使用
@app.delete("/users/{id}")
async def delete_user(_: User = Depends(require_permission(Permission.USER_DELETE))):
...
# 5. 资源级检查
if article.author_id != current_user.id and current_user.role != Role.ADMIN:
raise HTTPException(403, "无权操作")💡 最佳实践:权限逻辑尽量放在依赖函数中,不要散落在各个路由里。这样修改权限逻辑只需改一处,且易于测试。
🔗 扩展阅读

