#OAuth2 与 JWT 鉴权:实现无状态的 Token 认证机制
#1. JWT 基础
#1.1 JWT 结构
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.
eyJzdWIiOiIxIiwiZW1haWwiOiJhbGljZUBleGFtcGxlLmNvbSIsInJvbGUiOiJhZG1pbiIsImV4cCI6MTc0MzEyNzIwMH0.
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
↓ ↓ ↓
Header Payload Signature
- Header:算法和类型,例如 {"alg":"HS256","typ":"JWT"}
- Payload:声明(用户信息、过期时间等)
- Signature:签名,防止伪造
#1.2 安装依赖
pip install python-jose[cryptography] passlib[bcrypt] bcrypt
# python-jose → JWT 编解码
# passlib → 密码哈希
# bcrypt → 推荐哈希算法
#2. JWT 工具模块
#2.1 JWT 配置与工具函数
# auth/jwt.py
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import HTTPException, status
from jose import jwt, JWTError
from passlib.context import CryptContext

from config import get_settings
# Application settings; this module reads jwt_secret, jwt_expire_minutes and
# jwt_refresh_expire_days from it.
settings = get_settings()
# Password hashing context (bcrypt scheme).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Signing algorithm passed to jwt.encode / jwt.decode below.
ALGORITHM = "HS256"
# ── 密码操作 ──────────────────────────────────────
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain_password: The password as supplied by the caller.
        hashed_password: The previously stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def hash_password(password: str) -> str:
    """Hash a plaintext password with the module-level ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
# ── JWT 操作 ──────────────────────────────────────
def create_access_token(
    data: dict,
    expires_delta: timedelta | None = None,
) -> str:
    """Create a signed JWT access token.

    The caller's claims are copied (``data`` is not mutated) and extended with
    ``exp``, ``iat`` and a random ``jti``.

    Args:
        data: Claims to embed in the token, e.g. ``{"sub": "1"}``.
        expires_delta: Token lifetime. When ``None``, the lifetime is
            ``settings.jwt_expire_minutes`` minutes.

    Returns:
        The encoded token, signed with ``settings.jwt_secret`` using
        ``ALGORITHM``.
    """
    # Read the clock once so "iat" and "exp" are derived from the same instant.
    issued_at = datetime.now(timezone.utc)
    # Compare against None explicitly: timedelta(0) is falsy and would
    # otherwise be silently replaced by the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.jwt_expire_minutes)
    claims = {
        **data,
        "exp": issued_at + expires_delta,
        "iat": issued_at,
        "jti": str(uuid.uuid4()),  # unique token ID
    }
    return jwt.encode(claims, settings.jwt_secret, algorithm=ALGORITHM)
def create_refresh_token(user_id: int) -> str:
    """Create a refresh token (longer-lived than an access token).

    The token carries ``sub`` (the user id as a string) and ``type="refresh"``,
    and is built by ``create_access_token``.

    Args:
        user_id: ID of the user the token is issued for.

    Returns:
        The encoded refresh token.
    """
    refresh_claims = {"sub": str(user_id), "type": "refresh"}
    # A falsy configured value falls back to 30 days.
    lifetime_days = settings.jwt_refresh_expire_days or 30
    return create_access_token(
        data=refresh_claims,
        expires_delta=timedelta(days=lifetime_days),
    )
def decode_token(token: str) -> dict:
    """Decode and verify a token.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims.

    Raises:
        credentials_exception: If ``jwt.decode`` raises ``JWTError``.
            NOTE(review): per python-jose, expired tokens should also surface
            as ``JWTError`` -- confirm against the installed version.
    """
    try:
        return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])
    except JWTError as e:
        # Chain the original error so the root cause stays in the traceback.
        raise credentials_exception(f"Token 无效: {e}") from e
# ── 异常定义 ──────────────────────────────────────
class credentials_exception(HTTPException):
    """HTTP 401 error carrying a ``WWW-Authenticate: Bearer`` header.

    Args:
        detail: Message placed in the response; defaults to a generic
            "Could not validate credentials".
    """

    def __init__(self, detail: str = "Could not validate credentials"):
        bearer_challenge = {"WWW-Authenticate": "Bearer"}
        super().__init__(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers=bearer_challenge,
        )


# 3. OAuth2 password flow implementation
#3.1 Token Schema
# schemas/token.py
from pydantic import BaseModel
class Token(BaseModel):
    """Response model used by the /auth/login and /auth/refresh routes."""

    # Encoded access token.
    access_token: str
    # Encoded refresh token.
    refresh_token: str
    # Token scheme reported to the client; defaults to "bearer".
    token_type: str = "bearer"
class TokenData(BaseModel):
    """Optional user identity fields.

    NOTE(review): not referenced by any other code visible here; presumably
    meant to hold decoded token claims -- confirm.
    """

    # Both fields default to None when not provided.
    user_id: int | None = None
    email: str | None = None
class LoginRequest(BaseModel):
    """E-mail + password pair.

    NOTE(review): the visible login route reads OAuth2PasswordRequestForm
    rather than this model -- confirm whether this model is used elsewhere.
    """

    email: str
    password: str


# 3.2 Authentication service
# services/auth_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from models.user import User
class AuthService:
    """User lookup and credential checking on top of an async DB session."""

    def __init__(self, db: AsyncSession):
        """Store the session used for all queries.

        Args:
            db: The SQLAlchemy async session to query with.
        """
        self.db = db

    async def _fetch_one(self, condition) -> User | None:
        """Return the single ``User`` matching ``condition``, or ``None``."""
        from sqlalchemy import select

        result = await self.db.execute(select(User).where(condition))
        return result.scalar_one_or_none()

    async def authenticate_user(self, email: str, password: str) -> User | None:
        """Return the user for valid credentials, otherwise ``None``.

        ``None`` is returned when no user has this e-mail, when the password
        does not verify against ``user.hashed_password``, or when the user is
        not active.

        Args:
            email: E-mail address to look the user up by.
            password: Plaintext password to verify.
        """
        # verify_password lives in auth/jwt.py and is not imported at the top
        # of this module; import it here so the call below resolves.
        from auth.jwt import verify_password

        user = await self._fetch_one(User.email == email)
        if not user:
            return None
        if not verify_password(password, user.hashed_password):
            return None
        if not user.is_active:
            return None
        return user

    async def get_user_by_id(self, user_id: int) -> User | None:
        """Return the user with the given primary key, or ``None``.

        Args:
            user_id: Value compared against ``User.id``.
        """
        return await self._fetch_one(User.id == user_id)


# 3.3 Complete authentication router
# routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import timedelta
from dependencies import get_db
from services.auth_service import AuthService
from auth.jwt import (
create_access_token, create_refresh_token,
decode_token, credentials_exception, hash_password
)
from schemas.user import UserSchema, UserCreate
from schemas.token import Token
from config import get_settings
# Application settings loaded through get_settings().
settings = get_settings()
# Router for the authentication endpoints; every route is prefixed with /auth.
router = APIRouter(prefix="/auth", tags=["认证"])
# Bearer-token dependency; tokenUrl points at the login route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
# ── Token 获取(登录)─────────────────────────────
@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    """
    OAuth2 password-flow login.

    Request body: username (the e-mail address) + password.
    Returns: access_token + refresh_token.
    """
    user = await AuthService(db).authenticate_user(
        form_data.username, form_data.password
    )
    if user:
        # Build both tokens for the authenticated user.
        token_claims = {
            "sub": str(user.id),
            "email": user.email,
            "role": user.role,
        }
        issued_access = create_access_token(data=token_claims)
        issued_refresh = create_refresh_token(user.id)
        return Token(access_token=issued_access, refresh_token=issued_refresh)

    # Authentication failed: same response for every failure reason.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="邮箱或密码错误",
        headers={"WWW-Authenticate": "Bearer"},
    )
# ── Token 刷新 ────────────────────────────────────
@router.post("/refresh", response_model=Token)
async def refresh_token(
    refresh_token: str,
    db: AsyncSession = Depends(get_db),
):
    """Exchange a valid refresh token for a new access/refresh token pair.

    Args:
        refresh_token: The encoded refresh token.
        db: Async database session used to load the token's user.

    Returns:
        A ``Token`` with a new access token and a new refresh token.

    Raises:
        credentials_exception: If the token fails verification, is not of
            type "refresh", has a missing or non-integer ``sub``, or belongs
            to a user that no longer exists or is inactive.
    """
    payload = decode_token(refresh_token)
    if payload.get("type") != "refresh":
        raise credentials_exception("Refresh token 无效")

    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError) as exc:
        raise credentials_exception("Refresh token 无效") from exc

    # Load the user so the new access token carries current email/role, and
    # so deleted or disabled accounts cannot keep refreshing.
    user = await AuthService(db).get_user_by_id(user_id)
    if user is None or not user.is_active:
        raise credentials_exception("Refresh token 无效")

    # New access token (no password material is included in the claims).
    access_token = create_access_token(
        data={"sub": str(user.id), "email": user.email, "role": user.role}
    )
    # NOTE(review): the presented refresh token is not revoked here; it stays
    # usable until its own expiry.
    new_refresh = create_refresh_token(user.id)
    return Token(access_token=access_token, refresh_token=new_refresh)


# 3.4 Current-user dependency
# dependencies.py
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
# Bearer-token dependency used by get_current_user; tokenUrl is the login route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the bearer token of the current request to a ``User``.

    Args:
        token: Encoded JWT supplied by ``oauth2_scheme``.
        db: Async database session used to load the user.

    Returns:
        The active user identified by the token's ``sub`` claim.

    Raises:
        credentials_exception: If the token fails verification, is a refresh
            token, has a missing or non-integer ``sub``, or names a user that
            does not exist.
        HTTPException: 401 if the user exists but is not active.
    """
    from auth.jwt import decode_token, credentials_exception
    from services.auth_service import AuthService

    payload = decode_token(token)

    # Refresh tokens are signed with the same key as access tokens; they must
    # not be usable to authenticate ordinary requests.
    if payload.get("type") == "refresh":
        raise credentials_exception()

    # Validate "sub" before converting: int(None) would raise TypeError and
    # turn a bad token into a 500 instead of a 401.
    subject = payload.get("sub")
    if subject is None:
        raise credentials_exception()
    try:
        user_id = int(subject)
    except (TypeError, ValueError) as exc:
        raise credentials_exception() from exc

    user = await AuthService(db).get_user_by_id(user_id)
    if user is None:
        raise credentials_exception()
    if not user.is_active:
        raise HTTPException(401, "用户已被禁用")
    return user
