OAuth2 与 JWT 鉴权:实现无状态的 Token 认证机制

📂 所属阶段:第四阶段 — 安全与认证(安全篇)
🔗 相关章节:依赖注入系统 · 密码哈希与安全实践


1. JWT 基础

1.1 JWT 结构

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.
eyJzdWIiOiIxIiwiZW1haWwiOiJhbGljZUBleGFtcGxlLmNvbSIsInJvbGUiOiJhZG1pbiIsImV4cCI6MTc0MzEyNzIwMH0.
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
     ↓                ↓                ↓
  Header          Payload           Signature
  • Header:算法和类型 {"alg":"HS256","typ":"JWT"}
  • Payload:声明(用户信息、过期时间等)
  • Signature:签名,防止伪造

1.2 安装依赖

pip install "python-jose[cryptography]" "passlib[bcrypt]" bcrypt
# python-jose → JWT 编解码
# passlib → 密码哈希
# bcrypt → 推荐哈希算法

2. JWT 工具模块

2.1 JWT 配置与工具函数

# auth/jwt.py
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import HTTPException, status
from jose import jwt, JWTError
from passlib.context import CryptContext

from config import get_settings

# Application settings; the helpers below read jwt_secret, jwt_expire_minutes
# and jwt_refresh_expire_days from this object.
settings = get_settings()

# Password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Signing algorithm passed to jwt.encode / jwt.decode in this module.
ALGORITHM = "HS256"

# ── 密码操作 ──────────────────────────────────────
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches the stored ``hashed_password``."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match

def hash_password(password: str) -> str:
    """Hash ``password`` with the module's password context and return the hash."""
    hashed = pwd_context.hash(password)
    return hashed

# ── JWT 操作 ──────────────────────────────────────
def create_access_token(
    data: dict,
    expires_delta: timedelta | None = None,
) -> str:
    """Create a signed JWT carrying ``data`` plus ``exp``, ``iat`` and ``jti`` claims.

    Args:
        data: Claims to embed (for example ``sub``). The dict is copied, so
            the caller's object is not mutated.
        expires_delta: Token lifetime. When ``None``, the lifetime is
            ``settings.jwt_expire_minutes`` minutes.

    Returns:
        The encoded token, signed with ``settings.jwt_secret`` using ``ALGORITHM``.
    """
    # Read the clock once so "iat" and "exp" are derived from the same instant.
    issued_at = datetime.now(timezone.utc)

    # Compare against None explicitly: a zero timedelta is falsy, and an
    # explicitly requested lifetime should not silently fall back to the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.jwt_expire_minutes)

    to_encode = data.copy()
    to_encode.update({
        "exp": issued_at + expires_delta,
        "iat": issued_at,
        "jti": str(uuid.uuid4()),  # unique token ID
    })
    return jwt.encode(to_encode, settings.jwt_secret, algorithm=ALGORITHM)

def create_refresh_token(user_id: int) -> str:
    """Create a refresh token (longer-lived) for ``user_id``.

    The token carries ``sub`` (the user id as a string) and ``type="refresh"``.
    Its lifetime is ``settings.jwt_refresh_expire_days`` days, or 30 days when
    that setting is falsy.
    """
    refresh_claims = {"sub": str(user_id), "type": "refresh"}
    lifetime = timedelta(days=settings.jwt_refresh_expire_days or 30)
    return create_access_token(data=refresh_claims, expires_delta=lifetime)

def decode_token(token: str) -> dict:
    """Decode and verify ``token``, returning its payload.

    Raises:
        credentials_exception: When decoding raises ``JWTError``.
    """
    try:
        return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])
    except JWTError as err:
        raise credentials_exception(f"Token 无效: {err}")

# ── 异常定义 ──────────────────────────────────────
class credentials_exception(HTTPException):
    """HTTP 401 error with a ``WWW-Authenticate: Bearer`` header."""

    def __init__(self, detail: str = "Could not validate credentials"):
        # Header sent with every instance of this error.
        challenge_headers = {"WWW-Authenticate": "Bearer"}
        super().__init__(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers=challenge_headers,
        )

3. OAuth2 密码模式实现

3.1 Token Schema

# schemas/token.py
from pydantic import BaseModel

class Token(BaseModel):
    # Token pair model; used as the response_model of the /login and /refresh routes.
    access_token: str
    refresh_token: str
    # Defaults to "bearer" when not supplied.
    token_type: str = "bearer"

