FastAPI oauth2-jwt-auth完全指南

📂 所属阶段:第四阶段 — 安全与认证(安全篇)
🔗 相关章节:FastAPIdependency-injection · FastAPIpassword-hashing-security

当你构建 API 时,身份认证是不可或缺的一环。本教程将带你从零开始,用 FastAPI 搭配 OAuth2 密码模式和 JWT,构建一个清晰、可扩展的鉴权系统。我们会覆盖最基本的快速实现,再慢慢演进到模块化、包含刷新令牌和权限控制的完整方案。

目录

为什么选择JWT认证?

传统Session认证 vs JWT认证

在 Web 开发中,常见的认证方式有两种:传统的服务端 Session 和今天的主题 JWT。它们在设计哲学上截然不同,我们用一个表格来对比:

方案优势劣势适用场景
Session认证简单易用,服务器可以随时吊销登录状态有状态,扩展性差(多服务器需要共享 Session),跨域麻烦单体应用,小型项目
JWT认证无状态,天然适合分布式;不依赖服务器内存;跨域友好Token 体积较大,单方面失效较难分布式系统,微服务,前后端分离 API

简单来说,如果你的服务要部署成多个实例,或者前端是独立域名,JWT 会让你少踩很多坑。

JWT的核心价值

  1. 无状态 — 服务器不用记录谁登录了,每次请求自证身份。
  2. 可扩展 — 多个微服务实例可以独立验证同一个 Token,不用共享 Session。
  3. 跨域友好 — 前端可以随时把 Token 放在 Authorization 头里发给不同域的后端。
  4. 自包含 — Token 内嵌了用户基本信息(如用户 ID、角色),减少查库次数。
  5. 标准化 — 遵循 RFC 7519,生态工具丰富。

项目依赖安装

先装好我们需要的库:

pip install fastapi python-jose[cryptography] passlib[bcrypt] bcrypt python-multipart uvicorn
  • python-jose[cryptography]:生成和校验 JWT。
  • passlib[bcrypt] + bcrypt:安全的密码哈希。
  • python-multipart:解析 OAuth2 表单数据。

快速上手:核心实现

JWT结构简析

一个 JWT Token 看起来像一段乱码,实际上由三部分组成,用点号分隔:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.
eyJzdWIiOiIxIiwiZW1haWwiOiJhbGljZUBleGFtcGxlLmNvbSIsInJvbGUiOiJhZG1pbiIsImV4cCI6MTc0MzEyNzIwMH0.
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
     ↓                ↓                ↓
  Header          Payload           Signature
  • Header:包含算法类型 {"alg":"HS256","typ":"JWT"}
  • Payload:存放我们定义的声明,比如用户 ID、角色、过期时间等。注意:这部分只是 Base64 编码,不是加密,所以不能放敏感信息。
  • Signature:将 Header 和 Payload 用密钥签名,防止被篡改。只有拥有密钥才能生成合法的签名。

最小化完整示例

下面是一个可以立刻跑起来的完整代码,它暴露了登录接口和需要认证的个人信息接口。你可以先复制、运行,感受流程,再慢慢拆解。

# main.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel

# ---------- 配置 ----------
SECRET_KEY = "your-secret-key-change-this-in-production-at-least-32-chars"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# ---------- 模拟数据库 ----------
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "hashed_password": "$2b$12$EixZaYb4uX512Gpq5vWveu5G9.5G9.5G9.5G9.5G9.5G9.5G9.5G",
        "email": "johndoe@example.com",
        "role": "user"
    }
}

# ---------- 初始化 ----------
app = FastAPI()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# ---------- Pydantic 模型 ----------
class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str | None = None

