#Redis 集成:缓存热点数据、处理 Session 与简单的消息队列
📂 所属阶段:第三阶段 — 数据持久化(数据库篇)
🔗 相关章节:SQLAlchemy 2.0 实战 · 异步编程深度解析
#1. Redis 快速入门
#1.1 为什么 Web 服务需要 Redis?
请求 → FastAPI → 数据库查询 → 返回
缓存后:
请求 → FastAPI → Redis(命中)→ 直接返回
请求 → FastAPI → Redis(未命中)→ 数据库 → 写入Redis → 返回Redis 作用:把频繁查询但不常变化的数据(如用户信息、文章列表)放在内存里,查询速度比数据库快 10-100 倍。
#1.2 安装与连接
pip install redis[hiredis]
# hiredis 是 C 语言实现的高性能解析器,推荐安装# redis_client.py
import redis.asyncio as redis
from config import get_settings
settings = get_settings()
# 异步 Redis 连接
redis_client = redis.from_url(
settings.redis_url, # "redis://localhost:6379/0"
encoding="utf-8",
decode_responses=True, # 自动将 bytes 解码为 str
)
# 测试连接
async def test_redis():
await redis_client.ping()
print("✅ Redis 连接成功")#2. 缓存实现
#2.1 通用缓存装饰器
# cache.py
import json
from functools import wraps
from redis.asyncio import Redis
def cached(
key_prefix: str,
expire: int = 300, # 默认 5 分钟过期
):
"""
异步缓存装饰器
用法:
@cached("user:{user_id}", expire=600)
async def get_user(user_id: int):
return await db.get_user(user_id)
"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 构造缓存 key
cache_key = key_prefix.format(*args, **kwargs)
# 尝试从 Redis 获取
cached_value = await redis_client.get(cache_key)
if cached_value:
return json.loads(cached_value)
# 未命中,执行原函数
result = await func(*args, **kwargs)
# 存入 Redis
await redis_client.setex(
cache_key,
expire,
json.dumps(result, default=str)
)
return result
return wrapper
return decorator
async def invalidate_cache(pattern: str):
"""清除匹配的缓存键"""
keys = await redis_client.keys(pattern)
if keys:
await redis_client.delete(*keys)#2.2 用户信息缓存
from cache import cached, invalidate_cache
# 缓存用户信息,5 分钟内重复查询直接返回缓存
@cached("user:{user_id}", expire=300)
async def get_user_cached(user_id: int):
user = await db.get_user(user_id)
return {
"id": user.id,
"name": user.name,
"email": user.email,
"avatar": user.avatar,
}
# 用户更新后清除缓存
async def update_user(user_id: int, data: dict):
await db.update_user(user_id, data)
# 清除该用户的缓存
await invalidate_cache(f"user:{user_id}")#2.3 页面缓存(整页缓存)
@app.get("/posts/{slug}")
async def get_post(slug: str):
cache_key = f"post:page:{slug}"
# 尝试获取缓存
cached_html = await redis_client.get(cache_key)
if cached_html:
return HTMLResponse(content=cached_html)
# 生成页面
post = await get_post_data(slug)
html = render_post_template(post)
# 缓存 10 分钟
await redis_client.setex(cache_key, 600, html)
return HTMLResponse(content=html)#3. 分布式 Session
#3.1 基于 Redis 的 Session 管理
# session.py
import uuid
import json
from datetime import timedelta
from redis.asyncio import Redis
class RedisSession:
def __init__(self, redis: Redis):
self.redis = redis
self.prefix = "session:"
self.expire = 86400 * 7 # 7 天过期
async def create(self, user_id: int, extra_data: dict = None) -> str:
"""创建新 Session,返回 session_id"""
session_id = str(uuid.uuid4())
session_key = self.prefix + session_id
data = {"user_id": user_id, **(extra_data or {})}
await self.redis.setex(
session_key,
self.expire,
json.dumps(data)
)
return session_id
async def get(self, session_id: str) -> dict | None:
"""获取 Session 数据"""
session_key = self.prefix + session_id
data = await self.redis.get(session_key)
if data:
return json.loads(data)
return None
async def refresh(self, session_id: str) -> bool:
"""刷新 Session 过期时间"""
session_key = self.prefix + session_id
return await self.redis.expire(session_key, self.expire)
async def destroy(self, session_id: str) -> bool:
"""销毁 Session(登出)"""
session_key = self.prefix + session_id
return await self.redis.delete(session_key) > 0
session_manager = RedisSession(redis_client)#3.2 Session 认证依赖
from fastapi import Cookie, HTTPException
async def get_current_user_via_session(session_id: str = Cookie(None)):
if not session_id:
raise HTTPException(401, "请先登录")
session = await session_manager.get(session_id)
if not session:
raise HTTPException(401, "Session 已过期,请重新登录")
await session_manager.refresh(session_id) # 续期
return session
@app.get("/profile")
async def profile(user: dict = Depends(get_current_user_via_session)):
return {"user_id": user["user_id"], "data": user}#4. 简单的消息队列
#4.1 任务队列:List 实现
# queue.py
TASK_QUEUE = "fastapi:task_queue"
async def enqueue_task(task_data: dict):
"""入队:添加任务到队列尾部"""
import json
await redis_client.rpush(TASK_QUEUE, json.dumps(task_data))
async def dequeue_task(blocking: bool = True, timeout: int = 5):
"""出队:从队列头部取任务"""
import json
if blocking:
result = await redis_client.blpop(TASK_QUEUE, timeout=timeout)
if result:
_, task = result
return json.loads(task)
else:
task = await redis_client.lpop(TASK_QUEUE)
if task:
return json.loads(task)
return None
# 后台 worker
async def process_tasks():
while True:
task = await dequeue_task(blocking=True, timeout=0)
if task:
print(f"处理任务: {task}")
# await process_task(task)#4.2 延迟队列:ZSet 实现
DELAY_QUEUE = "fastapi:delay_queue"
DELAY_TEMP_QUEUE =