class TokenData(BaseModel):
    # Optional user identity fields; both default to None.
    # NOTE(review): presumably mirrors claims decoded from a token — no usage is visible here; confirm.
    user_id: int | None = None
    email: str | None = None

class LoginRequest(BaseModel):
    # Email + password credential pair; both fields are required.
    # NOTE(review): the visible /login route uses OAuth2PasswordRequestForm instead — confirm where this model is used.
    email: str
    password: str

3.2 认证服务

# services/auth_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from models.user import User

class AuthService:
    """User lookups for the authentication flow, backed by an async DB session."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def authenticate_user(self, email: str, password: str) -> User | None:
        """Return the user matching ``email`` and ``password``, else ``None``.

        ``None`` is returned when no user has that email, when the password
        does not verify against ``user.hashed_password``, or when the user is
        not active.
        """
        from sqlalchemy import select
        # verify_password lives in auth/jwt.py and was never imported by this
        # module; import it locally, matching the file's local-import style.
        from auth.jwt import verify_password

        result = await self.db.execute(
            select(User).where(User.email == email)
        )
        user = result.scalar_one_or_none()
        if not user:
            return None
        if not verify_password(password, user.hashed_password):
            return None
        if not user.is_active:
            return None
        return user

    async def get_user_by_id(self, user_id: int) -> User | None:
        """Return the user whose id equals ``user_id``, or ``None`` if there is none."""
        from sqlalchemy import select

        result = await self.db.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

3.3 完整认证路由

# routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import timedelta

from dependencies import get_db
from services.auth_service import AuthService
from auth.jwt import (
    create_access_token, create_refresh_token,
    decode_token, credentials_exception, hash_password
)
from schemas.user import UserSchema, UserCreate
from schemas.token import Token
from config import get_settings

# Settings instance for this router module.
settings = get_settings()
# Routes registered on this router get the "/auth" prefix and the "认证" tag.
router = APIRouter(prefix="/auth", tags=["认证"])

# Bearer-token scheme used via Depends(); tokenUrl is the login route's path
# (router prefix "/auth" + "/login").
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

# ── Token 获取(登录)─────────────────────────────
@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    """
    OAuth2 密码模式登录
    请求体:username(邮箱) + password
    返回:access_token + refresh_token
    """
    # The docstring above is kept verbatim: FastAPI publishes a route's
    # docstring as the operation description, so it is user-facing text.
    # The form's "username" field carries the user's email address.
    user = await AuthService(db).authenticate_user(
        form_data.username, form_data.password
    )

    # One generic message for every failure mode (unknown email, wrong
    # password, inactive account).
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="邮箱或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Claims embedded in the access token.
    claims = {
        "sub": str(user.id),
        "email": user.email,
        "role": user.role,
    }
    issued_access = create_access_token(data=claims)
    issued_refresh = create_refresh_token(user.id)

    return Token(access_token=issued_access, refresh_token=issued_refresh)

# ── Token 刷新 ────────────────────────────────────
@router.post("/refresh", response_model=Token)
async def refresh_token(
    refresh_token: str,
    db: AsyncSession = Depends(get_db),
):
    # Exchange a valid refresh token for a new access/refresh token pair.
    #
    # Raises credentials_exception (HTTP 401) when the token fails to decode,
    # is not a refresh token, has a missing/non-numeric "sub", or refers to a
    # user that no longer exists or is inactive.
    #
    # (Documented with comments rather than a docstring so the generated
    # OpenAPI description of this route is unchanged.)
    payload = decode_token(refresh_token)
    if payload.get("type") != "refresh":
        raise credentials_exception("Refresh token 无效")

    # A missing or malformed "sub" is a credential problem, not a server error.
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        raise credentials_exception("Refresh token 无效") from None

    # Load the user: the new access token needs current email/role claims,
    # and a deleted or disabled account must not be able to mint new tokens.
    service = AuthService(db)
    user = await service.get_user_by_id(user_id)
    if user is None or not user.is_active:
        raise credentials_exception("Refresh token 无效")

    access_token = create_access_token(
        data={"sub": str(user.id), "email": user.email, "role": user.role}
    )
    new_refresh = create_refresh_token(user.id)
    return Token(access_token=access_token, refresh_token=new_refresh)

3.4 获取当前用户依赖

# dependencies.py
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession

# Bearer-token scheme injected into get_current_user; tokenUrl is the login route's path.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the bearer token of the current request to its ``User``.

    Raises:
        credentials_exception: When the token fails to decode, is a refresh
            token, has a missing or non-numeric ``sub`` claim, or refers to a
            user that does not exist.
        HTTPException: 401 when the user exists but is not active.
    """
    from auth.jwt import decode_token, credentials_exception
    from services.auth_service import AuthService

    payload = decode_token(token)

    # Refresh tokens are signed with the same secret but must only be accepted
    # by the refresh route; access tokens issued at login carry no "type" claim.
    if payload.get("type") == "refresh":
        raise credentials_exception()

    # Convert "sub" defensively: int(None) raises TypeError, which previously
    # surfaced as a 500 before the None check could ever run.
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        raise credentials_exception() from None

    # NOTE(review): if the optional Redis blacklist (is_token_blacklisted) is
    # adopted, it has to be consulted here for logout to take effect.
    service = AuthService(db)
    user = await service.get_user_by_id(user_id)
    if user is None:
        raise credentials_exception()
    if not user.is_active:
        raise HTTPException(401, "用户已被禁用")
    return user