# ---------- 工具函数 ----------
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_user(db, username: str):
    if username in db:
        return db[username]

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user["hashed_password"]):
        return False
    return user

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: str = Depends(oauth2_scheme)):
    """依赖注入:从请求头中提取并校验 JWT,返回当前用户"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无法验证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

# ---------- 路由 ----------
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """OAuth2 密码模式登录,返回 access_token"""
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user["username"], "role": user["role"]},
        expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: dict = Depends(get_current_user)):
    """返回当前用户信息(需要认证)"""
    return current_user

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

💡 运行测试
启动服务后,访问 http://127.0.0.1:8000/docs 可以在 Swagger UI 里直接测试:先执行 /token 获取 Token,再点击 /users/me 右侧的锁图标填入 Token,即可看到当前用户信息。

通过这个例子,你已经看到了 FastAPI 鉴权的核心套路:密码验证 → 签发 JWT → 依赖注入保护路由。下面我们把它拆分成更规范的模块结构,方便在实际项目中使用。

JWT工具模块

把所有 JWT 逻辑集中到一个管理器里,能减少重复代码,也方便将来更换算法或增加功能。

# auth/jwt.py
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any
from jose import jwt, JWTError
from passlib.context import CryptContext
from fastapi import HTTPException, status

# ---------- 配置 ----------
SECRET_KEY = "your-secret-key-change-this-in-production-at-least-32-chars"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class JWTManager:
    """JWT管理器 - 集中处理 Token 和密码哈希"""

    def __init__(self):
        self.secret_key = SECRET_KEY
        self.algorithm = ALGORITHM
        self.access_token_expire = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        self.refresh_token_expire = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)

    # ---------- 密码工具 ----------
    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        return pwd_context.verify(plain_password, hashed_password)

    def hash_password(self, password: str) -> str:
        return pwd_context.hash(password)

    # ---------- Token 生成 ----------
    def create_access_token(
        self,
        data: Dict[str, Any],
        expires_delta: Optional[timedelta] = None
    ) -> str:
        """生成访问令牌,包含唯一 ID、类型等声明"""
        to_encode = data.copy()
        expire = datetime.now(timezone.utc) + (expires_delta or self.access_token_expire)
        to_encode.update({
            "exp": expire.timestamp(),
            "iat": datetime.now(timezone.utc).timestamp(),
            "jti": str(uuid.uuid4()),      # 唯一标识,可用于黑名单
            "type": "access",
        })
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    def create_refresh_token(self, user_id: int) -> str:
        """生成刷新令牌,有效期更长"""
        return self.create_access_token(
            data={"sub": str(user_id), "type": "refresh"},
            expires_delta=self.refresh_token_expire
        )

    # ---------- Token 校验 ----------
    def decode_token(self, token: str) -> Dict[str, Any]:
        """解码并验证令牌,异常统一处理"""
        try:
            return jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except JWTError as e:
            raise self._create_credentials_exception(f"Token无效: {str(e)}")

    def _create_credentials_exception(self, detail: str = "无法验证凭据") -> HTTPException:
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

# 全局实例,方便各模块引用
jwt_manager = JWTManager()

这个模块提供了三个核心能力:

  • 密码哈希与校验:通过 verify_passwordhash_password
  • 生成不同用途的 Tokencreate_access_token 用于短时间的访问令牌,create_refresh_token 用于长期有效的刷新令牌。
  • 统一的解码入口decode_token,如果 Token 过期或签名不对会直接抛出 401。

认证服务与路由

有了 JWT 工具,我们再封装一个简单的认证服务,把用户验证逻辑从路由里分离出来。

认证服务层

# services/auth_service.py
from typing import Optional
from auth.jwt import jwt_manager

# 模拟数据库
fake_users_db = {
    1: {
        "id": 1,
        "username": "johndoe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYb4uX512Gpq5vWveu5G9.5G9.5G9.5G9.5G9.5G9.5G9.5G",
        "role": "user",
        "is_active": True,
    }
}

class AuthService:
    """处理用户认证相关业务逻辑"""

    def authenticate_user(self, username: str, password: str) -> Optional[dict]:
        """根据用户名密码验证用户"""
        user = next((u for u in fake_users_db.values() if u["username"] == username), None)
        if not user:
            return None
        if not jwt_manager.verify_password(password, user["hashed_password"]):
            return None
        if not user.get("is_active"):
            return None
        return user

    def get_user_by_id(self, user_id: int) -> Optional[dict]:
        return fake_users_db.get(user_id)

认证路由

现在编写专门的认证路由,暴露登录接口,并同时返回访问令牌和刷新令牌。

# routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import timedelta

from auth.jwt import jwt_manager
from services.auth_service import AuthService

router = APIRouter(prefix="/auth", tags=["认证"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
auth_service = AuthService()

class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_in: int

@router.post("/login", response_model=TokenResponse)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """OAuth2 密码模式登录,返回 access + refresh token"""
    user = auth_service.authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = jwt_manager.create_access_token(
        data={"sub": str(user["id"]), "username": user["username"], "role": user["role"]},
    )
    refresh_token = jwt_manager.create_refresh_token(user["id"])

    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
        expires_in=jwt_manager.access_token_expire.seconds,
    )

🧠 设计思路

  • 访问令牌有效期短,降低泄露风险。
  • 刷新令牌有效期长,前台可以静默刷新,用户无需频繁登录。
  • 刷新令牌的 type 字段可以用来区分用途,后面实现刷新接口时能进行校验。

依赖注入与受保护路由

FastAPI 的依赖注入是认证中最优雅的部分。我们编写几个不同粒度的依赖项,轻松实现“普通用户”和“管理员”的权限控制。

依赖注入实现

# dependencies.py
from fastapi import Depends, HTTPException, status
from auth.jwt import jwt_manager
from services.auth_service import AuthService
from routers.auth import oauth2_scheme

auth_service = AuthService()

async def get_current_user(token: str = Depends(oauth2_scheme)):
    """从 Token 中提取当前用户(基础依赖)"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无法验证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt_manager.decode_token(token)
        user_id: int = int(payload.get("sub"))
        if user_id is None:
            raise credentials_exception
    except Exception:
        raise credentials_exception

    user = auth_service.get_user_by_id(user_id)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(current_user: dict = Depends(get_current_user)):
    """进一步检查用户是否为活跃状态"""
    if not current_user.get("is_active"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="用户账户未激活"
        )
    return current_user

