Redis 集成:缓存热点数据、处理 Session 与简单的消息队列

📂 所属阶段:第三阶段 — 数据持久化(数据库篇)
🔗 相关章节:SQLAlchemy 2.0 实战 · 异步编程深度解析


1. Redis 快速入门

1.1 为什么 Web 服务需要 Redis?

请求 → FastAPI → 数据库查询 → 返回

缓存后:
请求 → FastAPI → Redis(命中)→ 直接返回
请求 → FastAPI → Redis(未命中)→ 数据库 → 写入Redis → 返回

Redis 作用:把频繁查询但不常变化的数据(如用户信息、文章列表)放在内存里,查询速度比数据库快 10-100 倍。

1.2 安装与连接

pip install redis[hiredis]
# hiredis 是 C 语言实现的高性能解析器,推荐安装
# redis_client.py
import redis.asyncio as redis
from config import get_settings

settings = get_settings()

# 异步 Redis 连接
redis_client = redis.from_url(
    settings.redis_url,  # "redis://localhost:6379/0"
    encoding="utf-8",
    decode_responses=True,   # 自动将 bytes 解码为 str
)

# 测试连接
async def test_redis():
    await redis_client.ping()
    print("✅ Redis 连接成功")

2. 缓存实现

2.1 通用缓存装饰器

# cache.py
import json
from functools import wraps
from redis.asyncio import Redis

def cached(
    key_prefix: str,
    expire: int = 300,  # 默认 5 分钟过期
):
    """
    异步缓存装饰器
    用法:
    @cached("user:{user_id}", expire=600)
    async def get_user(user_id: int):
        return await db.get_user(user_id)
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 构造缓存 key
            cache_key = key_prefix.format(*args, **kwargs)

            # 尝试从 Redis 获取
            cached_value = await redis_client.get(cache_key)
            if cached_value:
                return json.loads(cached_value)

            # 未命中,执行原函数
            result = await func(*args, **kwargs)

            # 存入 Redis
            await redis_client.setex(
                cache_key,
                expire,
                json.dumps(result, default=str)
            )
            return result
        return wrapper
    return decorator


async def invalidate_cache(pattern: str):
    """清除匹配的缓存键"""
    keys = await redis_client.keys(pattern)
    if keys:
        await redis_client.delete(*keys)

2.2 用户信息缓存

from cache import cached, invalidate_cache

# 缓存用户信息,5 分钟内重复查询直接返回缓存
@cached("user:{user_id}", expire=300)
async def get_user_cached(user_id: int):
    user = await db.get_user(user_id)
    return {
        "id": user.id,
        "name": user.name,
        "email": user.email,
        "avatar": user.avatar,
    }

# 用户更新后清除缓存
async def update_user(user_id: int, data: dict):
    await db.update_user(user_id, data)
    # 清除该用户的缓存
    await invalidate_cache(f"user:{user_id}")

2.3 页面缓存(整页缓存)

@app.get("/posts/{slug}")
async def get_post(slug: str):
    cache_key = f"post:page:{slug}"

    # 尝试获取缓存
    cached_html = await redis_client.get(cache_key)
    if cached_html:
        return HTMLResponse(content=cached_html)

    # 生成页面
    post = await get_post_data(slug)
    html = render_post_template(post)

    # 缓存 10 分钟
    await redis_client.setex(cache_key, 600, html)
    return HTMLResponse(content=html)

3. 分布式 Session

3.1 基于 Redis 的 Session 管理

# session.py
import uuid
import json
from datetime import timedelta
from redis.asyncio import Redis

class RedisSession:
    def __init__(self, redis: Redis):
        self.redis = redis
        self.prefix = "session:"
        self.expire = 86400 * 7  # 7 天过期

    async def create(self, user_id: int, extra_data: dict = None) -> str:
        """创建新 Session,返回 session_id"""
        session_id = str(uuid.uuid4())
        session_key = self.prefix + session_id
        data = {"user_id": user_id, **(extra_data or {})}
        await self.redis.setex(
            session_key,
            self.expire,
            json.dumps(data)
        )
        return session_id

    async def get(self, session_id: str) -> dict | None:
        """获取 Session 数据"""
        session_key = self.prefix + session_id
        data = await self.redis.get(session_key)
        if data:
            return json.loads(data)
        return None

    async def refresh(self, session_id: str) -> bool:
        """刷新 Session 过期时间"""
        session_key = self.prefix + session_id
        return await self.redis.expire(session_key, self.expire)

    async def destroy(self, session_id: str) -> bool:
        """销毁 Session(登出)"""
        session_key = self.prefix + session_id
        return await self.redis.delete(session_key) > 0

session_manager = RedisSession(redis_client)

3.2 Session 认证依赖

from fastapi import Cookie, HTTPException

async def get_current_user_via_session(session_id: str = Cookie(None)):
    if not session_id:
        raise HTTPException(401, "请先登录")
    session = await session_manager.get(session_id)
    if not session:
        raise HTTPException(401, "Session 已过期,请重新登录")
    await session_manager.refresh(session_id)  # 续期
    return session

@app.get("/profile")
async def profile(user: dict = Depends(get_current_user_via_session)):
    return {"user_id": user["user_id"], "data": user}

4. 简单的消息队列

4.1 任务队列:List 实现

# queue.py
TASK_QUEUE = "fastapi:task_queue"

async def enqueue_task(task_data: dict):
    """入队:添加任务到队列尾部"""
    import json
    await redis_client.rpush(TASK_QUEUE, json.dumps(task_data))

async def dequeue_task(blocking: bool = True, timeout: int = 5):
    """出队:从队列头部取任务"""
    import json
    if blocking:
        result = await redis_client.blpop(TASK_QUEUE, timeout=timeout)
        if result:
            _, task = result
            return json.loads(task)
    else:
        task = await redis_client.lpop(TASK_QUEUE)
        if task:
            return json.loads(task)
    return None

# 后台 worker
async def process_tasks():
    while True:
        task = await dequeue_task(blocking=True, timeout=0)
        if task:
            print(f"处理任务: {task}")
            # await process_task(task)

4.2 延迟队列:ZSet 实现

DELAY_QUEUE = "fastapi:delay_queue"
DELAY_TEMP_QUEUE =