async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """Return the authenticated user, raising HTTP 400 if the user is not active."""
    if current_user.is_active:
        return current_user
    raise HTTPException(400, "Inactive user")

4. 受保护路由使用

from dependencies import get_current_active_user

@app.get("/profile")
async def get_my_profile(current_user: User = Depends(get_current_active_user)):
    # Protected route: the dependency supplies the authenticated, active user,
    # which is returned as the response.
    profile = current_user
    return profile

@app.get("/admin/dashboard")
async def admin_dashboard(current_user: User = Depends(get_current_active_user)):
    # Protected route that additionally requires role == "admin";
    # any other role gets HTTP 403.
    is_admin = current_user.role == "admin"
    if not is_admin:
        raise HTTPException(403, "Admin access required")
    return {"secret": "admin data"}

# 公开路由不需要依赖
@app.get("/public/articles")
async def public_articles():
    # Public route: declares no auth dependency.
    articles = [...]  # placeholder payload, exactly as in the original example
    return articles

5. Token 黑名单(可选:登出功能)

# Simple Redis-backed blacklist.
# Prefix prepended to the raw token string to form each blacklist key.
BLACKLIST_PREFIX = "token:blacklist:"

async def revoke_token(token: str):
    """Blacklist ``token`` in Redis until its own expiry time.

    The token is decoded first, so an invalid token raises whatever
    ``decode_token`` raises. Tokens without an ``exp`` claim, or whose expiry
    has already passed, are not stored.
    """
    import math

    payload = decode_token(token)
    exp = payload.get("exp")
    if not exp:
        return

    remaining = exp - datetime.now(timezone.utc).timestamp()
    if remaining > 0:
        # Round up: truncating a sub-second remainder gives 0, which Redis
        # rejects as an invalid expire time, and rounding down would let the
        # entry expire slightly before the token does.
        ttl_seconds = math.ceil(remaining)
        await redis_client.setex(f"{BLACKLIST_PREFIX}{token}", ttl_seconds, "1")

async def is_token_blacklisted(token: str) -> bool:
    """Return whether a blacklist entry exists in Redis for ``token``."""
    blacklist_key = f"{BLACKLIST_PREFIX}{token}"
    hit_count = await redis_client.exists(blacklist_key)
    return hit_count > 0

# 登出路由
@app.post("/auth/logout")
async def logout(token: str = Depends(oauth2_scheme)):
    # Logout route: revoke the caller's bearer token, then confirm.
    await revoke_token(token)
    confirmation = {"message": "登出成功"}
    return confirmation

6. 小结

# JWT 认证完整流程

# 1. 登录:用户提交邮箱+密码
POST /auth/login
  → AuthService.authenticate_user() 验证
  → create_access_token() 生成 JWT
  → 返回 {access_token, refresh_token}

# 2. 请求:携带 Token
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...

# 3. 验证:依赖注入获取用户
Depends(get_current_user)
  → oauth2_scheme() 提取 Token
  → decode_token() 验证签名和过期
  → 查询用户是否存在
  → 返回 User 对象给路由

# 4. 刷新:Token 过期后用 refresh_token 换新 access_token
POST /auth/refresh

💡 安全提示:JWT 密钥 jwt_secret 必须足够长(32字节+)且妥善保管在环境变量中,永不上传到代码仓库。生产环境使用 RS256 非对称算法更安全。


🔗 扩展阅读