async def get_current_admin_user(current_user: dict = Depends(get_current_user)):
    """要求管理员角色"""
    if current_user.get("role") != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="权限不足:需要管理员权限"
        )
    return current_user

这几个依赖项像搭积木一样,一层层叠加验证逻辑,路由函数只需要声明需要的依赖即可。

受保护路由

# routers/protected.py
from fastapi import APIRouter, Depends
from dependencies import get_current_active_user, get_current_admin_user

router = APIRouter(prefix="/protected", tags=["受保护路由"])

@router.get("/profile")
async def get_profile(current_user: dict = Depends(get_current_active_user)):
    """查看个人资料 - 需要已激活的登录用户"""
    # 过滤掉密码哈希等敏感字段
    return {k: v for k, v in current_user.items() if k != "hashed_password"}

@router.get("/admin/dashboard")
async def admin_dashboard(current_user: dict = Depends(get_current_admin_user)):
    """管理员面板 - 只有 admin 角色可以访问"""
    return {
        "message": "管理员仪表板",
        "user": current_user["username"],
        "role": current_user["role"],
    }

现在你的路由可以自由组合不同的权限依赖,代码可读性和安全性都大幅提升。

安全最佳实践

功能跑通只是第一步,生产环境必须加上这些“防护盾”。

核心安全建议

  1. 全程 HTTPS — 绝不在 HTTP 链路上传递 Token,否则等于透明。
  2. 强密钥+轮换 — JWT 密钥至少 32 字节,定期更换,旧密钥可保留一段时间以便迁移。
  3. 短访问令牌 + 长刷新令牌 — 访问令牌 15~30 分钟过期,泄露后影响窗口小;用刷新令牌静默换取新访问令牌。
  4. 令牌黑名单 — 当用户登出或修改密码时,将当前 Token 的 jti 加入 Redis 黑名单,网关层统一拦截。
  5. 密码安全 — 永远只存 bcrypt 哈希,不要记录明文,密码强度要求(长度、字符类型)在注册端校验。
  6. 速率限制 — 登录接口必须限制频率(例如 5次/分钟),防止暴力破解。
  7. 安全响应头 — 设置 X-Content-Type-Options: nosniffX-Frame-Options: DENY 等,增强整体防护。

企业级安全配置示例

将安全相关的配置集中管理,方便通过环境变量覆盖。

# config/security.py
import os

class SecurityConfig:
    """安全配置(通过环境变量覆盖默认值)"""

    # JWT
    JWT_ALGORITHM: str = os.getenv("JWT_ALGORITHM", "HS256")
    JWT_SECRET_KEY: str = os.getenv("JWT_SECRET_KEY", "")

    # 密码策略
    PASSWORD_MIN_LENGTH: int = 12
    PASSWORD_REQUIRE_UPPERCASE: bool = True
    PASSWORD_REQUIRE_LOWERCASE: bool = True
    PASSWORD_REQUIRE_NUMBERS: bool = True
    PASSWORD_REQUIRE_SYMBOLS: bool = True

    # 速率限制
    LOGIN_RATE_LIMIT: str = "5/minute"
    API_RATE_LIMIT: str = "100/minute"

    # 会话与过期时间
    MAX_ACTIVE_SESSIONS: int = 5
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7

security_config = SecurityConfig()

⚠️ 生产环境千万不要把密钥写在代码里,务必通过环境变量注入,而且建议使用非对称算法(如 RS256)来进一步提升安全性。

总结

通过本指南,我们构建了一个从简单到完善、可直接落地的 FastAPI 鉴权系统:

  1. 无状态设计 — JWT 自包含用户信息,完美适配多实例部署。
  2. 精细的权限控制 — 依赖注入让普通用户、活跃用户、管理员等权限划分一目了然。
  3. 安全的令牌管理 — 支持短时访问令牌、长时刷新令牌以及黑名单机制。
  4. 清晰的模块化架构 — JWT 管理器、认证服务、依赖项各自独立,易于测试和扩展。

💡 记住三个关键点:用 HTTPS 传输令牌;保持短有效期+刷新机制;做好速率限制和日志审计。这些是生产环境安全的基本盘。


🔗 扩展阅读