async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """Return the current user, rejecting inactive accounts.

    Args:
        current_user: User resolved by ``get_current_user``.

    Returns:
        The same user object when ``is_active`` is truthy.

    Raises:
        HTTPException: 400 when the user is not active.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(400, "Inactive user")


# 4. Using protected routes
from dependencies import get_current_active_user
# Protected route: the dependency resolves the authenticated, active user,
# and that user object is returned as the response.
@app.get("/profile")
async def get_my_profile(
    current_user: User = Depends(get_current_active_user),
):
    profile_owner = current_user
    return profile_owner
# Protected route with a role gate: only users whose role equals "admin"
# receive the payload; everyone else gets a 403.
@app.get("/admin/dashboard")
async def admin_dashboard(
    current_user: User = Depends(get_current_active_user),
):
    is_admin = current_user.role == "admin"
    if is_admin:
        return {"secret": "admin data"}
    raise HTTPException(403, "Admin access required")
# 公开路由不需要依赖
@app.get("/public/articles")
async def public_articles():
    # Placeholder payload: a one-element list holding the Ellipsis object.
    placeholder_articles = [...]
    return placeholder_articles


# 5. Token blacklist (optional: logout support)
# Simple Redis-backed blacklist: keys are this prefix followed by the raw token.
BLACKLIST_PREFIX = "token:blacklist:"
async def revoke_token(token: str):
    """Blacklist a token in Redis until it expires.

    Nothing is written when the token has no ``exp`` claim or its expiry is
    not in the future.

    Args:
        token: The encoded JWT to revoke.

    Raises:
        credentials_exception: Propagated from ``decode_token`` when the
            token fails verification.
    """
    exp = decode_token(token).get("exp")
    if not exp:
        return
    ttl = exp - datetime.now(timezone.utc).timestamp()
    if ttl > 0:
        # Round up instead of truncating: int(ttl) is 0 when less than one
        # second remains (Redis rejects a zero expiry), and truncation would
        # let the blacklist entry expire slightly before the token does.
        await redis_client.setex(f"{BLACKLIST_PREFIX}{token}", int(ttl) + 1, "1")
async def is_token_blacklisted(token: str) -> bool:
    """Report whether a blacklist key exists in Redis for this token.

    Args:
        token: The encoded JWT to look up.

    Returns:
        ``True`` when ``redis_client.exists`` reports more than zero matches.
    """
    blacklist_key = f"{BLACKLIST_PREFIX}{token}"
    match_count = await redis_client.exists(blacklist_key)
    return match_count > 0
# 登出路由
# Logout route: revokes the bearer token of the current request, then
# returns a confirmation message.
@app.post("/auth/logout")
async def logout(token: str = Depends(oauth2_scheme)):
    await revoke_token(token)
    confirmation = {"message": "登出成功"}
    return confirmation


# 6. Summary
# JWT 认证完整流程
# 1. 登录:用户提交邮箱+密码
POST /auth/login
→ AuthService.authenticate_user() 验证
→ create_access_token() 生成 JWT
→ 返回 {access_token, refresh_token}
# 2. 请求:携带 Token
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
# 3. 验证:依赖注入获取用户
Depends(get_current_user)
→ oauth2_scheme() 提取 Token
→ decode_token() 验证签名和过期
→ 查询用户是否存在
→ 返回 User 对象给路由
# 4. 刷新:Token 过期后用 refresh_token 换新 access_token
POST /auth/refresh
💡 安全提示:JWT 密钥
jwt_secret 必须足够长(32 字节以上),并妥善保管在环境变量中,永不上传到代码仓库。生产环境使用 RS256 非对称算法更安全。
🔗 扩展阅读

