Django Caching Strategies - A Guide to High-Performance Application Optimization

📂 Part: Part 2 — Advanced Features
🎯 Difficulty: Intermediate
⏰ Estimated study time: 4-5 hours

Table of Contents

Cache Fundamentals

Caching is a key technique for improving web application performance: by storing computed results or copies of data, it avoids repeated computation and repeated database queries.

How Caching Works

"""
缓存工作原理:

1. 请求到达时,首先检查缓存中是否存在所需数据
2. 如果存在(缓存命中),直接返回缓存数据
3. 如果不存在(缓存未命中),执行计算/查询操作
4. 将结果存储到缓存中,下次请求可直接使用
5. 缓存数据有过期时间,过期后重新生成

缓存优势:
- 减少数据库负载
- 降低响应时间
- 提高系统吞吐量
- 改善用户体验

缓存挑战:
- 数据一致性问题
- 缓存雪崩、穿透、击穿
- 内存使用管理
- 缓存策略选择
"""

Cache Hierarchy

"""
Django缓存层级:

1. 数据库查询缓存
   - QuerySet缓存
   - select_related/prefetch_related优化

2. 模板片段缓存
   - 模板标签缓存
   - 片段缓存

3. 视图级别缓存
   - 视图函数缓存
   - 类视图缓存

4. 页面级别缓存
   - 完整页面缓存
   - 中间件缓存

5. 应用层缓存
   - 自定义缓存逻辑
   - 业务逻辑缓存

6. 代理层缓存
   - Nginx缓存
   - CDN缓存
"""

Cache Strategy Types

"""
Common caching strategies:

1. Cache-Aside pattern
   - The application is responsible for loading data from the database into the cache
   - On a cache miss, the application fetches from the database and writes the result to the cache

2. Read-Through pattern
   - The cache layer transparently loads data from the database
   - The application only talks to the cache

3. Write-Through pattern
   - Writes go to the cache and the database at the same time
   - Guarantees data consistency

4. Write-Behind pattern
   - Data is written to the cache first and flushed to the database asynchronously
   - Better performance, but data can be lost on failure
"""

Django Cache Architecture

Cache Backends

# Django built-in cache backends
from django.core.cache import cache
from django.core.cache.backends.base import BaseCache

"""
Django支持的缓存后端:

1. DummyCache - 空缓存(用于开发环境)
2. LocMemCache - 本地内存缓存
3. FileBasedCache - 文件系统缓存
4. DatabaseCache - 数据库缓存
5. MemcachedCache - Memcached缓存
6. PyLibMCCache - PyLibMC缓存
7. RedisCache - Redis缓存(需要第三方包)

选择原则:
- 开发环境:LocMemCache
- 小型应用:LocMemCache或FileBasedCache
- 生产环境:Redis或Memcached
- 分布式应用:Redis或Memcached
"""

Cache API

# Using the Django cache API
from django.core.cache import cache
import time

# Basic operations
def cache_operations_demo():
    """Demonstrate basic cache operations"""
    
    # Set a value
    cache.set('key', 'value', timeout=300)  # expires after 5 minutes
    
    # Get a value
    value = cache.get('key')
    if value is None:
        # Cache miss: fetch from the database
        value = expensive_database_query()
        cache.set('key', value, timeout=300)
    
    # Get, or compute and store on a miss
    value = cache.get_or_set('key2', lambda: expensive_function(), timeout=60)
    
    # Bulk operations
    cache.set_many({'a': 1, 'b': 2, 'c': 3}, timeout=300)
    values = cache.get_many(['a', 'b', 'c'])
    
    # Deletion
    cache.delete('key')
    cache.delete_many(['a', 'b', 'c'])
    cache.clear()  # flush the entire cache
    
    return value

def expensive_database_query():
    """Simulate an expensive database query"""
    time.sleep(1)  # simulate a slow operation
    return "expensive result"

def expensive_function():
    """Simulate an expensive function call"""
    time.sleep(0.5)
    return "expensive function result"

# Cache key generation
def generate_cache_key(*args, **kwargs):
    """Generate a deterministic cache key from arbitrary arguments"""
    import hashlib
    import json
    
    # Serialize the arguments
    key_data = {
        'args': args,
        'kwargs': kwargs
    }
    
    # Hash for a fixed-length key
    key_str = json.dumps(key_data, sort_keys=True)
    hash_key = hashlib.md5(key_str.encode()).hexdigest()
    
    return f"computed:{hash_key}"

# Caching a function result keyed by its parameters
def cached_function_with_params(user_id, category):
    """Cache a function result keyed by its parameters"""
    cache_key = f"user_profile:{user_id}:{category}"
    result = cache.get(cache_key)
    
    if result is None:
        # Fetch the data from the database
        result = get_user_data(user_id, category)
        cache.set(cache_key, result, timeout=3600)  # 1 hour
    
    return result

def get_user_data(user_id, category):
    """Fetch user data"""
    # Simulate a database query
    time.sleep(0.1)
    return {"user_id": user_id, "category": category, "data": "user data"}

Cache Decorators

# Cache decorators
from django.core.cache import cache
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
from django.views.generic import ListView
import functools

def cached_function(timeout=300):
    """Function-result cache decorator"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build the cache key. Note: the built-in hash() is salted per
            # process (PYTHONHASHSEED), so these keys are not stable across
            # processes or restarts; prefer hashlib for a shared cache.
            cache_key = f"func:{func.__name__}:{hash(str(args) + str(kwargs))}"
            
            # Try the cache first
            result = cache.get(cache_key)
            if result is None:
                # Run the function and cache the result
                result = func(*args, **kwargs)
                cache.set(cache_key, result, timeout=timeout)
            
            return result
        return wrapper
    return decorator

# Usage example
@cached_function(timeout=600)
def calculate_expensive_operation(x, y):
    """An expensive computation"""
    time.sleep(2)  # simulate a slow computation
    return x * y + x ** 2 + y ** 2

# Cache decorator for instance methods
def cached_method(timeout=300):
    """Method-result cache decorator"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            # Include the instance in the cache key. Note: id(self) is only
            # unique while the object is alive; use a stable attribute such
            # as a primary key when the cache is shared across processes.
            cache_key = f"method:{self.__class__.__name__}:{func.__name__}:{id(self)}:{hash(str(args) + str(kwargs))}"
            
            result = cache.get(cache_key)
            if result is None:
                result = func(self, *args, **kwargs)
                cache.set(cache_key, result, timeout=timeout)
            
            return result
        return wrapper
    return decorator

class DataProcessor:
    """Data processor"""
    
    def __init__(self, processor_id):
        self.processor_id = processor_id
    
    @cached_method(timeout=1800)
    def process_data(self, data):
        """Process data"""
        time.sleep(1)  # simulate processing time
        return f"Processed {len(data)} items for processor {self.processor_id}"

# A more descriptive cache decorator
def smart_cache(timeout=300, key_prefix='', version=1):
    """Cache decorator with key prefixing and versioning"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build a descriptive cache key
            key_parts = [key_prefix, func.__module__, func.__name__]
            
            # Append the arguments to the key
            if args:
                key_parts.extend([str(arg) for arg in args])
            if kwargs:
                # Sort kwargs so the key is deterministic
                sorted_kwargs = sorted(kwargs.items())
                key_parts.append(str(sorted_kwargs))
            
            cache_key = ":".join(key_parts)
            cache_key = f"{cache_key}:v{version}"
            
            # Try the cache first
            result = cache.get(cache_key)
            if result is None:
                result = func(*args, **kwargs)
                cache.set(cache_key, result, timeout=timeout)
            
            return result
        return wrapper
    return decorator

# Usage example
@smart_cache(timeout=3600, key_prefix='stats', version=2)
def get_user_statistics(user_id, period='daily'):
    """Get user statistics"""
    # Simulate a complex aggregation
    time.sleep(1)
    return {
        'user_id': user_id,
        'period': period,
        'views': 100,
        'actions': 50
    }

Cache Backend Configuration

settings.py configuration

# settings.py - cache configuration
"""
Example cache configuration
"""
import os

# Base cache configuration
CACHES = {
    # Default cache (django-redis backend, which understands the OPTIONS below;
    # the built-in django.core.cache.backends.redis.RedisCache does not)
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': [
            'redis://127.0.0.1:6379/1',
            'redis://127.0.0.1:6380/1',  # multiple Redis instances are supported
        ],
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'CONNECTION_POOL_KWARGS': {
                'max_connections': 20,
                'retry_on_timeout': True,
            },
            'COMPRESSOR': 'django_redis.compressors.zlib.ZlibCompressor',
            'SERIALIZER': 'django_redis.serializers.json.JSONSerializer',
        },
        'KEY_PREFIX': 'myapp',
        'TIMEOUT': 300,  # 5-minute default timeout
        'VERSION': 1,
    },
    
    # Session cache
    'sessions': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/2',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
        },
        'KEY_PREFIX': 'session',
        'TIMEOUT': 1209600,  # 2 weeks
    },
    
    # Short-lived cache
    'temporary': {
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
        'LOCATION': 'temp-cache',
        'OPTIONS': {
            'MAX_ENTRIES': 1000,
        },
        'TIMEOUT': 60,  # 1 minute
    },
}

# Development settings
if os.environ.get('DJANGO_ENV') == 'development':
    CACHES = {
        'default': {
            'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
            'LOCATION': 'unique-snowflake',
            'OPTIONS': {
                'MAX_ENTRIES': 1000,
            }
        }
    }

# Production settings
if os.environ.get('DJANGO_ENV') == 'production':
    CACHES = {
        'default': {
            'BACKEND': 'django_redis.cache.RedisCache',
            'LOCATION': os.environ.get('REDIS_URL', 'redis://127.0.0.1:6379/1'),
            'OPTIONS': {
                'CLIENT_CLASS': 'django_redis.client.DefaultClient',
                'CONNECTION_POOL_KWARGS': {
                    'max_connections': 50,
                    'retry_on_timeout': True,
                    'socket_keepalive': True,
                },
            },
            'KEY_PREFIX': os.environ.get('CACHE_KEY_PREFIX', 'prod'),
            'TIMEOUT': 300,
        }
    }

# Cache-related settings
CACHE_MIDDLEWARE_ALIAS = 'default'  # cache alias used by the cache middleware
CACHE_MIDDLEWARE_SECONDS = 600     # page cache lifetime in seconds
CACHE_MIDDLEWARE_KEY_PREFIX = ''   # page cache key prefix

# Store sessions in the cache
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'sessions'
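The CACHE_MIDDLEWARE_* settings above only take effect when the per-site cache middleware is installed. Per Django's documented convention, UpdateCacheMiddleware must come first in MIDDLEWARE and FetchFromCacheMiddleware last (the middleware list here is a minimal sketch, not a complete production stack):

```python
# settings.py - per-site cache middleware ordering (illustrative)
MIDDLEWARE = [
    'django.middleware.cache.UpdateCacheMiddleware',    # must be first
    'django.middleware.common.CommonMiddleware',
    'django.middleware.cache.FetchFromCacheMiddleware', # must be last
]
```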

Redis Cache Configuration

# Advanced Redis configuration
"""
pip install django-redis redis hiredis
"""
import redis
from django.conf import settings

class RedisCacheManager:
    """Redis cache manager"""
    
    def __init__(self):
        self.redis_client = redis.Redis(
            host=getattr(settings, 'REDIS_HOST', 'localhost'),
            port=getattr(settings, 'REDIS_PORT', 6379),
            db=getattr(settings, 'REDIS_DB', 0),
            password=getattr(settings, 'REDIS_PASSWORD', None),
            decode_responses=False,  # keep raw bytes
            socket_connect_timeout=5,
            socket_timeout=5,
            retry_on_timeout=True,
        )
    
    def get_connection_pool(self):
        """Build a client backed by an explicit connection pool"""
        pool = redis.ConnectionPool(
            host=getattr(settings, 'REDIS_HOST', 'localhost'),
            port=getattr(settings, 'REDIS_PORT', 6379),
            db=getattr(settings, 'REDIS_DB', 0),
            password=getattr(settings, 'REDIS_PASSWORD', None),
            max_connections=getattr(settings, 'REDIS_MAX_CONNECTIONS', 20),
            retry_on_timeout=True,
        )
        return redis.Redis(connection_pool=pool)
    
    def get_pipeline(self):
        """Get a pipeline for batched commands"""
        return self.redis_client.pipeline()
    
    def batch_set(self, data_dict, expire_time=300):
        """Set multiple keys in one round trip"""
        pipe = self.get_pipeline()
        for key, value in data_dict.items():
            pipe.setex(key, expire_time, value)
        pipe.execute()
    
    def batch_get(self, keys):
        """Get multiple keys"""
        return self.redis_client.mget(keys)
    
    def atomic_increment(self, key, amount=1, expire_time=300):
        """Atomic increment"""
        value = self.redis_client.incrby(key, amount)
        # Refresh the expiration time (this is a separate command; wrap both
        # in a pipeline/transaction if they must happen together)
        self.redis_client.expire(key, expire_time)
        return value
    
    def get_with_lock(self, key, compute_func, lock_timeout=10, cache_timeout=300):
        """Compute a value under a lock to prevent cache breakdown (dog-piling)"""
        lock_key = f"lock:{key}"
        
        # Try to acquire the lock
        if self.redis_client.set(lock_key, "1", nx=True, ex=lock_timeout):
            try:
                # Compute and cache the value
                result = compute_func()
                self.redis_client.setex(key, cache_timeout, result)
                return result
            finally:
                # Release the lock. Note: a production lock should store a
                # unique token and delete only if the token still matches,
                # otherwise a worker can release a lock it no longer holds.
                self.redis_client.delete(lock_key)
        else:
            # Another worker holds the lock: poll for the cached value
            import time
            start_time = time.time()
            while time.time() - start_time < lock_timeout:
                result = self.redis_client.get(key)
                if result is not None:
                    return result
                time.sleep(0.1)
            # Timed out waiting: compute it ourselves
            result = compute_func()
            self.redis_client.setex(key, cache_timeout, result)
            return result

# Redis cache configuration presets
class RedisCacheConfig:
    """Redis cache configuration presets"""
    
    @staticmethod
    def get_production_config():
        """Production Redis configuration"""
        return {
            'BACKEND': 'django_redis.cache.RedisCache',
            'LOCATION': [
                'redis://master.redis.example.com:6379/1',
                'redis://slave.redis.example.com:6379/1',
            ],
            'OPTIONS': {
                'CLIENT_CLASS': 'django_redis.client.ShardClient',
                'CONNECTION_POOL_KWARGS': {
                    'max_connections': 100,
                    'retry_on_timeout': True,
                    'health_check_interval': 30,
                    'socket_keepalive': True,
                },
                'COMPRESSOR': 'django_redis.compressors.zlib.ZlibCompressor',
                'SERIALIZER': 'django_redis.serializers.pickle.PickleSerializer',
            },
            'KEY_PREFIX': 'prod_app',
            'TIMEOUT': 900,  # 15 minutes
            'VERSION': 1,
        }
    
    @staticmethod
    def get_session_config():
        """Session Redis configuration"""
        return {
            'BACKEND': 'django_redis.cache.RedisCache',
            'LOCATION': 'redis://session.redis.example.com:6379/2',
            'OPTIONS': {
                'CLIENT_CLASS': 'django_redis.client.DefaultClient',
                'CONNECTION_POOL_KWARGS': {
                    'max_connections': 50,
                },
            },
            'KEY_PREFIX': 'session',
            'TIMEOUT': 1209600,  # 2 weeks
            'VERSION': 1,
        }

# Cache health check
class CacheHealthChecker:
    """Cache health checker"""
    
    def __init__(self):
        from django.core.cache import caches
        self.cache = caches['default']
    
    def check_health(self):
        """Check cache health"""
        try:
            # Round-trip a test value through the cache
            test_key = 'health_check:test'
            test_value = 'healthy'
            
            # Set and get the test value
            self.cache.set(test_key, test_value, timeout=10)
            retrieved_value = self.cache.get(test_key)
            
            # Verify the result
            if retrieved_value == test_value:
                return {
                    'status': 'healthy',
                    'message': 'Cache is accessible and responsive',
                    'response_time': self._measure_response_time()
                }
            else:
                return {
                    'status': 'unhealthy',
                    'message': 'Cache returned unexpected value',
                    'expected': test_value,
                    'received': retrieved_value
                }
        except Exception as e:
            return {
                'status': 'unhealthy',
                'message': f'Cache error: {str(e)}',
                'error_type': type(e).__name__
            }
    
    def _measure_response_time(self):
        """Measure cache response time"""
        import time
        start_time = time.time()
        self.cache.get('health_check:test')
        return time.time() - start_time

Memcached Configuration

# Memcached configuration
"""
pip install python-memcached   # classic client, used by MemcachedManager below

pip install pylibmc            # C client, required by PyLibMCCache
"""

# Memcached cache configuration
MEMCACHED_CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.PyLibMCCache',
        'LOCATION': [
            '127.0.0.1:11211',
            '127.0.0.1:11212',
        ],
        'OPTIONS': {
            'binary': True,
            'behaviors': {
                'tcp_nodelay': True,
                'ketama': True,
                'remove_failed': 4,
                'retry_timeout': 2,
                'dead_timeout': 10,
            }
        },
        'KEY_PREFIX': 'myapp',
        'TIMEOUT': 300,
    }
}

# Memcached management helper
class MemcachedManager:
    """Memcached manager"""
    
    def __init__(self):
        import memcache  # from the python-memcached package
        self.mc = memcache.Client(
            ['127.0.0.1:11211', '127.0.0.1:11212'],
            debug=0
        )
    
    def get_multi(self, keys):
        """Bulk get"""
        return self.mc.get_multi(keys)
    
    def set_multi(self, mapping, time=2592000):  # 30 days
        """Bulk set"""
        return self.mc.set_multi(mapping, time=time)
    
    def add(self, key, val, time=2592000):
        """Set only if the key does not already exist"""
        return self.mc.add(key, val, time=time)
    
    def incr(self, key, delta=1):
        """Increment"""
        return self.mc.incr(key, delta)
    
    def decr(self, key, delta=1):
        """Decrement"""
        return self.mc.decr(key, delta)

# Cache decorator backed by the configured (Memcached) cache
def memcached_cached(timeout=300, key_prefix=''):
    """Cache decorator using Django's default cache, assumed to be Memcached-backed"""
    def decorator(func):
        import functools
        import hashlib
        
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build the cache key
            key_data = f"{key_prefix}:{func.__name__}:{args}:{sorted(kwargs.items())}"
            cache_key = hashlib.md5(key_data.encode()).hexdigest()
            
            # Try the cache first
            cached_result = cache.get(cache_key)
            if cached_result is None:
                # Run the function
                result = func(*args, **kwargs)
                # Store the result
                cache.set(cache_key, result, timeout=timeout)
                return result
            return cached_result
        return wrapper
    return decorator

Cache Types and Usage

Low-Level Cache API

# Low-level caching - caching individual values
from django.core.cache import cache

def low_level_cache_example():
    """Low-level cache API examples"""
    
    # Basic operations
    cache.set('simple_key', 'simple_value', 300)  # expires in 5 minutes
    value = cache.get('simple_key')
    
    # Get with a default value
    value = cache.get('nonexistent_key', 'default_value')
    
    # Set only if the key does not exist
    cache.add('new_key', 'new_value', 600)  # 10 minutes
    
    # Get, or compute and store
    expensive_result = cache.get_or_set(
        'expensive_calculation',
        lambda: perform_expensive_calculation(),
        1800  # 30 minutes
    )
    
    # Bulk operations
    cache.set_many({
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3'
    }, 600)
    
    values = cache.get_many(['key1', 'key2', 'key3'])
    
    return values

def perform_expensive_calculation():
    """Perform an expensive calculation"""
    import time
    time.sleep(2)  # simulate a slow operation
    return sum(range(1000000))

# Cache key naming conventions
class CacheKeyGenerator:
    """Cache key generator"""
    
    @staticmethod
    def generate_user_key(user_id, suffix=''):
        """Key for user-related data"""
        prefix = 'user'
        return f"{prefix}:{user_id}:{suffix}" if suffix else f"{prefix}:{user_id}"
    
    @staticmethod
    def generate_model_key(model_name, obj_id, field=''):
        """Key for a model instance"""
        prefix = 'model'
        if field:
            return f"{prefix}:{model_name}:{obj_id}:{field}"
        return f"{prefix}:{model_name}:{obj_id}"
    
    @staticmethod
    def generate_query_key(query_params, model_name):
        """Key for a query"""
        import hashlib
        query_str = str(sorted(query_params.items())) if query_params else ''
        hash_suffix = hashlib.md5(f"{model_name}:{query_str}".encode()).hexdigest()[:8]
        return f"query:{model_name}:{hash_suffix}"
    
    @staticmethod
    def generate_fragment_key(template_name, fragment_name, context_hash=''):
        """Key for a template fragment"""
        if context_hash:
            return f"fragment:{template_name}:{fragment_name}:{context_hash}"
        return f"fragment:{template_name}:{fragment_name}"

# Usage example
def user_profile_cache_example(user_id):
    """User profile caching example"""
    from django.contrib.auth.models import User
    
    # Build the cache keys
    profile_key = CacheKeyGenerator.generate_user_key(user_id, 'profile')
    permissions_key = CacheKeyGenerator.generate_user_key(user_id, 'permissions')
    
    # Cached user profile
    profile_data = cache.get(profile_key)
    if profile_data is None:
        user = User.objects.get(id=user_id)
        profile_data = {
            'username': user.username,
            'email': user.email,
            'first_name': user.first_name,
            'last_name': user.last_name,
        }
        cache.set(profile_key, profile_data, 3600)  # 1 hour
    
    # Cached permissions
    permissions = cache.get(permissions_key)
    if permissions is None:
        user = User.objects.get(id=user_id)
        permissions = list(user.user_permissions.values_list('codename', flat=True))
        cache.set(permissions_key, permissions, 1800)  # 30 minutes
    
    return {
        'profile': profile_data,
        'permissions': permissions
    }

Template Fragment Caching

# Template fragment caching
from django.core.cache import cache
from django.template import Library

register = Library()

@register.simple_tag(takes_context=True)
def cached_partial(context, template_name, timeout=300, **kwargs):
    """Template tag that renders a partial and caches the output"""
    from django.template.loader import render_to_string
    
    # Build the cache key
    cache_key = f"partial:{template_name}:{hash(str(kwargs))}"
    
    # Try the cache first
    rendered = cache.get(cache_key)
    if rendered is None:
        # Render the template
        rendered = render_to_string(template_name, {**context.flatten(), **kwargs})
        # Store the rendered output
        cache.set(cache_key, rendered, timeout)
    
    return rendered

# A custom cache template tag (a simplified version of Django's {% cache %})
from django import template
from django.template.base import Node, Variable, TemplateSyntaxError

register = template.Library()

class CacheNode(Node):
    """Cache node"""
    
    def __init__(self, nodelist, expire_time_var, fragment_name, vary_on):
        self.nodelist = nodelist
        self.expire_time_var = Variable(expire_time_var)
        self.fragment_name = fragment_name
        self.vary_on = vary_on

    def render(self, context):
        """Render the node, serving the output from cache when possible"""
        try:
            expire_time = self.expire_time_var.resolve(context)
        except template.VariableDoesNotExist:
            expire_time = 0
        
        # Build the cache key from the vary-on values
        vary_values = [Variable(var).resolve(context) for var in self.vary_on]
        cache_key = f"template.cache.{self.fragment_name}." + ".".join(str(v) for v in vary_values)
        
        value = cache.get(cache_key)
        if value is None:
            value = self.nodelist.render(context)
            cache.set(cache_key, value, expire_time)
        
        return value

@register.tag('cache')
def do_cache(parser, token):
    """{% cache %} template tag"""
    nodelist = parser.parse(('endcache',))
    parser.delete_first_token()
    
    tokens = token.contents.split()
    if len(tokens) < 3:
        raise TemplateSyntaxError("'%s' tag requires at least 2 arguments" % tokens[0])
    
    return CacheNode(nodelist, tokens[1], tokens[2], tokens[3:])

# Usage in templates
"""
<!-- In a template -->
{% load cache %}
{% cache 500 sidebar user.username %}
    <!-- Sidebar content, cached for 500 seconds -->
    <div class="sidebar">
        <h3>Welcome, {{ user.username }}!</h3>
        <p>This is the sidebar content...</p>
    </div>
{% endcache %}

{% cache 600 navigation request.user.is_staff %}
    <!-- Navigation menu, varies by user role -->
    <nav>
        {% if request.user.is_staff %}
            <a href="/admin/">Admin panel</a>
        {% endif %}
        <a href="/dashboard/">Dashboard</a>
    </nav>
{% endcache %}
"""

View-Level Caching

# View-level caching
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
from django.views.generic import ListView, DetailView
from django.views import View
from django.http import JsonResponse
from django.core.cache import cache

# Function-based view caching
@cache_page(60 * 15)  # cache for 15 minutes
def product_list_view(request):
    """Product list view (cached)"""
    from .models import Product
    
    products = Product.objects.filter(is_active=True).select_related('category')
    
    return JsonResponse({
        'products': [
            {
                'id': p.id,
                'name': p.name,
                'price': float(p.price),
                'category': p.category.name if p.category else None
            }
            for p in products
        ]
    })

@cache_page(60 * 30, cache='temporary')  # use the 'temporary' cache backend
def dashboard_stats_view(request):
    """Dashboard statistics view"""
    from .models import Order, User
    
    stats = {
        'total_users': User.objects.count(),
        'total_orders': Order.objects.count(),
        'monthly_revenue': get_monthly_revenue(),
        'active_sessions': get_active_sessions_count()
    }
    
    return JsonResponse(stats)

def get_monthly_revenue():
    """Get monthly revenue"""
    # Simulated calculation
    import random
    return random.randint(10000, 50000)

def get_active_sessions_count():
    """Get the number of active sessions"""
    # Simulated lookup
    import random
    return random.randint(50, 200)

# Class-based view caching
@method_decorator(cache_page(60 * 10), name='dispatch')
class ProductListView(ListView):
    """Product list view"""
    model = Product
    template_name = 'products/list.html'
    context_object_name = 'products'
    
    def get_queryset(self):
        return Product.objects.filter(is_active=True).select_related('category')

@method_decorator(cache_page(60 * 60), name='get')  # 1-hour cache
class ProductDetailView(DetailView):
    """Product detail view"""
    model = Product
    template_name = 'products/detail.html'
    context_object_name = 'product'

# A custom caching view
class CachedAPIView(View):
    """API view with response caching"""
    
    def get_cache_key(self, request):
        """Build the cache key"""
        return f"api:{request.path}:{hash(request.GET.urlencode())}"
    
    def get_cache_timeout(self):
        """Cache timeout"""
        return 300  # 5 minutes
    
    def get(self, request):
        """Handle GET requests"""
        cache_key = self.get_cache_key(request)
        
        # Try the cache first (compare against None so falsy data still counts as a hit)
        cached_response = cache.get(cache_key)
        if cached_response is not None:
            return JsonResponse(cached_response)
        
        # Build the response
        response_data = self.generate_response_data(request)
        
        # Cache the response
        cache.set(cache_key, response_data, self.get_cache_timeout())
        
        return JsonResponse(response_data)
    
    def generate_response_data(self, request):
        """Build the response data"""
        # Subclasses must implement this method
        raise NotImplementedError

# Conditional caching
def conditional_cache_page(timeout):
    """Decorator that applies cache_page only when the request qualifies"""
    def decorator(view_func):
        from django.views.decorators.cache import cache_page
        # Build the cached view once, not on every request
        cached_view = cache_page(timeout)(view_func)
        
        def wrapper(request, *args, **kwargs):
            # Decide whether this request should be cached
            if should_cache_request(request):
                return cached_view(request, *args, **kwargs)
            else:
                # Skip the cache and call the view directly
                return view_func(request, *args, **kwargs)
        return wrapper
    return decorator

def should_cache_request(request):
    """Decide whether a request should be cached"""
    # Never cache non-GET requests
    if request.method != 'GET':
        return False
    
    # Never cache authenticated requests
    if request.user.is_authenticated:
        return False
    
    # Never cache requests that opt out explicitly
    if 'nocache' in request.GET:
        return False
    
    # Never cache admin pages
    if request.path.startswith('/admin/'):
        return False
    
    return True

# Using the conditional cache
@conditional_cache_page(60 * 15)
def conditional_cached_view(request):
    """Conditionally cached view"""
    return JsonResponse({'message': 'This might be cached'})

Model-Level Caching

# Model-level caching
from django.db import models
from django.core.cache import cache
from django.conf import settings
from typing import Optional

class CacheMixin:
    """Mixin that adds per-instance caching helpers to a model"""
    
    def get_cache_key(self, field: str = '', suffix: str = '') -> str:
        """Build a cache key for this instance"""
        base_key = f"{self._meta.label}:{self.pk}"
        if field:
            base_key = f"{base_key}:{field}"
        if suffix:
            base_key = f"{base_key}:{suffix}"
        return base_key
    
    def get_cache_timeout(self) -> int:
        """Cache timeout for this model"""
        return getattr(settings, 'MODEL_CACHE_TIMEOUT', 300)
    
    def cache_get(self, field: str, default=None):
        """Get a cached field value"""
        cache_key = self.get_cache_key(field)
        return cache.get(cache_key, default)
    
    def cache_set(self, field: str, value, timeout: Optional[int] = None):
        """Cache a field value"""
        cache_key = self.get_cache_key(field)
        cache.set(cache_key, value, timeout or self.get_cache_timeout())
    
    def cache_delete(self, field: str = ''):
        """Delete a cached field (or, with no field, all entries for this instance)"""
        if field:
            cache_key = self.get_cache_key(field)
            cache.delete(cache_key)
        else:
            # Pattern deletion depends on the backend: django-redis exposes
            # cache.delete_pattern(f"{self.get_cache_key()}*"), while the
            # in-memory and Memcached backends have no pattern delete
            # (and Redis KEYS should be avoided in production).
            pass
    
    @staticmethod
    def cache_property(func):
        """Decorator that caches a computed property per instance
        (a staticmethod, so it can be applied as @CacheMixin.cache_property)"""
        @property
        def cached_prop(self):
            cache_key = self.get_cache_key(func.__name__)
            value = cache.get(cache_key)
            if value is None:
                value = func(self)
                cache.set(cache_key, value, self.get_cache_timeout())
            return value
        return cached_prop

# A user model with caching
class CachedUser(CacheMixin, models.Model):  # mixin first, so its helpers resolve in the MRO
    """User model with caching"""
    username = models.CharField(max_length=150, unique=True)
    email = models.EmailField()
    first_name = models.CharField(max_length=30)
    last_name = models.CharField(max_length=150)
    is_active = models.BooleanField(default=True)
    date_joined = models.DateTimeField(auto_now_add=True)
    
    def get_full_name(self):
        """Return the full name"""
        return f"{self.first_name} {self.last_name}".strip()
    
    @CacheMixin.cache_property
    def profile_summary(self):
        """Cached profile summary"""
        # Simulate an expensive computation
        import time
        from django.utils import timezone
        time.sleep(0.1)  # simulate a slow operation
        return {
            'username': self.username,
            'full_name': self.get_full_name(),
            'email': self.email,
            'account_age_days': (timezone.now() - self.date_joined).days
        }
    
    def get_permissions(self):
        """Get the user's permissions (cached); assumes a user_permissions relation"""
        cache_key = self.get_cache_key('permissions')
        permissions = cache.get(cache_key)
        
        if permissions is None:
            # Load permissions from the database
            permissions = list(self.user_permissions.values_list('codename', flat=True))
            cache.set(cache_key, permissions, 300)  # 5 minutes
        
        return permissions
    
    def save(self, *args, **kwargs):
        """Clear related cache entries on save"""
        # Invalidate before saving
        self.cache_delete('profile_summary')
        self.cache_delete('permissions')
        
        super().save(*args, **kwargs)
    
    def delete(self, *args, **kwargs):
        """Clear all cache entries on delete"""
        self.cache_delete()
        super().delete(*args, **kwargs)

# QuerySet caching
class CachedQuerySet(models.QuerySet):
    """QuerySet with result caching"""
    
    def cache_results(self, timeout=300, key_prefix='query'):
        """Cache the evaluated result list"""
        import hashlib
        
        # Hash the generated SQL to build the cache key
        query_hash = hashlib.md5(str(self.query).encode()).hexdigest()
        cache_key = f"{key_prefix}:{query_hash}"
        
        results = cache.get(cache_key)
        if results is None:
            results = list(self)
            cache.set(cache_key, results, timeout)
        
        return results
    
    def cache_count(self, timeout=300):
        """Cache the count"""
        import hashlib
        
        query_hash = hashlib.md5(str(self.query).encode()).hexdigest()
        cache_key = f"count:{query_hash}"
        
        count = cache.get(cache_key)
        if count is None:
            count = self.count()
            cache.set(cache_key, count, timeout)
        
        return count

class CachedManager(models.Manager):
    """Manager that returns caching querysets"""
    
    def get_queryset(self):
        return CachedQuerySet(self.model, using=self._db)
    
    def cached_filter(self, *args, **kwargs):
        """Cache a filter() result"""
        import hashlib
        
        # Build a deterministic cache key from the filter arguments
        filter_str = f"{args}:{sorted(kwargs.items())}"
        cache_key = f"filter:{self.model._meta.label}:{hashlib.md5(filter_str.encode()).hexdigest()[:8]}"
        
        results = cache.get(cache_key)
        if results is None:
            results = list(self.filter(*args, **kwargs))
            cache.set(cache_key, results, 300)  # 5 minutes
        
        return results

# Example models
class Product(CacheMixin, models.Model):  # inherits CacheMixin so get_cache_key/cache_property work
    """Product model"""
    name = models.CharField(max_length=200)
    price = models.DecimalField(max_digits=10, decimal_places=2)
    category = models.ForeignKey('Category', on_delete=models.CASCADE)
    description = models.TextField()
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    
    objects = CachedManager()
    
    @CacheMixin.cache_property
    def related_products(self):
        """Related products in the same category"""
        return list(Product.objects.filter(
            category=self.category,
            is_active=True
        ).exclude(id=self.id)[:10])
    
    def get_price_with_tax(self):
        """Price including tax (cached)"""
        cache_key = self.get_cache_key('price_with_tax')
        price_with_tax = cache.get(cache_key)
        
        if price_with_tax is None:
            tax_rate = 0.1  # 10% tax rate
            price_with_tax = float(self.price) * (1 + tax_rate)
            cache.set(cache_key, price_with_tax, 3600)  # 1 hour
        
        return price_with_tax

class Category(CacheMixin, models.Model):
    """Category model"""
    name = models.CharField(max_length=100)
    slug = models.SlugField(unique=True)
    
    objects = CachedManager()
    
    @CacheMixin.cache_property
    def active_products_count(self):
        """Number of active products in this category"""
        return self.product_set.filter(is_active=True).count()

缓存策略模式

Cache-Aside模式

# Cache-Aside模式实现
class CacheAsidePattern:
    """Cache-Aside模式实现"""
    
    @staticmethod
    def get_data_with_cache(model_class, pk, timeout=300):
        """使用Cache-Aside模式获取数据"""
        cache_key = f"model:{model_class._meta.label}:{pk}"
        
        # 1. 先从缓存获取
        data = cache.get(cache_key)
        if data is not None:
            # 缓存命中;哨兵值表示"数据不存在"
            return None if data == "__missing__" else data
        
        # 2. 缓存未命中,从数据库获取
        try:
            instance = model_class.objects.get(pk=pk)
            # 3. 将数据存入缓存
            cache.set(cache_key, instance, timeout)
            return instance
        except model_class.DoesNotExist:
            # 数据不存在时缓存哨兵值,避免频繁回源查询
            # 注意:直接cache.set(key, None)达不到目的——get返回None会被当作未命中
            cache.set(cache_key, "__missing__", 60)  # 缓存1分钟
            return None
    
    @staticmethod
    def update_data_with_cache(model_class, pk, **updates):
        """使用Cache-Aside模式更新数据"""
        cache_key = f"model:{model_class._meta.label}:{pk}"
        
        # 1. 更新数据库
        updated_count = model_class.objects.filter(pk=pk).update(**updates)
        
        # 2. 删除缓存(让下次查询重新加载)
        cache.delete(cache_key)
        
        return updated_count
    
    @staticmethod
    def delete_data_with_cache(model_class, pk):
        """使用Cache-Aside模式删除数据"""
        cache_key = f"model:{model_class._meta.label}:{pk}"
        
        # 1. 删除数据库记录
        deleted_count = model_class.objects.filter(pk=pk).delete()[0]
        
        # 2. 删除缓存
        cache.delete(cache_key)
        
        return deleted_count

# 使用示例
def user_operations_with_cache(user_id):
    """使用Cache-Aside模式的用户操作"""
    from django.contrib.auth.models import User
    from django.utils import timezone
    
    # 获取用户(注意:必须传入模型类本身,而不是'auth.User'这样的字符串)
    user = CacheAsidePattern.get_data_with_cache(
        User, user_id, timeout=1800  # 30分钟
    )
    
    if user:
        # 更新用户
        CacheAsidePattern.update_data_with_cache(
            User, user_id, last_login=timezone.now()
        )
    
    return user
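需要注意的是,Cache-Aside中"缓存空值"必须借助哨兵值,直接缓存None本身起不到防重复查询的作用。下面用普通字典模拟cache.get/cache.set的语义来演示这一点:

```python
# 用普通字典模拟缓存接口
store = {}

def cache_set(key, value):
    store[key] = value

def cache_get(key, default=None):
    return store.get(key, default)

# 直接缓存None:读取时与"完全没缓存"无法区分,调用方仍会回源查询
cache_set("user:404", None)
assert cache_get("user:404") is None      # 看起来就像缓存未命中

# 改用哨兵值,读取方才能区分"数据不存在"和"尚未缓存"
MISSING = "__missing__"
cache_set("user:404", MISSING)
assert cache_get("user:404") == MISSING   # 命中哨兵 -> 向调用方返回None
```

这也是Django文档建议用`cache.get(key, sentinel)`形式判断命中的原因。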

# 高级Cache-Aside实现
class AdvancedCacheAside:
    """高级Cache-Aside实现"""
    
    def __init__(self, cache_backend='default', default_timeout=300):
        from django.core.cache import caches
        self.cache = caches[cache_backend]
        self.default_timeout = default_timeout
    
    def get_or_fetch(self, cache_key: str, fetch_func, timeout: int = None, 
                     cache_empty: bool = True):
        """获取或获取并缓存"""
        # 从缓存获取
        result = self.cache.get(cache_key)
        
        if result is not None:
            if result == "__empty__":
                # 命中空值哨兵,真实结果是None
                return None
            return result
        
        # 执行获取函数
        result = fetch_func()
        
        # 缓存结果
        if result is None and not cache_empty:
            # 不缓存空值
            pass
        else:
            cache_value = result if result is not None else "__empty__"
            self.cache.set(
                cache_key, 
                cache_value, 
                timeout or self.default_timeout
            )
        
        return result
    
    def invalidate_pattern(self, pattern: str):
        """根据模式删除缓存(需要Redis支持)"""
        try:
            import redis
            # 获取Redis客户端
            r = redis.Redis(host='localhost', port=6379, db=0)
            # 用scan_iter增量遍历;KEYS命令会阻塞Redis,不适合生产环境
            keys = list(r.scan_iter(match=pattern))
            if keys:
                r.delete(*keys)
        except ImportError:
            # 如果没有Redis,记录警告
            import logging
            logger = logging.getLogger(__name__)
            logger.warning(f"Cannot invalidate cache pattern {pattern}, Redis not available")

# 使用示例
def advanced_cache_usage():
    """高级缓存使用示例"""
    cache_handler = AdvancedCacheAside(default_timeout=600)
    
    def fetch_expensive_data():
        """获取昂贵数据的函数"""
        import time
        time.sleep(1)  # 模拟耗时操作
        return {"data": "expensive result", "timestamp": time.time()}
    
    # 使用高级缓存
    result = cache_handler.get_or_fetch(
        "expensive_calculation:v1",
        fetch_expensive_data,
        timeout=900  # 15分钟
    )
    
    return result

Read-Through模式

# Read-Through模式实现
from django.core.cache import cache
from django.db import models

class ReadThroughCache:
    """Read-Through缓存实现"""
    
    def __init__(self, model_class, cache_timeout=300):
        self.model_class = model_class
        self.cache_timeout = cache_timeout
        self.cache_prefix = f"read_through:{model_class._meta.label}"
    
    def get(self, key):
        """获取数据,自动从底层加载"""
        cache_key = f"{self.cache_prefix}:{key}"
        
        # 尝试从缓存获取
        result = cache.get(cache_key)
        if result is None:
            # 缓存未命中,从数据库加载
            result = self._load_from_db(key)
            if result is not None:
                # 存入缓存
                cache.set(cache_key, result, self.cache_timeout)
        
        return result
    
    def _load_from_db(self, key):
        """从数据库加载数据"""
        try:
            # 假设key是主键
            return self.model_class.objects.get(pk=key)
        except self.model_class.DoesNotExist:
            return None
    
    def get_many(self, keys):
        """批量获取"""
        cache_keys = [f"{self.cache_prefix}:{key}" for key in keys]
        
        # 批量从缓存获取
        cached_results = cache.get_many(cache_keys)
        
        # 找出未命中的键
        missing_keys = []
        for i, key in enumerate(keys):
            cache_key = cache_keys[i]
            if cache_key not in cached_results:
                missing_keys.append(key)
        
        # 从数据库加载未命中的数据
        if missing_keys:
            db_results = self._load_many_from_db(missing_keys)  # {主键: 实例}
            
            # 存入缓存(filter结果的顺序与传入keys无关,必须按主键配对)
            cache_to_set = {
                f"{self.cache_prefix}:{key}": obj
                for key, obj in db_results.items()
            }
            
            if cache_to_set:
                cache.set_many(cache_to_set, self.cache_timeout)
            
            # 合并结果
            cached_results.update(cache_to_set)
        
        # 返回按原始顺序排列的结果
        results = []
        for key in keys:
            cache_key = f"{self.cache_prefix}:{key}"
            results.append(cached_results.get(cache_key))
        
        return results
    
    def _load_many_from_db(self, keys):
        """批量从数据库加载,返回 {主键: 实例} 映射"""
        return {obj.pk: obj for obj in self.model_class.objects.filter(pk__in=keys)}

# 使用示例
def read_through_usage():
    """Read-Through使用示例"""
    from django.contrib.auth.models import User
    
    user_cache = ReadThroughCache(User, cache_timeout=1800)  # 30分钟
    
    # 单个获取
    user = user_cache.get(1)
    
    # 批量获取
    users = user_cache.get_many([1, 2, 3, 4, 5])
    
    return users

# 带验证的Read-Through
class ValidatedReadThroughCache(ReadThroughCache):
    """带验证的Read-Through缓存"""
    
    def __init__(self, model_class, cache_timeout=300, validator=None):
        super().__init__(model_class, cache_timeout)
        self.validator = validator or self._default_validator
    
    def _default_validator(self, obj):
        """默认验证器"""
        return obj is not None and hasattr(obj, 'pk')
    
    def get(self, key):
        """获取数据并验证"""
        result = super().get(key)
        
        if not self.validator(result):
            # 验证失败,删除缓存并重新加载
            cache_key = f"{self.cache_prefix}:{key}"
            cache.delete(cache_key)
            result = self._load_from_db(key)
            
            if self.validator(result):
                cache.set(cache_key, result, self.cache_timeout)
            else:
                # 重新加载后仍未通过验证,不向调用方返回无效对象
                result = None
        
        return result

# 使用示例
def validated_read_through_usage():
    """带验证的Read-Through使用示例"""
    from django.contrib.auth.models import User
    
    def user_validator(user):
        """用户验证器"""
        return user is not None and user.is_active
    
    user_cache = ValidatedReadThroughCache(
        User, 
        cache_timeout=900,  # 15分钟
        validator=user_validator
    )
    
    user = user_cache.get(1)
    return user

Write-Through模式

# Write-Through模式实现
class WriteThroughCache:
    """Write-Through缓存实现"""
    
    def __init__(self, model_class, cache_timeout=300):
        self.model_class = model_class
        self.cache_timeout = cache_timeout
        self.cache_prefix = f"write_through:{model_class._meta.label}"
    
    def create(self, **data):
        """创建并写入缓存"""
        # 1. 创建数据库记录
        instance = self.model_class.objects.create(**data)
        
        # 2. 同时写入缓存
        cache_key = f"{self.cache_prefix}:{instance.pk}"
        cache.set(cache_key, instance, self.cache_timeout)
        
        return instance
    
    def update(self, key, **data):
        """更新并同步缓存"""
        # 1. 更新数据库
        updated_count = self.model_class.objects.filter(pk=key).update(**data)
        
        if updated_count > 0:
            # 2. 同步更新缓存
            cache_key = f"{self.cache_prefix}:{key}"
            
            # 从数据库获取最新数据并更新缓存
            try:
                updated_instance = self.model_class.objects.get(pk=key)
                cache.set(cache_key, updated_instance, self.cache_timeout)
            except self.model_class.DoesNotExist:
                # 如果数据库中不存在,删除缓存
                cache.delete(cache_key)
        
        return updated_count
    
    def bulk_update(self, updates_list):
        """批量更新"""
        updated_count = 0
        cache_updates = {}
        
        for update_data in updates_list:
            update_data = dict(update_data)  # 复制一份,避免pop修改调用方的字典
            pk = update_data.pop('pk')
            count = self.model_class.objects.filter(pk=pk).update(**update_data)
            updated_count += count
            
            if count > 0:
                # 准备缓存更新
                try:
                    updated_instance = self.model_class.objects.get(pk=pk)
                    cache_key = f"{self.cache_prefix}:{pk}"
                    cache_updates[cache_key] = updated_instance
                except self.model_class.DoesNotExist:
                    cache.delete(f"{self.cache_prefix}:{pk}")
        
        # 批量更新缓存
        if cache_updates:
            cache.set_many(cache_updates, self.cache_timeout)
        
        return updated_count
    
    def delete(self, key):
        """删除并同步缓存"""
        # 1. 删除数据库记录
        deleted_count = self.model_class.objects.filter(pk=key).delete()[0]
        
        # 2. 同步删除缓存
        if deleted_count > 0:
            cache_key = f"{self.cache_prefix}:{key}"
            cache.delete(cache_key)
        
        return deleted_count
    
    def get(self, key):
        """获取(只读操作)"""
        cache_key = f"{self.cache_prefix}:{key}"
        result = cache.get(cache_key)
        
        if result is None:
            try:
                result = self.model_class.objects.get(pk=key)
                cache.set(cache_key, result, self.cache_timeout)
            except self.model_class.DoesNotExist:
                result = None
        
        return result

# 使用示例
def write_through_usage():
    """Write-Through使用示例"""
    from django.contrib.auth.models import User
    
    user_cache = WriteThroughCache(User, cache_timeout=1800)
    
    # 创建用户
    user = user_cache.create(
        username='newuser',
        email='newuser@example.com',
        first_name='New',
        last_name='User'
    )
    
    # 更新用户
    user_cache.update(user.pk, last_name='Updated')
    
    # 获取用户
    retrieved_user = user_cache.get(user.pk)
    
    return retrieved_user

# 事务安全的Write-Through
from django.db import transaction

class TransactionSafeWriteThroughCache(WriteThroughCache):
    """事务安全的Write-Through缓存"""
    
    def update(self, key, **data):
        """事务安全的更新"""
        with transaction.atomic():
            # 1. 更新数据库
            updated_count = self.model_class.objects.filter(pk=key).update(**data)
            
            if updated_count > 0:
                # 2. 事务提交后再同步缓存,避免回滚后缓存中留下脏数据
                cache_key = f"{self.cache_prefix}:{key}"
                try:
                    updated_instance = self.model_class.objects.select_for_update().get(pk=key)
                    transaction.on_commit(
                        lambda: cache.set(cache_key, updated_instance, self.cache_timeout)
                    )
                except self.model_class.DoesNotExist:
                    transaction.on_commit(lambda: cache.delete(cache_key))
        
        return updated_count
    
    def bulk_update(self, updates_list):
        """事务安全的批量更新"""
        with transaction.atomic():
            updated_count = 0
            cache_updates = {}
            
            for update_data in updates_list:
                update_data = dict(update_data)  # 复制,避免修改调用方的字典
                pk = update_data.pop('pk')
                
                # 使用select_for_update锁定记录
                try:
                    instance = self.model_class.objects.select_for_update().get(pk=pk)
                    for field, value in update_data.items():
                        setattr(instance, field, value)
                    instance.save()
                    updated_count += 1
                    
                    # 准备缓存更新
                    cache_key = f"{self.cache_prefix}:{pk}"
                    cache_updates[cache_key] = instance
                except self.model_class.DoesNotExist:
                    continue
            
            # 事务提交后再批量更新缓存,避免回滚留下脏数据
            if cache_updates:
                transaction.on_commit(
                    lambda: cache.set_many(cache_updates, self.cache_timeout)
                )
        
        return updated_count
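除了在事务内同步,"延迟双删"也是缓解写后不一致的常见做法:写库前先删一次缓存,延迟片刻再删第二次,清掉写库窗口期内被并发读回填的旧值。下面是一个与框架无关的示意(cache_delete与write_db为注入的回调,delay通常按数据库主从延迟的经验值设定):

```python
import threading

def delete_cache_twice(cache_delete, key, write_db, delay=0.5):
    """延迟双删:删缓存 -> 写库 -> 延迟后再删一次缓存"""
    cache_delete(key)       # 第一次删除
    write_db()              # 执行数据库写入
    # 延迟后第二次删除,清掉写库期间被并发读回填的旧值
    timer = threading.Timer(delay, cache_delete, args=(key,))
    timer.daemon = True
    timer.start()
    return timer
```

delay取得太短清不掉回填值,取得太长则脏数据存活时间变长,需要结合业务容忍度权衡。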

缓存失效策略

# 缓存失效策略
import time
from typing import List, Callable

class CacheInvalidationStrategy:
    """缓存失效策略"""
    
    @staticmethod
    def time_based_invalidation(timeout: int):
        """基于时间的失效策略"""
        def strategy(func):
            def wrapper(*args, **kwargs):
                # 内置hash()受PYTHONHASHSEED影响,跨进程不稳定,用md5生成确定性键
                import hashlib
                arg_hash = hashlib.md5((str(args) + str(kwargs)).encode()).hexdigest()[:16]
                cache_key = f"{func.__module__}:{func.__name__}:{arg_hash}"
                result = cache.get(cache_key)
                
                if result is None:
                    result = func(*args, **kwargs)
                    cache.set(cache_key, result, timeout)
                
                return result
            return wrapper
        return strategy
    
    @staticmethod
    def event_based_invalidation(invalidate_on: List[str]):
        """基于事件的失效策略"""
        def decorator(func):
            def wrapper(*args, **kwargs):
                cache_key = f"event_cache:{func.__name__}:{hash(str(args) + str(kwargs))}"
                
                result = cache.get(cache_key)
                if result is None:
                    result = func(*args, **kwargs)
                    
                    # 设置缓存并关联失效事件
                    cache.set(cache_key, result, timeout=3600)
                    
                    # 注册失效监听器(register_invalidation_listener需自行实现:
                    # 维护"事件 -> 关联缓存键"的映射,事件触发时删除对应的键)
                    for event in invalidate_on:
                        register_invalidation_listener(event, cache_key)
                
                return result
            return wrapper
        return decorator
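上面的装饰器假设存在register_invalidation_listener;下面给出一个进程内的极简实现示意(生产环境通常把"事件 -> 键"的映射存入Redis集合),delete_func参数用于注入实际的缓存删除操作:

```python
from collections import defaultdict

# 事件名 -> 关联的缓存键集合
_invalidation_registry = defaultdict(set)

def register_invalidation_listener(event: str, cache_key: str):
    """把缓存键登记到某个事件上"""
    _invalidation_registry[event].add(cache_key)

def trigger_invalidation(event: str, delete_func):
    """事件发生时调用:删除该事件关联的所有缓存键,返回删除数量"""
    keys = _invalidation_registry.pop(event, set())
    for key in keys:
        delete_func(key)
    return len(keys)
```

业务代码在数据变更处调用`trigger_invalidation("user_updated", cache.delete)`即可让关联缓存集中失效。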

# 缓存预热策略
class CacheWarmingStrategy:
    """缓存预热策略"""
    
    @staticmethod
    def warm_common_queries():
        """预热常见查询"""
        from .models import Product, Category
        
        # 预热热门商品(假设Product模型带有view_count浏览计数字段)
        popular_products = Product.objects.filter(
            is_active=True
        ).order_by('-view_count')[:50]
        
        for product in popular_products:
            # 预热单个商品缓存
            cache_key = f"product:{product.pk}"
            cache.set(cache_key, product, 3600)
        
        # 预热分类统计
        categories = Category.objects.all()
        for category in categories:
            count = category.product_set.filter(is_active=True).count()
            cache_key = f"category:{category.pk}:product_count"
            cache.set(cache_key, count, 7200)  # 2小时
    
    @staticmethod
    def scheduled_warming():
        """定时预热(示例用后台线程轮询;生产环境更推荐Celery Beat或cron)"""
        import threading
        
        def warm_cache():
            while True:
                try:
                    CacheWarmingStrategy.warm_common_queries()
                    time.sleep(3600)  # 每小时预热一次
                except Exception as e:
                    import logging
                    logger = logging.getLogger(__name__)
                    logger.error(f"Cache warming failed: {e}")
                    time.sleep(300)  # 失败后5分钟重试
        
        # 在后台线程中运行
        thread = threading.Thread(target=warm_cache, daemon=True)
        thread.start()

# 缓存穿透防护
class CachePenetrationProtection:
    """缓存穿透防护"""
    
    @staticmethod
    def protect_null_values(func):
        """保护空值防止缓存穿透"""
        def wrapper(*args, **kwargs):
            cache_key = f"protected:{func.__name__}:{hash(str(args) + str(kwargs))}"
            
            # 检查是否有保护标记
            protection_key = f"protection:{cache_key}"
            if cache.get(protection_key):
                # 正在查询中,返回默认值
                return func.__defaults__[0] if func.__defaults__ else None
            
            result = cache.get(cache_key)
            if result is not None:
                if result == "__null__":
                    return None
                return result
            
            # 设置保护标记,防止并发查询
            cache.set(protection_key, 1, 30)  # 保护30秒
            
            try:
                result = func(*args, **kwargs)
                
                # 缓存结果(包括空值)
                cache_result = result if result is not None else "__null__"
                cache.set(cache_key, cache_result, 300)  # 5分钟
                
                # 移除保护标记
                cache.delete(protection_key)
                
                return result
            except Exception:
                # 查询失败,移除保护标记
                cache.delete(protection_key)
                raise
        
        return wrapper

# 使用示例
@CachePenetrationProtection.protect_null_values
def get_user_by_username(username):
    """获取用户(防穿透)"""
    from django.contrib.auth.models import User
    try:
        return User.objects.get(username=username)
    except User.DoesNotExist:
        return None
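缓存空值之外,布隆过滤器是防穿透的另一常见手段:把所有存在的键预先装入过滤器,查询先问过滤器,判定"一定不存在"的请求直接拒绝,根本不触达缓存和数据库。以下是一个示意实现(位数组大小与哈希个数为示例取值,实际应按数据量和可接受的误判率计算):

```python
import hashlib

class SimpleBloomFilter:
    """极简布隆过滤器:might_contain返回False则键一定不存在,
    返回True则可能存在(有小概率误判)"""

    def __init__(self, size_bits=8192, num_hashes=3):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits // 8)

    def _positions(self, key: str):
        # 用不同前缀派生出多个哈希位置
        for i in range(self.num_hashes):
            h = int(hashlib.sha256(f"{i}:{key}".encode()).hexdigest(), 16)
            yield h % self.size_bits

    def add(self, key: str):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key: str) -> bool:
        return all(
            self.bits[pos // 8] & (1 << (pos % 8))
            for pos in self._positions(key)
        )
```

布隆过滤器只能增不能删,键集合变化频繁时需要定期重建,生产中可直接使用Redis的RedisBloom模块。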

# 缓存雪崩防护
class CacheAvalancheProtection:
    """缓存雪崩防护"""
    
    @staticmethod
    def add_random_timeout(base_timeout: int, jitter_range: int = 60):
        """添加随机超时防止雪崩"""
        import random
        
        def decorator(func):
            def wrapper(*args, **kwargs):
                cache_key = f"avalanche_protected:{func.__name__}:{hash(str(args) + str(kwargs))}"
                
                result = cache.get(cache_key)
                if result is None:
                    result = func(*args, **kwargs)
                    
                    # 添加随机抖动
                    actual_timeout = base_timeout + random.randint(-jitter_range, jitter_range)
                    cache.set(cache_key, result, actual_timeout)
                
                return result
            return wrapper
        return decorator

# 使用示例
@CacheAvalancheProtection.add_random_timeout(3600, 300)  # 1小时±5分钟
def get_popular_articles():
    """获取热门文章(防雪崩)"""
    from .models import Article
    return list(Article.objects.filter(
        is_published=True
    ).order_by('-view_count')[:10])
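文档开头提到的"缓存击穿"(单个热点键过期瞬间,大量请求同时回源)可以用互斥锁缓解:Django缓存的add()只在键不存在时写入成功,恰好可以充当一把简易互斥锁。下面是一个与后端无关的示意,cache参数只要求提供get/add/set/delete方法(Django的cache对象即满足);各超时与重试参数为示例取值:

```python
import time

def get_with_mutex(cache, key, loader, timeout=300,
                   lock_timeout=10, wait=0.05, retries=20):
    """互斥重建:热点键过期时只放一个调用方回源,其余等待结果"""
    value = cache.get(key)
    if value is not None:
        return value

    lock_key = f"lock:{key}"
    for _ in range(retries):
        # add只在键不存在时成功,成功者获得重建权
        if cache.add(lock_key, 1, lock_timeout):
            try:
                value = cache.get(key)  # 双重检查:可能已被他人重建
                if value is None:
                    value = loader()
                    cache.set(key, value, timeout)
                return value
            finally:
                cache.delete(lock_key)
        # 没抢到锁:稍等后看缓存是否已被重建
        time.sleep(wait)
        value = cache.get(key)
        if value is not None:
            return value

    # 等待超时,直接回源兜底
    return loader()
```

lock_timeout兜底防止持锁进程崩溃后锁永不释放;严格场景下可给锁值写入随机token,删除前校验归属。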

缓存性能优化

缓存压缩

# 缓存压缩优化
import pickle
import zlib
import gzip
import json
from django.core.cache import cache

class CompressedCache:
    """压缩缓存类"""
    
    @staticmethod
    def set_compressed(key: str, value, timeout: int = 300, compression_level: int = 6):
        """设置压缩缓存"""
        # 序列化数据
        serialized_data = pickle.dumps(value)
        
        # 压缩数据
        compressed_data = zlib.compress(serialized_data, compression_level)
        
        # 存储压缩后的数据
        cache.set(key, compressed_data, timeout)
    
    @staticmethod
    def get_compressed(key: str, default=None):
        """获取压缩缓存"""
        compressed_data = cache.get(key, default)
        
        if compressed_data is default:
            return default
        
        if compressed_data is None:
            return None
        
        # 解压缩数据
        serialized_data = zlib.decompress(compressed_data)
        
        # 反序列化
        return pickle.loads(serialized_data)
    
    @staticmethod
    def set_json_compressed(key: str, value, timeout: int = 300):
        """使用JSON压缩设置缓存"""
        # 转换为JSON并编码
        json_data = json.dumps(value, default=str).encode('utf-8')
        
        # 压缩
        compressed_data = gzip.compress(json_data)
        
        # 存储
        cache.set(key, compressed_data, timeout)
    
    @staticmethod
    def get_json_compressed(key: str, default=None):
        """使用JSON压缩获取缓存"""
        compressed_data = cache.get(key, default)
        
        if compressed_data is default:
            return default
        
        if compressed_data is None:
            return None
        
        # 解压缩
        json_data = gzip.decompress(compressed_data)
        
        # 解析JSON
        return json.loads(json_data.decode('utf-8'))

# 压缩装饰器
def compressed_cache(timeout=300, compression_type='zlib'):
    """压缩缓存装饰器"""
    def decorator(func):
        def wrapper(*args, **kwargs):
            # 生成缓存键
            cache_key = f"compressed:{func.__name__}:{hash(str(args) + str(kwargs))}"
            
            # 尝试获取压缩缓存
            if compression_type == 'zlib':
                result = CompressedCache.get_compressed(cache_key)
            elif compression_type == 'json':
                result = CompressedCache.get_json_compressed(cache_key)
            else:
                result = cache.get(cache_key)
            
            if result is None:
                # 执行函数
                result = func(*args, **kwargs)
                
                # 存储压缩缓存
                if compression_type == 'zlib':
                    CompressedCache.set_compressed(cache_key, result, timeout)
                elif compression_type == 'json':
                    CompressedCache.set_json_compressed(cache_key, result, timeout)
                else:
                    cache.set(cache_key, result, timeout)
            
            return result
        return wrapper
    return decorator

# 使用示例
@compressed_cache(timeout=1800, compression_type='zlib')
def get_large_dataset():
    """获取大数据集"""
    # 模拟大数据集
    large_data = [{'id': i, 'data': f'data_{i}'} for i in range(10000)]
    return large_data

# 批量压缩缓存
class BatchCompressedCache:
    """批量压缩缓存"""
    
    @staticmethod
    def set_many_compressed(data_dict: dict, timeout: int = 300):
        """批量设置压缩缓存"""
        compressed_dict = {}
        for key, value in data_dict.items():
            serialized_data = pickle.dumps(value)
            compressed_data = zlib.compress(serialized_data)
            compressed_dict[key] = compressed_data
        
        cache.set_many(compressed_dict, timeout)
    
    @staticmethod
    def get_many_compressed(keys: list):
        """批量获取压缩缓存"""
        compressed_values = cache.get_many(keys)
        decompressed_values = {}
        
        for key, compressed_value in compressed_values.items():
            if compressed_value:
                serialized_data = zlib.decompress(compressed_value)
                decompressed_values[key] = pickle.loads(serialized_data)
            else:
                decompressed_values[key] = compressed_value
        
        return decompressed_values
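压缩是否划算取决于数据体积和重复度:小对象压缩后反而可能变大,且每次读写都要多付一次CPU开销。接入压缩缓存前,可以用类似下面的小脚本评估收益(示例数据为假设的高重复度记录):

```python
import pickle
import zlib

# 模拟一批结构重复度高的记录
data = [{"id": i, "name": f"item_{i}", "status": "active"} for i in range(1000)]

raw = pickle.dumps(data)
compressed = zlib.compress(raw, 6)

print(f"原始 {len(raw)} 字节,压缩后 {len(compressed)} 字节,"
      f"压缩比 {len(compressed) / len(raw):.2%}")

# 往返校验:解压反序列化后必须与原数据完全一致
assert pickle.loads(zlib.decompress(compressed)) == data
```

若压缩比接近1,或单个对象只有几百字节,通常不值得启用压缩。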

缓存分片

# 缓存分片策略
import hashlib
from typing import Any, List, Dict

class ShardedCache:
    """分片缓存实现"""
    
    def __init__(self, num_shards: int = 4):
        self.num_shards = num_shards
        self.shard_keys = [f"shard_{i}" for i in range(num_shards)]
    
    def _get_shard_index(self, key: str) -> int:
        """获取键所属的分片索引"""
        hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
        return hash_value % self.num_shards
    
    def _get_shard_key(self, key: str) -> str:
        """获取分片键"""
        shard_index = self._get_shard_index(key)
        return f"{self.shard_keys[shard_index]}:{key}"
    
    def set(self, key: str, value, timeout: int = 300):
        """设置分片缓存"""
        shard_key = self._get_shard_key(key)
        cache.set(shard_key, value, timeout)
    
    def get(self, key: str, default=None):
        """获取分片缓存"""
        shard_key = self._get_shard_key(key)
        return cache.get(shard_key, default)
    
    def delete(self, key: str):
        """删除分片缓存"""
        shard_key = self._get_shard_key(key)
        return cache.delete(shard_key)
    
    def set_many(self, data_dict: Dict[str, Any], timeout: int = 300):
        """批量设置分片缓存"""
        shard_dicts = {i: {} for i in range(self.num_shards)}
        
        for key, value in data_dict.items():
            shard_index = self._get_shard_index(key)
            shard_key = self._get_shard_key(key)
            shard_dicts[shard_index][shard_key] = value
        
        # 逐个分片批量设置(如需并行,可交由线程池处理)
        for shard_index in range(self.num_shards):
            if shard_dicts[shard_index]:
                cache.set_many(shard_dicts[shard_index], timeout)

# 一致性哈希分片
class ConsistentHashShardCache:
    """一致性哈希分片缓存"""
    
    def __init__(self, nodes: List[str], replicas: int = 150):
        self.nodes = nodes
        self.replicas = replicas
        self.ring = {}
        self.sorted_keys = []
        
        for node in self.nodes:
            for i in range(self.replicas):
                key = self._hash(f"{node}:{i}")
                self.ring[key] = node
                self.sorted_keys.append(key)
        
        self.sorted_keys.sort()
    
    def _hash(self, key: str) -> int:
        """计算哈希值"""
        return int(hashlib.sha256(key.encode()).hexdigest(), 16)
    
    def _get_node(self, key: str) -> str:
        """获取键对应的节点"""
        if not self.ring:
            return None
        
        hash_key = self._hash(key)
        
        # 二分查找
        import bisect
        idx = bisect.bisect(self.sorted_keys, hash_key)
        
        if idx == len(self.sorted_keys):
            idx = 0
        
        return self.ring[self.sorted_keys[idx]]
    
    def get_cache_backend(self, key: str):
        """获取对应的缓存后端(节点名需为settings.CACHES中配置的别名)"""
        from django.core.cache import caches
        node = self._get_node(key)
        return caches[node] if node else cache
    
    def set(self, key: str, value, timeout: int = 300):
        """设置缓存"""
        backend = self.get_cache_backend(key)
        backend.set(key, value, timeout)
    
    def get(self, key: str, default=None):
        """获取缓存"""
        backend = self.get_cache_backend(key)
        return backend.get(key, default)
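一致性哈希的价值在于扩缩容时只迁移少量键。下面用与上面类相同的哈希逻辑做一个独立验证:3个节点扩到4个,理论上只有约1/4的键换了归属(节点名为示例):

```python
import bisect
import hashlib

def build_ring(nodes, replicas=150):
    """构建哈希环,返回 (环映射, 有序哈希值列表)"""
    ring = {}
    for node in nodes:
        for i in range(replicas):
            h = int(hashlib.sha256(f"{node}:{i}".encode()).hexdigest(), 16)
            ring[h] = node
    return ring, sorted(ring)

def locate(ring, sorted_keys, key):
    """顺时针找到键归属的节点"""
    h = int(hashlib.sha256(key.encode()).hexdigest(), 16)
    idx = bisect.bisect(sorted_keys, h)
    return ring[sorted_keys[idx % len(sorted_keys)]]

keys = [f"user:{i}" for i in range(2000)]
ring3 = build_ring(["cache_a", "cache_b", "cache_c"])
ring4 = build_ring(["cache_a", "cache_b", "cache_c", "cache_d"])

moved = sum(locate(*ring3, k) != locate(*ring4, k) for k in keys)
print(f"扩容一个节点后迁移了 {moved}/{len(keys)} 个键")
```

作为对比,简单取模分片在节点数从3变4时几乎所有键都会换归属,这正是一致性哈希要解决的问题。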

# 使用示例
def sharded_cache_example():
    """分片缓存示例"""
    # 创建4个分片的缓存
    sharded_cache = ShardedCache(num_shards=4)
    
    # 设置数据
    for i in range(100):
        sharded_cache.set(f"user:{i}", {"id": i, "name": f"user_{i}"})
    
    # 获取数据
    user_data = sharded_cache.get("user:50")
    
    return user_data
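取模分片是否均匀,可以直接拿分片函数做统计验证(下面的哈希逻辑与ShardedCache一致):

```python
import hashlib
from collections import Counter

def shard_index(key: str, num_shards: int = 4) -> int:
    """与ShardedCache相同的分片计算"""
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % num_shards

# 统计1万个键在4个分片上的分布
counts = Counter(shard_index(f"user:{i}") for i in range(10000))
print(dict(counts))  # 各分片数量应接近 10000 / 4 = 2500
```

md5输出近似均匀,因此各分片负载基本持平;但注意取模分片在分片数变化时会大规模重分布,需要再平衡时应改用上面的一致性哈希。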

缓存预取和懒加载

# 缓存预取和懒加载
import time
import concurrent.futures
from typing import List

class LazyLoadCache:
    """懒加载缓存"""
    
    def __init__(self):
        self._cache = {}
        self._loading = set()
    
    def get_lazy(self, key: str, loader_func, timeout: int = 300):
        """懒加载获取"""
        if key in self._cache:
            return self._cache[key]
        
        if key in self._loading:
            # 正在加载,返回占位符或等待
            import time
            start_time = time.time()
            while key in self._loading and time.time() - start_time < 5:  # 5秒超时
                time.sleep(0.1)
            return self._cache.get(key) if key in self._cache else None
        
        # 开始加载
        self._loading.add(key)
        try:
            value = loader_func()
            self._cache[key] = value
            # 同时设置到主缓存
            cache.set(key, value, timeout)
            return value
        finally:
            self._loading.discard(key)
    
    def prefetch(self, keys: List[str], loader_func, timeout: int = 300):
        """预取多个键"""
        def load_single(key):
            if key not in self._cache and key not in self._loading:
                self._loading.add(key)
                try:
                    value = loader_func(key)
                    self._cache[key] = value
                    cache.set(key, value, timeout)
                    return value
                finally:
                    self._loading.discard(key)
        
        # 使用线程池并行加载
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(load_single, key) for key in keys]
            results = [future.result() for future in futures]
        
        return results

# 智能预取策略
class IntelligentPrefetch:
    """智能预取策略"""
    
    def __init__(self):
        self.access_patterns = {}
        self.prefetch_queue = []
    
    def record_access(self, key: str):
        """记录访问模式"""
        if key not in self.access_patterns:
            self.access_patterns[key] = {'count': 0, 'last_access': None}
        
        self.access_patterns[key]['count'] += 1
        self.access_patterns[key]['last_access'] = time.time()
    
    def predict_next_accesses(self, current_key: str, n: int = 5) -> List[str]:
        """预测下一个访问的键"""
        # 简单策略:按最近访问时间倒序取前n个
        recent_keys = [
            k for k, v in sorted(
                self.access_patterns.items(),
                key=lambda x: x[1]['last_access'] or 0,
                reverse=True
            )[:n]
        ]
        return recent_keys

# 预取装饰器
def prefetch_on_access(predict_func=None, prefetch_count=3):
    """访问时预取装饰器"""
    def decorator(func):
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            
            # 如果提供了预测函数,执行预取
            if predict_func:
                predicted_keys = predict_func(args[0] if args else '', prefetch_count)
                # 在后台线程中预取
                import threading
                thread = threading.Thread(
                    target=lambda: prefetch_data(predicted_keys)
                )
                thread.daemon = True
                thread.start()
            
            return result
        return wrapper
    return decorator

def prefetch_data(keys):
    """预取数据"""
    for key in keys:
        # 尝试从缓存获取,如果不存在则加载
        if cache.get(key) is None:
            # 这里可以实现具体的预取逻辑
            pass

# 使用示例
prefetcher = IntelligentPrefetch()

@prefetch_on_access(predict_func=prefetcher.predict_next_accesses, prefetch_count=5)
def get_user_data(user_id):
    """获取用户数据(带预取;不传predict_func时装饰器不会触发预取)"""
    from django.contrib.auth.models import User
    prefetcher.record_access(f"user:{user_id}")
    return User.objects.get(id=user_id)
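IntelligentPrefetch的"最近访问优先"预测可以独立演示如下(与类中逻辑等价的函数版,键名为示例):

```python
import time

access_patterns = {}

def record_access(key: str):
    """记录键的访问次数与最近访问时间"""
    info = access_patterns.setdefault(key, {"count": 0, "last_access": None})
    info["count"] += 1
    info["last_access"] = time.time()

def predict_next(n: int = 3):
    """按最近访问时间倒序,取前n个键作为预取候选"""
    ordered = sorted(
        access_patterns.items(),
        key=lambda item: item[1]["last_access"] or 0,
        reverse=True,
    )
    return [key for key, _ in ordered[:n]]

for key in ["user:1", "user:2", "user:3"]:
    record_access(key)
    time.sleep(0.01)  # 保证时间戳单调递增

print(predict_next(2))  # 最近访问的键排在最前
```

更精细的预测可以结合访问次数、键之间的共现关系(访问A后常访问B)等信号,这里只演示最基础的时间近因策略。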

缓存监控和诊断

# Cache monitoring and diagnostic tools
import time
import psutil
import redis
from django.core.cache import cache
from django.conf import settings
from collections import deque
import threading
from datetime import datetime

class CacheMonitor:
    """Cache monitor"""
    
    def __init__(self):
        self.hit_count = 0
        self.miss_count = 0
        self.access_times = deque(maxlen=1000)  # last 1000 access records
        self.lock = threading.Lock()
        self.stats_history = deque(maxlen=100)  # keep the last 100 stat snapshots
    
    def record_hit(self, duration):
        """Record a cache hit"""
        with self.lock:
            self.hit_count += 1
            self.access_times.append(('hit', time.time(), duration))
    
    def record_miss(self, duration):
        """Record a cache miss"""
        with self.lock:
            self.miss_count += 1
            self.access_times.append(('miss', time.time(), duration))
    
    def get_stats(self):
        """Return aggregate statistics"""
        with self.lock:
            total_requests = self.hit_count + self.miss_count
            hit_rate = self.hit_count / total_requests if total_requests > 0 else 0
            
            # Statistics over the recent window
            recent_hits = sum(1 for t in self.access_times if t[0] == 'hit')
            recent_total = len(self.access_times)
            recent_hit_rate = recent_hits / recent_total if recent_total > 0 else 0
            
            # Average response time
            if self.access_times:
                avg_duration = sum(t[2] for t in self.access_times) / len(self.access_times)
            else:
                avg_duration = 0
            
            stats = {
                'total_requests': total_requests,
                'hit_count': self.hit_count,
                'miss_count': self.miss_count,
                'hit_rate': hit_rate,
                'recent_hit_rate': recent_hit_rate,
                'avg_response_time': avg_duration,
                'cache_size': self.get_cache_size(),
                'memory_usage': self.get_memory_usage(),
            }
            
            # Append to the history
            self.stats_history.append({
                'timestamp': datetime.now(),
                'stats': stats.copy()
            })
            
            return stats
    
    def get_cache_size(self):
        """Return the number of keys in the cache"""
        try:
            # Only implemented for the Redis backend
            if hasattr(settings, 'CACHES') and 'default' in settings.CACHES:
                cache_config = settings.CACHES['default']
                if cache_config['BACKEND'] == 'django.core.cache.backends.redis.RedisCache':
                    redis_client = redis.Redis.from_url(cache_config['LOCATION'])
                    return redis_client.dbsize()
        except Exception:
            pass
        return 0
    
    def get_memory_usage(self):
        """Return the process's resident memory usage in bytes"""
        try:
            process = psutil.Process()
            return process.memory_info().rss
        except Exception:
            return 0
    
    def get_trend_analysis(self, minutes=10):
        """Analyze the trend over the last N minutes"""
        with self.lock:
            cutoff_time = time.time() - (minutes * 60)
            recent_accesses = [
                t for t in self.access_times 
                if t[1] > cutoff_time
            ]
            
            if not recent_accesses:
                return {}
            
            hits = sum(1 for t in recent_accesses if t[0] == 'hit')
            
            return {
                'period': f'last_{minutes}_minutes',
                'requests': len(recent_accesses),
                'hit_rate': hits / len(recent_accesses),
                'requests_per_minute': len(recent_accesses) / minutes
            }

# Global monitor instance
cache_monitor = CacheMonitor()

# Monitoring decorator
def monitored_cache(timeout=300):
    """Decorator that caches a function's result and records hits/misses"""
    def decorator(func):
        def wrapper(*args, **kwargs):
            start_time = time.time()
            
            # Note: hash() is randomized per process; use hashlib for stable keys
            cache_key = f"monitored:{func.__name__}:{hash(str(args) + str(kwargs))}"
            result = cache.get(cache_key)
            
            duration = time.time() - start_time  # time spent on the cache lookup
            
            if result is None:
                # Cache miss: compute, store, record
                result = func(*args, **kwargs)
                cache.set(cache_key, result, timeout)
                cache_monitor.record_miss(duration)
            else:
                # Cache hit
                cache_monitor.record_hit(duration)
            
            return result
        return wrapper
    return decorator

# Cache diagnostic tool
class CacheDiagnostic:
    """Cache diagnostic tool"""
    
    @staticmethod
    def diagnose_performance():
        """Diagnose cache performance"""
        stats = cache_monitor.get_stats()
        
        diagnosis = {
            'overall_health': 'good',
            'recommendations': [],
            'warnings': []
        }
        
        # Check the hit rate
        if stats['hit_rate'] < 0.7:
            diagnosis['overall_health'] = 'poor'
            diagnosis['warnings'].append(f"Cache hit rate is too low: {stats['hit_rate']:.2%}")
            diagnosis['recommendations'].append("Consider longer timeouts or a different caching strategy")
        elif stats['hit_rate'] < 0.85:
            diagnosis['overall_health'] = 'fair'
            diagnosis['recommendations'].append("The hit rate can be improved further")
        
        # Check the response time
        if stats['avg_response_time'] > 0.1:  # 100ms
            diagnosis['warnings'].append(f"Average response time is too high: {stats['avg_response_time']:.3f}s")
            diagnosis['recommendations'].append("Check the cache backend's performance")
        
        # Check the cache size
        if stats['cache_size'] == 0:
            diagnosis['warnings'].append("Unable to determine the cache size")
        
        return diagnosis
    
    @staticmethod
    def analyze_hot_keys(sample_size=1000):
        """Analyze hot keys"""
        try:
            if hasattr(settings, 'CACHES') and 'default' in settings.CACHES:
                cache_config = settings.CACHES['default']
                if cache_config['BACKEND'] == 'django.core.cache.backends.redis.RedisCache':
                    redis_client = redis.Redis.from_url(cache_config['LOCATION'])
                    
                    # KEYS scans the whole keyspace and blocks Redis;
                    # use with caution in production (prefer SCAN)
                    all_keys = redis_client.keys('*')
                    
                    hot_keys = []
                    for key in all_keys[:sample_size]:  # analyze a sample only
                        try:
                            size = redis_client.memory_usage(key)
                            ttl = redis_client.ttl(key)
                            hot_keys.append({
                                'key': key.decode() if isinstance(key, bytes) else key,
                                'size': size,
                                'ttl': ttl
                            })
                        except redis.ResponseError:
                            # Older Redis versions may not support MEMORY USAGE
                            hot_keys.append({
                                'key': key.decode() if isinstance(key, bytes) else key,
                                'size': 'unknown',
                                'ttl': redis_client.ttl(key)
                            })
                    
                    # Sort by size, largest first
                    hot_keys.sort(key=lambda x: x['size'] if isinstance(x['size'], int) else 0, reverse=True)
                    return hot_keys[:50]  # return the 50 largest keys
        
        except Exception as e:
            print(f"Hot-key analysis failed: {e}")
        
        return []
    
    @staticmethod
    def get_cache_efficiency_report():
        """Build a cache efficiency report"""
        stats = cache_monitor.get_stats()
        trend = cache_monitor.get_trend_analysis(minutes=5)
        
        report = {
            'timestamp': datetime.now().isoformat(),
            'current_stats': stats,
            'trend_analysis': trend,
            'diagnosis': CacheDiagnostic.diagnose_performance(),
            'suggestions': CacheDiagnostic.get_improvement_suggestions(stats)
        }
        
        return report
    
    @staticmethod
    def get_improvement_suggestions(stats):
        """Generate improvement suggestions"""
        suggestions = []
        
        if stats['hit_rate'] < 0.8:
            suggestions.append("Consider longer timeouts for frequently used data")
            suggestions.append("Investigate the causes of cache misses and refine the key strategy")
        
        if stats['avg_response_time'] > 0.05:  # 50ms
            suggestions.append("Consider a faster cache backend (e.g. in-memory caching)")
            suggestions.append("Check network latency; consider a local cache tier")
        
        if stats['total_requests'] > 10000 and stats['hit_rate'] < 0.9:
            suggestions.append("For high-traffic data, consider a multi-level caching strategy")
        
        return suggestions

# Cache performance testing
class CachePerformanceTester:
    """Cache performance tester"""
    
    @staticmethod
    def test_cache_speed(iterations=1000):
        """Measure cache set/get speed"""
        import random
        import string
        
        def random_string(length=10):
            return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
        
        results = {
            'set_times': [],
            'get_times': [],
        }
        
        for i in range(iterations):
            key = f"test_key_{random_string()}"
            value = random_string(100)
            
            # Time the set operation
            start = time.time()
            cache.set(key, value, 300)
            results['set_times'].append(time.time() - start)
            
            # Time the get operation
            start = time.time()
            cache.get(key)
            results['get_times'].append(time.time() - start)
        
        # Aggregate the measurements
        total_set = sum(results['set_times'])
        total_get = sum(results['get_times'])
        
        performance_stats = {
            'iterations': iterations,
            'avg_set_time': total_set / iterations,
            'avg_get_time': total_get / iterations,
            'total_test_time': total_set + total_get,
            'sets_per_second': iterations / total_set if total_set > 0 else float('inf'),
            'gets_per_second': iterations / total_get if total_get > 0 else float('inf'),
        }
        
        return performance_stats

# Usage example
def cache_monitoring_example():
    """Cache monitoring example"""
    
    @monitored_cache(timeout=600)
    def expensive_function(x, y):
        """An expensive function (monitored)"""
        time.sleep(0.1)  # simulate slow work
        return x * y
    
    # Generate some monitoring data
    for i in range(100):
        expensive_function(i, i + 1)
    
    # Read the statistics
    stats = cache_monitor.get_stats()
    print(f"Hit rate: {stats['hit_rate']:.2%}")
    print(f"Average response time: {stats['avg_response_time']:.3f}s")
    
    # Get the diagnostic report
    diagnostic = CacheDiagnostic.get_cache_efficiency_report()
    print(f"Overall health: {diagnostic['diagnosis']['overall_health']}")
    
    return stats

Common Problems and Solutions

Problem 1: Cache Avalanche

Symptom: a large number of cache entries expire at the same moment, and the sudden flood of misses overwhelms the database

Solutions

# 1. Add random jitter to expiration times
def set_with_jitter(key, value, base_timeout=300, jitter_range=60):
    """Set a cache entry with a jittered timeout"""
    import random
    actual_timeout = base_timeout + random.randint(-jitter_range, jitter_range)
    cache.set(key, value, actual_timeout)
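The effect of jitter is easy to verify in isolation. This standalone sketch (pure Python, no Django) applies the same jitter rule and shows that a thousand entries written "at the same time" no longer share one expiration moment:

```python
import random

random.seed(0)  # fixed seed so the demo is reproducible

def jittered_timeout(base_timeout=300, jitter_range=60):
    """Same jitter rule as set_with_jitter, but returns the timeout."""
    return base_timeout + random.randint(-jitter_range, jitter_range)

# 1000 entries cached together now expire spread across a 2-minute window
timeouts = [jittered_timeout() for _ in range(1000)]
```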

# 2. Never-expiring cache entries refreshed in the background
def set_persistent_cache(key, value, interval=300):
    """Store a value with a logical refresh deadline"""
    cache.set(key, {
        'value': value,
        'refresh_time': time.time() + interval
    }, timeout=None)  # never expires

def get_with_background_refresh(key, refresh_func, interval=300):
    """Read a cached value, refreshing it in the background when stale"""
    data = cache.get(key)
    
    if data is None:
        # Not cached yet: load synchronously the first time
        value = refresh_func()
        set_persistent_cache(key, value, interval)
        return value
    
    # Refresh in a background thread once the logical deadline has passed
    if time.time() > data['refresh_time']:
        import threading
        thread = threading.Thread(
            target=lambda: _refresh_cache_background(key, refresh_func, interval)
        )
        thread.daemon = True
        thread.start()
    
    return data['value']

def _refresh_cache_background(key, refresh_func, interval):
    """Refresh a cache entry in the background"""
    try:
        new_value = refresh_func()
        cache.set(key, {
            'value': new_value,
            'refresh_time': time.time() + interval
        }, timeout=None)
    except Exception as e:
        import logging
        logging.error(f"Background cache refresh failed: {e}")

# 3. Tiered caching
class TieredCache:
    """Tiered cache"""
    
    def __init__(self):
        from django.core.cache import caches
        self.short_term = caches['default']    # short-term cache
        self.long_term = caches['long_term']   # long-term cache (requires a 'long_term' alias in CACHES)
    
    def get(self, key):
        """Read through the cache tiers"""
        # Check the short-term cache first
        value = self.short_term.get(key)
        if value is not None:
            return value
        
        # Fall back to the long-term cache
        value = self.long_term.get(key)
        if value is not None:
            # Promote the value into the short-term cache
            self.short_term.set(key, value, timeout=300)
            return value
        
        return None
    
    def set(self, key, value, short_timeout=300, long_timeout=3600):
        """Write to both cache tiers"""
        self.short_term.set(key, value, timeout=short_timeout)
        self.long_term.set(key, value, timeout=long_timeout)
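The promotion step on a long-term hit is the heart of the tiered pattern. A dictionary-backed sketch (hypothetical names, timeouts ignored) makes the data flow visible:

```python
class TwoLevelDict:
    """Two dicts standing in for the short- and long-term backends."""

    def __init__(self):
        self.hot = {}    # short-term tier
        self.warm = {}   # long-term tier

    def get(self, key):
        if key in self.hot:
            return self.hot[key]
        if key in self.warm:
            value = self.warm[key]
            self.hot[key] = value  # promote on a long-term hit
            return value
        return None

    def set(self, key, value):
        self.hot[key] = value
        self.warm[key] = value

tiers = TwoLevelDict()
tiers.warm['x'] = 1   # simulate an entry that expired from the hot tier
tiers.get('x')        # long-term hit: value is promoted back to the hot tier
```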

Problem 2: Cache Penetration

Symptom: queries for data that does not exist miss the cache every time, so each request falls through to the database

Solutions

# 1. Cache null results
def get_with_null_cache(model_class, pk, timeout=300):
    """Fetch a model instance, caching null results"""
    cache_key = f"model:{model_class._meta.label}:{pk}"
    
    result = cache.get(cache_key)
    if result is not None:
        if result == "__null__":
            return None
        return result
    
    try:
        instance = model_class.objects.get(pk=pk)
        cache.set(cache_key, instance, timeout)
        return instance
    except model_class.DoesNotExist:
        # Cache a null marker to avoid hammering the database
        cache.set(cache_key, "__null__", timeout=60)  # cache nulls for 1 minute only
        return None

# 2. Bloom filter (simplified)
import math
import hashlib

class BloomFilterCache:
    """Bloom filter (simplified, in-process)"""
    
    def __init__(self, capacity=10000, error_rate=0.1):
        self.capacity = capacity
        self.error_rate = error_rate
        
        # Derive the bit-array size and number of hash functions
        self.bit_array_size = self._get_size(capacity, error_rate)
        self.hash_count = self._get_hash_count(self.bit_array_size, capacity)
        
        # Initialize the bit array
        self.bit_array = [0] * self.bit_array_size
    
    def _get_size(self, n, p):
        """Optimal bit-array size: m = -n ln(p) / (ln 2)^2"""
        m = -(n * math.log(p)) / (math.log(2) ** 2)
        return int(m)
    
    def _get_hash_count(self, m, n):
        """Optimal number of hash functions: k = (m / n) ln 2"""
        k = (m / n) * math.log(2)
        return int(k)
    
    def _hash(self, item, seed):
        """Seeded hash function"""
        hash_obj = hashlib.md5((str(item) + str(seed)).encode())
        return int(hash_obj.hexdigest(), 16) % self.bit_array_size
    
    def add(self, item):
        """Add an element"""
        for i in range(self.hash_count):
            index = self._hash(item, i)
            self.bit_array[index] = 1
    
    def contains(self, item):
        """Check whether an element may exist (false positives are possible)"""
        for i in range(self.hash_count):
            index = self._hash(item, i)
            if self.bit_array[index] == 0:
                return False
        return True
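The two sizing formulas used above are the standard Bloom-filter results: m = -n·ln(p)/(ln 2)² bits and k = (m/n)·ln 2 hash functions. A quick standalone calculation shows the cost at a tighter error rate than the 10% default:

```python
import math

def bloom_params(n, p):
    """Optimal bit-array size and hash count for n items at error rate p."""
    m = -(n * math.log(p)) / (math.log(2) ** 2)
    k = (m / n) * math.log(2)
    return int(m), int(k)

# 10,000 items at a 1% false-positive rate: roughly 96k bits (~12 KB)
# and about 6 hash functions after truncation.
m, k = bloom_params(10_000, 0.01)
```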

# Global Bloom filter instance
bloom_filter = BloomFilterCache()

def safe_get_with_bloom_filter(model_class, pk):
    """Fetch guarded by the Bloom filter"""
    # Consult the Bloom filter first.
    # Note: the filter must be pre-populated with all existing keys
    # (e.g. at startup), otherwise valid keys will be rejected.
    bloom_key = f"{model_class._meta.label}:{pk}"
    if not bloom_filter.contains(bloom_key):
        # The filter guarantees the key does not exist
        return None
    
    # The filter says the key may exist; check the cache
    cache_key = f"bloom:{bloom_key}"
    result = cache.get(cache_key)
    
    if result is not None:
        if result == "__null__":
            return None
        return result
    
    # Fall through to the database
    try:
        instance = model_class.objects.get(pk=pk)
        cache.set(cache_key, instance, 300)
        bloom_filter.add(bloom_key)  # record the key in the Bloom filter
        return instance
    except model_class.DoesNotExist:
        cache.set(cache_key, "__null__", 60)
        return None

Problem 3: Cache Breakdown

Symptom: when a hot key expires, a burst of concurrent requests all query the database at once

Solutions

# 1. Per-key mutex
import threading

class LockingCache:
    """Cache guarded by per-key locks"""
    
    def __init__(self):
        self.locks = {}
        self.lock = threading.Lock()
    
    def get_with_mutex(self, key, fetch_func, timeout=300):
        """Read with a per-key mutex"""
        # Obtain (or create) the lock for this key
        with self.lock:
            if key not in self.locks:
                self.locks[key] = threading.Lock()
            key_lock = self.locks[key]
        
        with key_lock:
            # Double-check after acquiring the lock
            result = cache.get(key)
            if result is not None:
                if result == "__null__":
                    return None
                return result
            
            # Only one thread per key reaches the fetch function
            result = fetch_func()
            
            # Store the result (null results use a sentinel)
            cache_result = result if result is not None else "__null__"
            cache.set(key, cache_result, timeout)
            
            return result

# Global locking-cache instance
locking_cache = LockingCache()

def get_with_mutex_protection(model_class, pk):
    """Fetch protected by the per-key mutex"""
    def fetch_func():
        try:
            return model_class.objects.get(pk=pk)
        except model_class.DoesNotExist:
            return None
    
    return locking_cache.get_with_mutex(
        f"mutex:{model_class._meta.label}:{pk}",
        fetch_func,
        timeout=300
    )
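The double-check inside the lock is what collapses a thundering herd into a single database query. This self-contained sketch (a dict plays the cache; one lock instead of per-key locks for brevity; names hypothetical) runs 20 threads against one cold key and the loader fires exactly once:

```python
import threading

fake_cache = {}                    # stands in for the cache backend
cache_lock = threading.Lock()      # single lock for brevity
load_count = 0

def slow_load():
    """Stands in for the database query; counts invocations."""
    global load_count
    load_count += 1
    return "value"

def get_single_flight(key):
    if key in fake_cache:          # fast path: no lock on a hit
        return fake_cache[key]
    with cache_lock:
        if key in fake_cache:      # double-check after acquiring the lock
            return fake_cache[key]
        fake_cache[key] = slow_load()
        return fake_cache[key]

threads = [threading.Thread(target=get_single_flight, args=("hot",)) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# All 20 concurrent requests were served, but slow_load ran exactly once.
```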

# 2. Read/write-separated cache
class ReadWriteSeparatedCache:
    """Read/write-separated cache"""
    
    def __init__(self):
        from django.core.cache import caches
        self.read_cache = caches['default']   # read cache
        self.write_cache = caches['default']  # write cache (could be a different backend)
        self.update_locks = {}
        self.global_lock = threading.Lock()
    
    def get(self, key):
        """Read operation"""
        return self.read_cache.get(key)
    
    def set(self, key, value, timeout=300):
        """Write operation"""
        self.write_cache.set(key, value, timeout)
        # Keep the read cache in sync
        self.read_cache.set(key, value, timeout)
    
    def update_with_protection(self, key, update_func, timeout=300):
        """Update under a per-key lock"""
        with self.global_lock:
            if key not in self.update_locks:
                self.update_locks[key] = threading.Lock()
            update_lock = self.update_locks[key]
        
        with update_lock:
            # Read the current value
            current_value = self.get(key)
            
            # Apply the update function
            new_value = update_func(current_value)
            
            # Store the new value
            self.set(key, new_value, timeout)
            
            return new_value

# Usage example
rw_cache = ReadWriteSeparatedCache()

def increment_counter_safely(counter_key):
    """Thread-safe counter increment"""
    def update_counter(current_value):
        return (current_value or 0) + 1
    
    return rw_cache.update_with_protection(counter_key, update_counter, 3600)

Problem 4: Memory Leaks

Symptom: cache memory usage grows without bound until the system runs out of memory

Solutions

# 1. Use an LRU cache
from functools import lru_cache

@lru_cache(maxsize=1000)  # cache at most 1000 results
def lru_cached_function(param):
    """LRU-cached function"""
    # Perform the expensive work
    time.sleep(0.01)
    return f"result for {param}"

# 2. Custom capacity-limited cache
class CapacityLimitedCache:
    """Capacity-limited cache"""
    
    def __init__(self, max_size=1000):
        self.max_size = max_size
        self.cache = {}
        self.access_order = []  # tracks access order (oldest first)
        self.lock = threading.RLock()
    
    def get(self, key, default=None):
        """Read a value"""
        with self.lock:
            if key in self.cache:
                # Move the key to the most-recently-used position
                self.access_order.remove(key)
                self.access_order.append(key)
                return self.cache[key]
            return default
    
    def set(self, key, value, timeout=None):
        """Store a value (timeout accepted for API symmetry, not enforced)"""
        with self.lock:
            if key in self.cache:
                # Update an existing key
                self.cache[key] = value
                self.access_order.remove(key)
                self.access_order.append(key)
            else:
                # Insert a new key
                if len(self.cache) >= self.max_size:
                    # Evict the least recently used key
                    oldest_key = self.access_order.pop(0)
                    del self.cache[oldest_key]
                
                self.cache[key] = value
                self.access_order.append(key)
    
    def evict_least_recent(self):
        """Manually evict the least recently used entry"""
        with self.lock:
            if self.access_order:
                oldest_key = self.access_order.pop(0)
                self.cache.pop(oldest_key, None)
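The `list.remove` calls above make every access O(n). `collections.OrderedDict` provides the same least-recently-used behavior in O(1) via `move_to_end` and `popitem`; a sketch (the thread lock is omitted for brevity):

```python
from collections import OrderedDict

class LRUDict:
    """O(1) LRU cache with the same semantics as CapacityLimitedCache."""

    def __init__(self, max_size=1000):
        self.max_size = max_size
        self.data = OrderedDict()

    def get(self, key, default=None):
        if key in self.data:
            self.data.move_to_end(key)      # mark as most recently used
            return self.data[key]
        return default

    def set(self, key, value):
        if key in self.data:
            self.data.move_to_end(key)
        elif len(self.data) >= self.max_size:
            self.data.popitem(last=False)   # evict the least recently used
        self.data[key] = value

lru = LRUDict(max_size=2)
lru.set('a', 1)
lru.set('b', 2)
lru.get('a')            # 'a' is now the most recently used
lru.set('c', 3)         # evicts 'b', the least recently used
```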

# 3. Periodically purge stale entries
class SelfCleaningCache:
    """Self-cleaning cache"""
    
    def __init__(self, cleanup_interval=3600):  # clean up once per hour
        self.cleanup_interval = cleanup_interval
        self.last_cleanup = time.time()
        self.lock = threading.Lock()
    
    def cleanup_expired(self):
        """Purge stale cache entries"""
        with self.lock:
            if time.time() - self.last_cleanup > self.cleanup_interval:
                # The clean-up logic depends on the cache backend.
                # Note that Redis removes expired keys on its own; the pass
                # below targets keys left WITHOUT a TTL (ttl == -1), which
                # would otherwise live forever. Be careful: this also removes
                # deliberately persistent entries.
                try:
                    if hasattr(settings, 'CACHES') and 'default' in settings.CACHES:
                        cache_config = settings.CACHES['default']
                        if cache_config['BACKEND'] == 'django.core.cache.backends.redis.RedisCache':
                            import redis
                            redis_client = redis.Redis.from_url(cache_config['LOCATION'])
                            
                            # Find and delete keys that have no expiry set
                            for key in redis_client.keys('*'):
                                if redis_client.ttl(key) == -1:  # no TTL
                                    redis_client.delete(key)
                
                except Exception as e:
                    import logging
                    logging.error(f"Cache cleanup failed: {e}")
                
                self.last_cleanup = time.time()

# Global self-cleaning cache instance
self_cleaning_cache = SelfCleaningCache()

def start_cache_cleanup_thread():
    """Start the cache-cleanup thread (call once at application startup)"""
    import threading
    
    def cleanup_worker():
        while True:
            try:
                time.sleep(3600)  # check once per hour
                self_cleaning_cache.cleanup_expired()
            except Exception as e:
                import logging
                logging.error(f"Cache cleanup worker error: {e}")
    
    thread = threading.Thread(target=cleanup_worker, daemon=True)
    thread.start()

# Start the cleanup thread (e.g. from AppConfig.ready())
start_cache_cleanup_thread()

Chapter Summary

In this chapter we took a deep dive into Django caching strategies:

  1. Caching fundamentals: how caches work and why they matter
  2. Cache architecture: the overall structure of Django's caching system
  3. Backend configuration: configuring Redis, Memcached, and other backends
  4. Cache types and usage: caching at different layers of the stack
  5. Strategy patterns: Cache-Aside, Read-Through, and related patterns
  6. Performance optimization: compression, sharding, prefetching, and more
  7. Monitoring and diagnostics: measuring and analyzing cache performance
  8. Problem solving: handling avalanche, penetration, and breakdown scenarios

Key Takeaways

"""
Key points from this chapter:

1. Caching is a cornerstone of application performance
2. Choosing the right backend and strategy matters
3. Watch out for cache consistency issues
4. Monitoring cache performance is essential
5. Set expiration times deliberately
6. Guard against cache avalanche, penetration, and breakdown
7. Purge stale entries regularly to avoid memory leaks
8. Match the caching strategy to the business scenario
"""

💡 Key point: a caching strategy must be designed around the application's specific workload and performance requirements. A well-chosen strategy can improve performance dramatically, but stay mindful of the complexity and pitfalls that caching introduces.
