#Django缓存策略 - 高性能应用优化指南
📂 所属阶段:第二部分 — 进阶特性
🎯 难度等级:中级
⏰ 预计学习时间:4-5小时
#目录
#缓存基础概念
缓存是提高Web应用性能的关键技术,通过存储计算结果或数据副本,避免重复计算和数据库查询。
#缓存原理
"""
缓存工作原理:
1. 请求到达时,首先检查缓存中是否存在所需数据
2. 如果存在(缓存命中),直接返回缓存数据
3. 如果不存在(缓存未命中),执行计算/查询操作
4. 将结果存储到缓存中,下次请求可直接使用
5. 缓存数据有过期时间,过期后重新生成
缓存优势:
- 减少数据库负载
- 降低响应时间
- 提高系统吞吐量
- 改善用户体验
缓存挑战:
- 数据一致性问题
- 缓存雪崩、穿透、击穿
- 内存使用管理
- 缓存策略选择
"""#缓存层级
"""
Django缓存层级:
1. 数据库查询缓存
- QuerySet缓存
- select_related/prefetch_related优化
2. 模板片段缓存
- 模板标签缓存
- 片段缓存
3. 视图级别缓存
- 视图函数缓存
- 类视图缓存
4. 页面级别缓存
- 完整页面缓存
- 中间件缓存
5. 应用层缓存
- 自定义缓存逻辑
- 业务逻辑缓存
6. 代理层缓存
- Nginx缓存
- CDN缓存
"""#缓存策略类型
"""
常见的缓存策略:
1. Cache-Aside Pattern (旁路缓存)
- 应用负责从数据库加载数据到缓存
- 缓存未命中时,应用从数据库获取数据并存入缓存
2. Read-Through Pattern (读穿透)
- 缓存层透明地从数据库加载数据
- 应用只需与缓存交互
3. Write-Through Pattern (写穿透)
- 数据写入时同时写入缓存和数据库
- 保证数据一致性
4. Write-Behind Pattern (写回)
- 数据先写入缓存,后台异步写入数据库
- 提高性能,但可能丢失数据
"""#Django缓存架构
#缓存后端
# Django内置缓存后端
from django.core.cache import cache
from django.core.cache.backends.base import BaseCache
"""
Django支持的缓存后端:
1. DummyCache - 空缓存(用于开发环境)
2. LocMemCache - 本地内存缓存
3. FileBasedCache - 文件系统缓存
4. DatabaseCache - 数据库缓存
5. MemcachedCache - Memcached缓存
6. PyLibMCCache - PyLibMC缓存
7. RedisCache - Redis缓存(需要第三方包)
选择原则:
- 开发环境:LocMemCache
- 小型应用:LocMemCache或FileBasedCache
- 生产环境:Redis或Memcached
- 分布式应用:Redis或Memcached
"""#缓存API
# Django缓存API使用
from django.core.cache import cache
import time
# 基本操作
def cache_operations_demo():
    """Walk through the basic Django low-level cache API operations."""
    # Store a value that expires after five minutes.
    cache.set('key', 'value', timeout=300)

    # Classic cache-aside read: fall back to the slow path on a miss.
    value = cache.get('key')
    if value is None:
        value = expensive_database_query()
        cache.set('key', value, timeout=300)

    # "Get, or compute and store" in a single call.
    value = cache.get_or_set('key2', lambda: expensive_function(), timeout=60)

    # Bulk write followed by bulk read.
    cache.set_many({'a': 1, 'b': 2, 'c': 3}, timeout=300)
    values = cache.get_many(['a', 'b', 'c'])

    # Removal: one key, several keys, then everything.
    cache.delete('key')
    cache.delete_many(['a', 'b', 'c'])
    cache.clear()
    return value
def expensive_database_query():
    """Stand-in for a slow database query (sleeps for one second)."""
    time.sleep(1)  # pretend we are waiting on the database
    return "expensive result"
def expensive_function():
    """Stand-in for an expensive computation (sleeps half a second)."""
    time.sleep(0.5)  # simulated work
    return "expensive function result"
# 缓存键生成
def generate_cache_key(*args, **kwargs):
    """Build a deterministic cache key from arbitrary call arguments.

    Returns a string of the form ``computed:<md5>``. Generalized from the
    original: ``default=str`` lets non-JSON-serializable values (dates,
    Decimals, model instances) participate instead of raising TypeError;
    JSON-serializable inputs produce exactly the same keys as before.
    """
    import hashlib
    import json
    key_data = {
        'args': args,
        'kwargs': kwargs
    }
    # sort_keys gives a stable ordering regardless of kwargs insertion order.
    key_str = json.dumps(key_data, sort_keys=True, default=str)
    # md5 is fine here: we need a uniform short key, not cryptographic strength.
    hash_key = hashlib.md5(key_str.encode()).hexdigest()
    return f"computed:{hash_key}"
# 带参数的缓存函数
def cached_function_with_params(user_id, category):
    """Cache-aside lookup of per-user, per-category data (1-hour TTL)."""
    key = f"user_profile:{user_id}:{category}"
    hit = cache.get(key)
    if hit is not None:
        return hit
    # Miss: load from the "database" and remember the answer.
    hit = get_user_data(user_id, category)
    cache.set(key, hit, timeout=3600)  # 1 hour
    return hit
def get_user_data(user_id, category):
    """Pretend to load one user's data for a category from the database."""
    time.sleep(0.1)  # simulated query latency
    return {"user_id": user_id, "category": category, "data": "user data"}  #缓存装饰器
# 缓存装饰器
from django.core.cache import cache
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
from django.views.generic import ListView
import functools
def cached_function(timeout=300):
    """Decorator that caches a function's return value in the Django cache.

    Fix: the original fingerprinted arguments with the built-in ``hash()``,
    which is salted per process (PYTHONHASHSEED), so keys could not be
    shared between workers or survive a restart. md5 over a sorted-kwargs
    repr is deterministic everywhere.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            import hashlib
            arg_sig = hashlib.md5(
                f"{args!r}:{sorted(kwargs.items())!r}".encode()
            ).hexdigest()
            cache_key = f"func:{func.__name__}:{arg_sig}"
            result = cache.get(cache_key)
            if result is None:
                # Miss: run the function and remember its answer.
                result = func(*args, **kwargs)
                cache.set(cache_key, result, timeout=timeout)
            return result
        return wrapper
    return decorator
# 使用示例
@cached_function(timeout=600)
def calculate_expensive_operation(x, y):
    """Deliberately slow arithmetic demo, memoized for ten minutes."""
    time.sleep(2)  # simulated heavy computation
    return (x * y) + (x ** 2) + (y ** 2)
# 类方法缓存装饰器
def cached_method(timeout=300):
    """Decorator that caches an instance method's return value.

    Fix: argument fingerprinting now uses md5 instead of the per-process
    salted built-in ``hash()``, so keys are deterministic across workers
    and restarts.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            import hashlib
            # NOTE(review): id(self) is only unique for this instance's
            # lifetime and means nothing to other processes; a stable
            # business key (e.g. self.pk) would be safer — confirm intent.
            arg_sig = hashlib.md5(
                f"{args!r}:{sorted(kwargs.items())!r}".encode()
            ).hexdigest()
            cache_key = (
                f"method:{self.__class__.__name__}:{func.__name__}:"
                f"{id(self)}:{arg_sig}"
            )
            result = cache.get(cache_key)
            if result is None:
                result = func(self, *args, **kwargs)
                cache.set(cache_key, result, timeout=timeout)
            return result
        return wrapper
    return decorator
class DataProcessor:
    """Demo class whose expensive method results are cached per instance."""

    def __init__(self, processor_id):
        self.processor_id = processor_id

    @cached_method(timeout=1800)
    def process_data(self, data):
        """Simulate processing `data`; the result is cached for 30 minutes."""
        time.sleep(1)  # simulated processing time
        return f"Processed {len(data)} items for processor {self.processor_id}"
# 高级缓存装饰器
def smart_cache(timeout=300, key_prefix='', version=1):
    """Decorator building versioned, prefix-scoped cache keys from the call.

    Key layout: ``<prefix>:<module>:<name>[:arg...][:sorted-kwargs]:v<version>``.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            parts = [key_prefix, func.__module__, func.__name__]
            parts += [str(arg) for arg in args]
            if kwargs:
                # Sorting kwargs keeps the key independent of call order.
                parts.append(str(sorted(kwargs.items())))
            cache_key = f"{':'.join(parts)}:v{version}"
            hit = cache.get(cache_key)
            if hit is not None:
                return hit
            hit = func(*args, **kwargs)
            cache.set(cache_key, hit, timeout=timeout)
            return hit
        return wrapper
    return decorator
# 使用示例
@smart_cache(timeout=3600, key_prefix='stats', version=2)
def get_user_statistics(user_id, period='daily'):
    """Return (simulated) aggregated statistics for one user."""
    time.sleep(1)  # simulated heavy aggregation
    return {
        'user_id': user_id,
        'period': period,
        'views': 100,
        'actions': 50,
    }  #缓存后端配置
#settings.py 配置
# settings.py - 缓存配置
"""
缓存配置示例
"""
import os
# 基础缓存配置
# Cache configuration. The env-specific blocks below replace CACHES wholesale.
CACHES = {
    # Default cache: Redis with pooling, zlib compression and JSON values.
    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': [
            'redis://127.0.0.1:6379/1',
            'redis://127.0.0.1:6380/1',  # several Redis instances supported
        ],
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'CONNECTION_POOL_KWARGS': {
                'max_connections': 20,
                'retry_on_timeout': True,
            },
            'COMPRESSOR': 'django_redis.compressors.zlib.ZlibCompressor',
            'SERIALIZER': 'django_redis.serializers.json.JSONSerializer',
        },
        'KEY_PREFIX': 'myapp',
        'TIMEOUT': 300,  # 5-minute default TTL
        'VERSION': 1,
    },
    # Session store (separate Redis DB, 2-week TTL).
    'sessions': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/2',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
        },
        'KEY_PREFIX': 'session',
        'TIMEOUT': 1209600,  # 2 weeks
    },
    # Short-lived in-process cache.
    'temporary': {
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
        'LOCATION': 'temp-cache',
        'OPTIONS': {
            'MAX_ENTRIES': 1000,
        },
        'TIMEOUT': 60,  # 1 minute
    },
}

# Development: plain local-memory cache.
if os.environ.get('DJANGO_ENV') == 'development':
    CACHES = {
        'default': {
            'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
            'LOCATION': 'unique-snowflake',
            'OPTIONS': {
                'MAX_ENTRIES': 1000,
            }
        }
    }

# Production: Redis, configured from the environment.
if os.environ.get('DJANGO_ENV') == 'production':
    CACHES = {
        'default': {
            'BACKEND': 'django.core.cache.backends.redis.RedisCache',
            'LOCATION': os.environ.get('REDIS_URL', 'redis://127.0.0.1:6379/1'),
            'OPTIONS': {
                'CLIENT_CLASS': 'django_redis.client.DefaultClient',
                'CONNECTION_POOL_KWARGS': {
                    'max_connections': 50,
                    'retry_on_timeout': True,
                    'socket_keepalive': True,
                    # BUGFIX: the original passed {True} (a set literal) as
                    # 'socket_keepalive_options'; redis-py expects a dict of
                    # TCP_* socket options, so the set failed at connect
                    # time. The option is omitted to use OS keepalive
                    # defaults.
                },
            },
            'KEY_PREFIX': os.environ.get('CACHE_KEY_PREFIX', 'prod'),
            'TIMEOUT': 300,
        }
    }

# Page-cache middleware settings.
CACHE_MIDDLEWARE_ALIAS = 'default'   # which CACHES entry the middleware uses
CACHE_MIDDLEWARE_SECONDS = 600       # full-page TTL
CACHE_MIDDLEWARE_KEY_PREFIX = ''     # key prefix for page-cache entries

# Store sessions in the 'sessions' cache.
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'sessions'  #Redis缓存配置
# Redis缓存高级配置
"""
pip install django-redis redis hiredis
"""
import redis
from django.conf import settings
class RedisCacheManager:
    """Convenience wrapper around a raw redis-py client.

    Provides pooled connections, pipelined batch operations, atomic
    counters, and a lock-guarded cache fill (dogpile protection).
    """

    def __init__(self):
        # Connection parameters come from Django settings, with local defaults.
        self.redis_client = redis.Redis(
            host=getattr(settings, 'REDIS_HOST', 'localhost'),
            port=getattr(settings, 'REDIS_PORT', 6379),
            db=getattr(settings, 'REDIS_DB', 0),
            password=getattr(settings, 'REDIS_PASSWORD', None),
            decode_responses=False,  # keep bytes; callers decide how to decode
            socket_connect_timeout=5,
            socket_timeout=5,
            retry_on_timeout=True,
        )

    def get_connection_pool(self):
        """Return a client backed by an explicit, bounded connection pool."""
        pool = redis.ConnectionPool(
            host=getattr(settings, 'REDIS_HOST', 'localhost'),
            port=getattr(settings, 'REDIS_PORT', 6379),
            db=getattr(settings, 'REDIS_DB', 0),
            password=getattr(settings, 'REDIS_PASSWORD', None),
            max_connections=getattr(settings, 'REDIS_MAX_CONNECTIONS', 20),
            retry_on_timeout=True,
        )
        return redis.Redis(connection_pool=pool)

    def get_pipeline(self):
        """Return a pipeline so several commands share one round trip."""
        return self.redis_client.pipeline()

    def batch_set(self, data_dict, expire_time=300):
        """SETEX every key/value pair through a single pipeline."""
        pipe = self.get_pipeline()
        for key, value in data_dict.items():
            pipe.setex(key, expire_time, value)
        pipe.execute()

    def batch_get(self, keys):
        """MGET all keys in one round trip (None for missing entries)."""
        return self.redis_client.mget(keys)

    def atomic_increment(self, key, amount=1, expire_time=300):
        """INCRBY then refresh the TTL.

        INCRBY itself is atomic; the EXPIRE that follows is a separate
        command, so the TTL refresh is not atomic with the increment.
        """
        value = self.redis_client.incrby(key, amount)
        self.redis_client.expire(key, expire_time)
        return value

    def get_with_lock(self, key, compute_func, lock_timeout=10, cache_timeout=300):
        """Fill `key` under a short-lived lock to avoid a thundering herd.

        Fix over the original: after winning the lock the cache is
        re-checked, since another worker may have filled it between this
        caller's miss and the lock acquisition; without the re-check every
        lock winner recomputed unconditionally.

        NOTE(review): values read back from Redis are raw bytes while a
        fresh compute_func() result is a Python object — callers must cope
        with both; confirm the expected value type.
        """
        lock_key = f"lock:{key}"
        if self.redis_client.set(lock_key, "1", nx=True, ex=lock_timeout):
            try:
                cached = self.redis_client.get(key)
                if cached is not None:
                    return cached
                result = compute_func()
                self.redis_client.setex(key, cache_timeout, result)
                return result
            finally:
                # Always release the lock, even if compute_func raised.
                self.redis_client.delete(lock_key)
        else:
            # Lock held elsewhere: poll for the value until the holder's
            # lock would have expired, then compute as a last resort.
            import time
            start_time = time.time()
            while time.time() - start_time < lock_timeout:
                result = self.redis_client.get(key)
                if result is not None:
                    return result
                time.sleep(0.1)
            result = compute_func()
            self.redis_client.setex(key, cache_timeout, result)
            return result
# Redis缓存配置类
class RedisCacheConfig:
    """Factory for ready-made Redis CACHES entries."""

    @staticmethod
    def get_production_config():
        """Sharded master/replica Redis config for production traffic."""
        pool_kwargs = {
            'max_connections': 100,
            'retry_on_timeout': True,
            'health_check_interval': 30,
            'socket_keepalive': True,
        }
        options = {
            'CLIENT_CLASS': 'django_redis.client.ShardClient',
            'CONNECTION_POOL_KWARGS': pool_kwargs,
            'COMPRESSOR': 'django_redis.compressors.zlib.ZlibCompressor',
            'SERIALIZER': 'django_redis.serializers.pickle.PickleSerializer',
        }
        return {
            'BACKEND': 'django.core.cache.backends.redis.RedisCache',
            'LOCATION': [
                'redis://master.redis.example.com:6379/1',
                'redis://slave.redis.example.com:6379/1',
            ],
            'OPTIONS': options,
            'KEY_PREFIX': 'prod_app',
            'TIMEOUT': 900,  # 15 minutes
            'VERSION': 1,
        }

    @staticmethod
    def get_session_config():
        """Dedicated Redis config for session storage (2-week TTL)."""
        options = {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'CONNECTION_POOL_KWARGS': {
                'max_connections': 50,
            },
        }
        return {
            'BACKEND': 'django.core.cache.backends.redis.RedisCache',
            'LOCATION': 'redis://session.redis.example.com:6379/2',
            'OPTIONS': options,
            'KEY_PREFIX': 'session',
            'TIMEOUT': 1209600,  # 2 weeks
            'VERSION': 1,
        }
# 缓存健康检查
class CacheHealthChecker:
    """Probes the default Django cache with a set/get round trip."""

    def __init__(self):
        from django.core.cache import caches
        self.cache = caches['default']

    def check_health(self):
        """Return a status dict describing cache reachability."""
        try:
            probe_key = 'health_check:test'
            probe_value = 'healthy'
            # Round-trip a probe value through the cache.
            self.cache.set(probe_key, probe_value, timeout=10)
            echoed = self.cache.get(probe_key)
            if echoed == probe_value:
                return {
                    'status': 'healthy',
                    'message': 'Cache is accessible and responsive',
                    'response_time': self._measure_response_time()
                }
            return {
                'status': 'unhealthy',
                'message': 'Cache returned unexpected value',
                'expected': probe_value,
                'received': echoed
            }
        except Exception as exc:
            # Any backend failure is reported, never raised to the caller.
            return {
                'status': 'unhealthy',
                'message': f'Cache error: {str(exc)}',
                'error_type': type(exc).__name__
            }

    def _measure_response_time(self):
        """Time a single cache GET, in seconds."""
        import time
        started = time.time()
        self.cache.get('health_check:test')
        return time.time() - started  #Memcached配置
# Memcached配置
"""
pip install python-memcached
或
pip install pylibmc
"""
# Memcached缓存配置
# Memcached-backed CACHES definition: two nodes, consistent hashing (ketama).
MEMCACHED_CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.PyLibMCCache',
        # pylibmc distributes keys across both nodes.
        'LOCATION': ['127.0.0.1:11211', '127.0.0.1:11212'],
        'OPTIONS': {
            'binary': True,  # binary protocol
            'behaviors': {
                'tcp_nodelay': True,
                'ketama': True,       # consistent hashing
                'remove_failed': 4,   # drop a node after 4 failures
                'retry_timeout': 2,
                'dead_timeout': 10,
            }
        },
        'KEY_PREFIX': 'myapp',
        'TIMEOUT': 300,
    }
}
# Memcached管理类
class MemcachedManager:
    """Thin wrapper over a two-node python-memcached client."""

    def __init__(self):
        import memcache
        self.mc = memcache.Client(
            ['127.0.0.1:11211', '127.0.0.1:11212'],
            debug=0
        )

    def get_multi(self, keys):
        """Fetch several keys in one round trip."""
        return self.mc.get_multi(keys)

    def set_multi(self, mapping, time=2592000):
        """Store several key/value pairs (default TTL: 30 days)."""
        return self.mc.set_multi(mapping, time=time)

    def add(self, key, val, time=2592000):
        """Store only when the key does not already exist."""
        return self.mc.add(key, val, time=time)

    def incr(self, key, delta=1):
        """Atomically increment a counter."""
        return self.mc.incr(key, delta)

    def decr(self, key, delta=1):
        """Atomically decrement a counter."""
        return self.mc.decr(key, delta)
# Memcached缓存装饰器
def memcached_cached(timeout=300, key_prefix=''):
    """Decorator caching a function's result under an md5 key.

    Fixes over the original: ``functools.wraps`` is applied (the other
    cache decorators in this module already do this, and without it the
    wrapped function loses its name/docstring), and the unused ``pickle``
    import is dropped.
    """
    def decorator(func):
        import functools
        import hashlib

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Key: prefix + function name + positional args + sorted kwargs.
            key_data = f"{key_prefix}:{func.__name__}:{args}:{sorted(kwargs.items())}"
            cache_key = hashlib.md5(key_data.encode()).hexdigest()
            cached_result = cache.get(cache_key)
            if cached_result is None:
                result = func(*args, **kwargs)
                cache.set(cache_key, result, timeout=timeout)
                return result
            return cached_result
        return wrapper
    return decorator  #缓存类型与使用
#低层级缓存
# 低层级缓存 - 缓存单个值
from django.core.cache import cache
def low_level_cache_example():
    """Tour of the low-level cache API: set/get, add, get_or_set, bulk ops."""
    # Single value with a 5-minute TTL.
    cache.set('simple_key', 'simple_value', 300)
    value = cache.get('simple_key')

    # A miss falls back to the supplied default.
    value = cache.get('nonexistent_key', 'default_value')

    # add() writes only when the key is absent.
    cache.add('new_key', 'new_value', 600)  # 10 minutes

    # Compute-on-miss in one call (30-minute TTL).
    expensive_result = cache.get_or_set(
        'expensive_calculation',
        lambda: perform_expensive_calculation(),
        1800,
    )

    # Bulk write, then bulk read.
    cache.set_many(
        {'key1': 'value1', 'key2': 'value2', 'key3': 'value3'}, 600
    )
    return cache.get_many(['key1', 'key2', 'key3'])
def perform_expensive_calculation():
    """Simulate a heavy computation (2-second sleep), then return a big sum."""
    import time
    time.sleep(2)  # simulated latency
    return sum(range(1000000))
# 缓存键命名规范
class CacheKeyGenerator:
    """Centralized cache-key naming conventions for the application."""

    @staticmethod
    def generate_user_key(user_id, suffix=''):
        """Key for user-scoped data: ``user:<id>[:suffix]``."""
        if suffix:
            return f"user:{user_id}:{suffix}"
        return f"user:{user_id}"

    @staticmethod
    def generate_model_key(model_name, obj_id, field=''):
        """Key for one model object: ``model:<name>:<id>[:field]``."""
        if field:
            return f"model:{model_name}:{obj_id}:{field}"
        return f"model:{model_name}:{obj_id}"

    @staticmethod
    def generate_query_key(query_params, model_name):
        """Key for a query: ``query:<model>:<md5-prefix-of-params>``."""
        import hashlib
        params_repr = str(sorted(query_params.items())) if query_params else ''
        digest = hashlib.md5(f"{model_name}:{params_repr}".encode()).hexdigest()[:8]
        return f"query:{model_name}:{digest}"

    @staticmethod
    def generate_fragment_key(template_name, fragment_name, context_hash=''):
        """Key for a template fragment: ``fragment:<tpl>:<frag>[:ctx]``."""
        if context_hash:
            return f"fragment:{template_name}:{fragment_name}:{context_hash}"
        return f"fragment:{template_name}:{fragment_name}"
# 使用示例
def user_profile_cache_example(user_id):
    """Cache a user's profile dict and permission codenames separately."""
    from django.contrib.auth.models import User

    profile_key = CacheKeyGenerator.generate_user_key(user_id, 'profile')
    permissions_key = CacheKeyGenerator.generate_user_key(user_id, 'permissions')

    # Profile: cached for one hour.
    profile_data = cache.get(profile_key)
    if profile_data is None:
        user = User.objects.get(id=user_id)
        profile_data = {
            'username': user.username,
            'email': user.email,
            'first_name': user.first_name,
            'last_name': user.last_name,
        }
        cache.set(profile_key, profile_data, 3600)

    # Permissions: cached for thirty minutes.
    permissions = cache.get(permissions_key)
    if permissions is None:
        user = User.objects.get(id=user_id)
        permissions = list(user.user_permissions.values_list('codename', flat=True))
        cache.set(permissions_key, permissions, 1800)

    return {
        'profile': profile_data,
        'permissions': permissions,
    }  #模板片段缓存
# 模板片段缓存
from django.core.cache import cache
from django.template import Library
from django.utils.cache import patch_cache_control
from django.views.decorators.cache import cache_page
register = Library()
@register.simple_tag(takes_context=True)
def cached_partial(context, template_name, timeout=300, **kwargs):
    """Template tag: render `template_name` once and serve it from cache.

    Fix: the original keyed on ``hash(str(kwargs))`` — the built-in hash is
    salted per process (PYTHONHASHSEED) and dict ordering is insertion
    order, so keys differed between workers and calls. md5 over sorted
    kwargs is deterministic.
    """
    from django.template.loader import render_to_string
    import hashlib
    kw_digest = hashlib.md5(repr(sorted(kwargs.items())).encode()).hexdigest()
    cache_key = f"partial:{template_name}:{kw_digest}"
    rendered = cache.get(cache_key)
    if rendered is None:
        # Miss: render with the flattened template context plus tag kwargs.
        rendered = render_to_string(template_name, {**context.flatten(), **kwargs})
        cache.set(cache_key, rendered, timeout)
    return rendered
# 自定义缓存模板标签
from django import template
from django.template.base import Node, Variable, TemplateSyntaxError
import re
register = template.Library()
class CacheNode(Node):
    """Template node that renders its body once and serves it from cache."""

    def __init__(self, nodelist, expire_time_var, fragment_name, vary_on):
        self.nodelist = nodelist
        self.expire_time_var = Variable(expire_time_var)
        self.fragment_name = fragment_name
        self.vary_on = vary_on

    def render(self, context):
        """Return the cached fragment, rendering the node list on a miss."""
        try:
            expire_time = self.expire_time_var.resolve(context)
        except template.VariableDoesNotExist:
            expire_time = 0  # unresolvable timeout: expire immediately

        # The key varies on the resolved values of the vary-on variables.
        resolved = [Variable(name).resolve(context) for name in self.vary_on]
        cache_key = f"template.cache.{self.fragment_name}." + ".".join(
            str(value) for value in resolved
        )

        fragment = cache.get(cache_key)
        if fragment is None:
            fragment = self.nodelist.render(context)
            cache.set(cache_key, fragment, expire_time)
        return fragment
@register.tag('cache')
def do_cache(parser, token):
    """Compile ``{% cache expire_time fragment_name [vary_on ...] %}``."""
    nodelist = parser.parse(('endcache',))
    parser.delete_first_token()
    bits = token.contents.split()
    # Need at least the tag name, a timeout and a fragment name.
    if len(bits) < 3:
        raise TemplateSyntaxError("'%s' tag requires at least 2 arguments" % bits[0])
    return CacheNode(nodelist, bits[1], bits[2], bits[3:])
# 模板中使用示例
"""
<!-- 在模板中使用 -->
{% load cache %}
{% cache 500 sidebar user.username %}
<!-- 侧边栏内容,缓存500秒 -->
<div class="sidebar">
<h3>欢迎, {{ user.username }}!</h3>
<p>这是侧边栏内容...</p>
</div>
{% endcache %}
{% cache 600 navigation request.user.is_staff %}
<!-- 导航菜单,根据用户角色变化 -->
<nav>
{% if request.user.is_staff %}
<a href="/admin/">管理面板</a>
{% endif %}
<a href="/dashboard/">仪表板</a>
</nav>
{% endcache %}
"""#视图级别缓存
# 视图级别缓存
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
from django.views.generic import ListView, DetailView
from django.views import View
from django.http import JsonResponse
from django.core.cache import cache
import json
# 函数视图缓存
@cache_page(60 * 15)  # whole response cached for 15 minutes
def product_list_view(request):
    """JSON list of active products (page-cached)."""
    from .models import Product
    queryset = Product.objects.filter(is_active=True).select_related('category')
    payload = []
    for product in queryset:
        payload.append({
            'id': product.id,
            'name': product.name,
            'price': float(product.price),
            'category': product.category.name if product.category else None,
        })
    return JsonResponse({'products': payload})
@cache_page(60 * 30, cache='temporary')  # 30 min, served from the 'temporary' backend
def dashboard_stats_view(request):
    """Aggregate dashboard counters as JSON."""
    from .models import Order, User
    return JsonResponse({
        'total_users': User.objects.count(),
        'total_orders': Order.objects.count(),
        'monthly_revenue': get_monthly_revenue(),
        'active_sessions': get_active_sessions_count(),
    })
def get_monthly_revenue():
    """Return a simulated monthly revenue figure in [10000, 50000]."""
    import random
    return random.randint(10000, 50000)
def get_active_sessions_count():
    """Return a simulated active-session count in [50, 200]."""
    import random
    return random.randint(50, 200)
# 类视图缓存
# NOTE(review): `Product` is defined much later in this file (under the
# model-caching section); executing this module top-to-bottom would raise
# NameError here — confirm the intended module layout.
@method_decorator(cache_page(60 * 10), name='dispatch')  # 10-minute page cache
class ProductListView(ListView):
    """Product list view; every dispatch() response is page-cached."""
    model = Product
    template_name = 'products/list.html'
    context_object_name = 'products'

    def get_queryset(self):
        # Active products only; select_related avoids a query per category.
        return Product.objects.filter(is_active=True).select_related('category')
@method_decorator(cache_page(60 * 60), name='get')  # cache GET responses for 1 hour
class ProductDetailView(DetailView):
    """Product detail view.

    NOTE(review): `Product` is defined later in this file, so this class
    body would raise NameError if the module runs top-to-bottom — confirm.
    """
    model = Product
    template_name = 'products/detail.html'
    context_object_name = 'product'
# 自定义缓存视图
class CachedAPIView(View):
    """Base API view that caches whole JSON payloads per path + query string."""

    def get_cache_key(self, request):
        """Deterministic key from path and query string.

        Fix: md5 replaces the built-in ``hash()``, which is salted per
        process (PYTHONHASHSEED) — its keys could not be shared between
        workers or survive restarts.
        """
        query_digest = hashlib.md5(request.GET.urlencode().encode()).hexdigest()
        return f"api:{request.path}:{query_digest}"

    def get_cache_timeout(self):
        """Cache TTL in seconds; override per subclass."""
        return 300  # 5 minutes

    def get(self, request):
        """Serve from cache when possible, otherwise build, cache and serve."""
        cache_key = self.get_cache_key(request)
        cached_response = cache.get(cache_key)
        # Fix: `is not None` rather than truthiness — an empty dict/list is a
        # legitimate cached payload and must not be recomputed every request.
        if cached_response is not None:
            return JsonResponse(cached_response)
        response_data = self.generate_response_data(request)
        cache.set(cache_key, response_data, self.get_cache_timeout())
        return JsonResponse(response_data)

    def generate_response_data(self, request):
        """Build the payload dict; subclasses must implement this."""
        raise NotImplementedError
# 特定条件缓存
def conditional_cache_page(timeout):
    """Decorator: apply cache_page(timeout) only to cache-safe requests.

    Fixes over the original: the cache_page wrapper is built once at
    decoration time instead of on every request, and ``functools.wraps``
    preserves the wrapped view's metadata (consistent with the other
    decorators in this module).
    """
    from django.views.decorators.cache import cache_page
    import functools

    def decorator(view_func):
        cached_view = cache_page(timeout)(view_func)  # built once, reused

        @functools.wraps(view_func)
        def wrapper(request, *args, **kwargs):
            if should_cache_request(request):
                return cached_view(request, *args, **kwargs)
            # Not cacheable: call the original view directly.
            return view_func(request, *args, **kwargs)
        return wrapper
    return decorator
def should_cache_request(request):
    """Return True when a request is safe to serve from the page cache.

    Excluded: non-GET methods, authenticated users, explicit ``nocache``
    query opt-outs, and anything under /admin/.
    """
    uncacheable = (
        request.method != 'GET'
        or request.user.is_authenticated
        or 'nocache' in request.GET
        or request.path.startswith('/admin/')
    )
    return not uncacheable
# 使用条件缓存
@conditional_cache_page(60 * 15)
def conditional_cached_view(request):
    """View cached only for anonymous, cache-safe GET requests."""
    return JsonResponse({'message': 'This might be cached'})  #模型层面缓存
# 模型层面缓存
from django.db import models
from django.core.cache import cache
from django.conf import settings
import hashlib
from typing import Optional, List, Dict, Any
class CacheMixin:
    """Mixin giving model instances per-field cache helpers.

    Keys are namespaced as ``<app_label.ModelName>:<pk>[:field][:suffix]``.
    """

    def get_cache_key(self, field: str = '', suffix: str = '') -> str:
        """Build the cache key for this instance (optionally per field)."""
        base_key = f"{self._meta.label}:{self.pk}"
        if field:
            base_key = f"{base_key}:{field}"
        if suffix:
            base_key = f"{base_key}:{suffix}"
        return base_key

    def get_cache_timeout(self) -> int:
        """Default TTL in seconds (settings.MODEL_CACHE_TIMEOUT, else 300)."""
        return getattr(settings, 'MODEL_CACHE_TIMEOUT', 300)

    def cache_get(self, field: str, default=None):
        """Read one field's cached value."""
        return cache.get(self.get_cache_key(field), default)

    def cache_set(self, field: str, value, timeout: Optional[int] = None):
        """Cache one field's value, using the instance TTL unless given."""
        cache.set(self.get_cache_key(field), value,
                  timeout or self.get_cache_timeout())

    def cache_delete(self, field: str = ''):
        """Drop one field's cache entry; with no field, drop them all."""
        if field:
            cache.delete(self.get_cache_key(field))
        else:
            # NOTE(review): wildcard invalidation is not implemented — the
            # generic Django cache API has no pattern delete. Intentionally a
            # no-op pending a backend-specific implementation (e.g.
            # django-redis delete_pattern).
            pass

    @staticmethod
    def cache_property(func):
        """Property-style decorator caching the computed value per instance.

        Fix: declared ``@staticmethod`` so the idiomatic usage
        ``@CacheMixin.cache_property`` works. As a plain instance method
        (the original), the unbound call received the decorated function as
        ``self`` and raised TypeError at class-definition time.
        """
        @property
        def cached_prop(self):
            cache_key = self.get_cache_key(func.__name__)
            value = cache.get(cache_key)
            if value is None:
                value = func(self)
                cache.set(cache_key, value, self.get_cache_timeout())
            return value
        return cached_prop
# 带缓存的用户模型
class CachedUser(models.Model, CacheMixin):
    """User model whose derived values are cached via CacheMixin."""
    username = models.CharField(max_length=150, unique=True)
    email = models.EmailField()
    first_name = models.CharField(max_length=30)
    last_name = models.CharField(max_length=150)
    is_active = models.BooleanField(default=True)
    date_joined = models.DateTimeField(auto_now_add=True)

    def get_full_name(self):
        """Return "first last", stripped when either part is empty."""
        return f"{self.first_name} {self.last_name}".strip()

    # NOTE(review): as declared in this file, CacheMixin.cache_property is an
    # instance method taking (self, func); invoking it unbound with a single
    # argument here raises TypeError when this class body executes — it
    # presumably needs to be a @staticmethod on the mixin. Confirm.
    @CacheMixin.cache_property
    def profile_summary(self):
        """Cached summary dict of the user's profile."""
        # Simulate an expensive aggregation.
        import time
        time.sleep(0.1)
        # NOTE(review): self.date_joined.now() is datetime.now() reached via
        # the instance — it yields the current *naive local* time, while
        # auto_now_add stores an aware datetime when USE_TZ is enabled, so
        # the subtraction can raise TypeError. timezone.now() looks intended.
        return {
            'username': self.username,
            'full_name': self.get_full_name(),
            'email': self.email,
            'account_age_days': (self.date_joined.now() - self.date_joined).days
        }

    def get_permissions(self):
        """Return the user's permission codenames, cached for 5 minutes."""
        cache_key = self.get_cache_key('permissions')
        permissions = cache.get(cache_key)
        if permissions is None:
            permissions = list(self.user_permissions.values_list('codename', flat=True))
            cache.set(cache_key, permissions, 300)  # 5 minutes
        return permissions

    def save(self, *args, **kwargs):
        """Invalidate cached derived values, then persist."""
        # Cleared *before* the write; a concurrent reader could refill the
        # cache from stale data between the delete and the save.
        self.cache_delete('profile_summary')
        self.cache_delete('permissions')
        super().save(*args, **kwargs)

    def delete(self, *args, **kwargs):
        """Drop all cached values for this instance, then delete the row."""
        self.cache_delete()
        super().delete(*args, **kwargs)
# 查询集缓存
class CachedQuerySet(models.QuerySet):
    """QuerySet with helpers that cache materialized results and counts."""

    def cache_results(self, timeout=300, key_prefix='query'):
        """Return the full result list, cached under a hash of the SQL."""
        import hashlib
        digest = hashlib.md5(str(self.query).encode()).hexdigest()
        key = f"{key_prefix}:{digest}"
        rows = cache.get(key)
        if rows is None:
            rows = list(self)
            cache.set(key, rows, timeout)
        return rows

    def cache_count(self, timeout=300):
        """Return COUNT(*) for this queryset, cached under a hash of the SQL."""
        import hashlib
        digest = hashlib.md5(str(self.query).encode()).hexdigest()
        key = f"count:{digest}"
        total = cache.get(key)
        if total is None:
            total = self.count()
            cache.set(key, total, timeout)
        return total
class CachedManager(models.Manager):
    """Manager returning CachedQuerySet and offering a cached filter()."""

    def get_queryset(self):
        """All queries through this manager get the caching queryset."""
        return CachedQuerySet(self.model, using=self._db)

    def cached_filter(self, *args, **kwargs):
        """Materialize filter(*args, **kwargs) and cache the list for 5 min.

        Fix: the original ran ``sorted()`` over the ``(args, kwargs)`` pair,
        which compares the args tuple against the kwargs tuple — that can
        raise TypeError and never produced a canonical ordering. Only the
        kwargs are sorted now; positional args keep their order.
        """
        import hashlib
        signature = str((args, tuple(sorted(kwargs.items()))))
        digest = hashlib.md5(signature.encode()).hexdigest()[:8]
        cache_key = f"filter:{self.model._meta.label}:{digest}"
        results = cache.get(cache_key)
        if results is None:
            results = list(self.filter(*args, **kwargs))
            cache.set(cache_key, results, 300)  # 5 minutes
        return results
# 使用示例模型
class Product(models.Model, CacheMixin):
    """Product model with cached derived values.

    Fix: CacheMixin is mixed in (the original inherited only models.Model)
    because ``related_products`` and ``get_price_with_tax`` call
    ``get_cache_key``, which only CacheMixin provides — without it those
    accesses raise AttributeError.
    """
    name = models.CharField(max_length=200)
    price = models.DecimalField(max_digits=10, decimal_places=2)
    category = models.ForeignKey('Category', on_delete=models.CASCADE)
    description = models.TextField()
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)

    objects = CachedManager()

    # NOTE(review): this usage requires CacheMixin.cache_property to be
    # callable unbound (i.e. a @staticmethod); as declared in this file it
    # takes (self, func) and this decoration raises TypeError — confirm the
    # mixin definition.
    @CacheMixin.cache_property
    def related_products(self):
        """Up to ten other active products in the same category (cached)."""
        return list(Product.objects.filter(
            category=self.category,
            is_active=True
        ).exclude(id=self.id)[:10])

    def get_price_with_tax(self):
        """Price including a flat 10% tax, cached for one hour."""
        cache_key = self.get_cache_key('price_with_tax')
        price_with_tax = cache.get(cache_key)
        if price_with_tax is None:
            tax_rate = 0.1  # flat 10% — not locale-aware
            price_with_tax = float(self.price) * (1 + tax_rate)
            cache.set(cache_key, price_with_tax, 3600)  # 1 hour
        return price_with_tax
class Category(models.Model, CacheMixin):
    """Category model.

    Fix: CacheMixin added as a base (the original omitted it) —
    ``active_products_count`` is built on CacheMixin.cache_property, whose
    getter calls ``self.get_cache_key``, which only the mixin provides.
    """
    name = models.CharField(max_length=100)
    slug = models.SlugField(unique=True)

    objects = CachedManager()

    @CacheMixin.cache_property
    def active_products_count(self):
        """Number of active products in this category (cached)."""
        return self.product_set.filter(is_active=True).count()  #缓存策略模式
#Cache-Aside模式
# Cache-Aside模式实现
class CacheAsidePattern:
    """Cache-Aside helpers: the application manages the cache explicitly.

    Fix: missing rows are negative-cached with a sentinel. The original
    cached a literal None, but ``cache.get()`` also returns None on a
    miss, so a cached None was indistinguishable from "not cached" and the
    negative cache never took effect.
    """

    # Sentinel stored for "row does not exist" (None would look like a miss).
    _MISSING = "__cache_aside_missing__"

    @staticmethod
    def get_data_with_cache(model_class, pk, timeout=300):
        """Fetch one instance: cache first, then DB, with negative caching."""
        cache_key = f"model:{model_class._meta.label}:{pk}"
        data = cache.get(cache_key)
        if data is not None:
            # Hit — may be the sentinel marking a known-missing row.
            return None if data == CacheAsidePattern._MISSING else data
        try:
            instance = model_class.objects.get(pk=pk)
            cache.set(cache_key, instance, timeout)
            return instance
        except model_class.DoesNotExist:
            # Briefly negative-cache to absorb repeated lookups of missing ids.
            cache.set(cache_key, CacheAsidePattern._MISSING, 60)
            return None

    @staticmethod
    def update_data_with_cache(model_class, pk, **updates):
        """Update the DB row, then invalidate its cache entry."""
        cache_key = f"model:{model_class._meta.label}:{pk}"
        updated_count = model_class.objects.filter(pk=pk).update(**updates)
        # Invalidate rather than refresh: the next read reloads fresh data.
        cache.delete(cache_key)
        return updated_count

    @staticmethod
    def delete_data_with_cache(model_class, pk):
        """Delete the DB row and its cache entry."""
        cache_key = f"model:{model_class._meta.label}:{pk}"
        deleted_count = model_class.objects.filter(pk=pk).delete()[0]
        cache.delete(cache_key)
        return deleted_count
# 使用示例
def user_operations_with_cache(user_id):
    """Cache-aside fetch-and-touch of a user.

    Fixes over the original: the pattern helpers need a model *class*
    (they access ``_meta`` and ``.objects``), not the string 'auth.User',
    and ``timezone`` must be imported before use.
    """
    from django.contrib.auth.models import User
    from django.utils import timezone

    user = CacheAsidePattern.get_data_with_cache(User, user_id, timeout=1800)  # 30 min
    if user:
        # Touch last_login; the helper also invalidates the cache entry.
        CacheAsidePattern.update_data_with_cache(
            User, user_id, last_login=timezone.now()
        )
    return user
# 高级Cache-Aside实现
class AdvancedCacheAside:
    """Cache-Aside helper with empty-value sentinels and pattern invalidation."""

    # Marker cached in place of None so a cached-empty result is
    # distinguishable from a plain cache miss.
    _EMPTY = "__empty__"

    def __init__(self, cache_backend='default', default_timeout=300):
        from django.core.cache import caches
        self.cache = caches[cache_backend]
        self.default_timeout = default_timeout

    def get_or_fetch(self, cache_key: str, fetch_func, timeout: int = None,
                     cache_empty: bool = True):
        """Return the cached value, or compute, cache and return it.

        `cache_empty` controls whether a None result is cached (as the
        sentinel) or recomputed on every call.
        """
        result = self.cache.get(cache_key)
        if result is not None:
            if result == self._EMPTY:
                return None if cache_empty else result
            return result

        result = fetch_func()
        if result is None and not cache_empty:
            pass  # caller opted out of negative caching
        else:
            self.cache.set(
                cache_key,
                result if result is not None else self._EMPTY,
                timeout or self.default_timeout
            )
        return result

    def invalidate_pattern(self, pattern: str):
        """Delete every key matching `pattern` via a direct Redis connection.

        Fix: uses SCAN (``scan_iter``) instead of KEYS — KEYS is O(n) over
        the whole keyspace and blocks the Redis event loop, which is unsafe
        on production instances.
        """
        try:
            import redis
            client = redis.Redis(host='localhost', port=6379, db=0)
            batch = []
            for key in client.scan_iter(match=pattern, count=500):
                batch.append(key)
                if len(batch) >= 500:
                    client.delete(*batch)
                    batch = []
            if batch:
                client.delete(*batch)
        except ImportError:
            # Best-effort: without redis-py we can only log the failure.
            import logging
            logger = logging.getLogger(__name__)
            logger.warning(f"Cannot invalidate cache pattern {pattern}, Redis not available")
# 使用示例
def advanced_cache_usage():
    """Demonstrate AdvancedCacheAside.get_or_fetch with a slow producer."""
    handler = AdvancedCacheAside(default_timeout=600)

    def fetch_expensive_data():
        """Slow producer: sleeps, then returns a payload with a timestamp."""
        import time
        time.sleep(1)
        return {"data": "expensive result", "timestamp": time.time()}

    return handler.get_or_fetch(
        "expensive_calculation:v1",
        fetch_expensive_data,
        timeout=900,  # 15 minutes
    )  #Read-Through模式
# Read-Through模式实现
from django.core.cache import cache
from django.db import models
class ReadThroughCache:
    """Read-Through cache: misses are transparently loaded from the DB."""

    def __init__(self, model_class, cache_timeout=300):
        self.model_class = model_class
        self.cache_timeout = cache_timeout
        self.cache_prefix = f"read_through:{model_class._meta.label}"

    def get(self, key):
        """Return one instance, loading and caching it on a miss."""
        cache_key = f"{self.cache_prefix}:{key}"
        result = cache.get(cache_key)
        if result is None:
            result = self._load_from_db(key)
            if result is not None:
                cache.set(cache_key, result, self.cache_timeout)
        return result

    def _load_from_db(self, key):
        """Load by primary key; None when the row does not exist."""
        try:
            return self.model_class.objects.get(pk=key)
        except self.model_class.DoesNotExist:
            return None

    def get_many(self, keys):
        """Return instances for `keys`, in order (None for missing rows).

        Fix: the original zipped the missing-key list against
        ``filter(pk__in=...)`` results — that queryset returns only the
        rows that exist, in arbitrary order, so keys could be paired with
        the wrong objects (and cached under the wrong keys). Loaded objects
        are now matched back to their keys by primary key.
        """
        cache_keys = [f"{self.cache_prefix}:{key}" for key in keys]
        cached_results = cache.get_many(cache_keys)

        missing_keys = [
            key for key, ck in zip(keys, cache_keys) if ck not in cached_results
        ]

        if missing_keys:
            # NOTE(review): assumes lookup keys compare equal to obj.pk
            # (e.g. both ints) — confirm for string-keyed callers.
            loaded = {obj.pk: obj for obj in self._load_many_from_db(missing_keys)}
            cache_to_set = {}
            for key in missing_keys:
                obj = loaded.get(key)
                if obj is not None:
                    ck = f"{self.cache_prefix}:{key}"
                    cache_to_set[ck] = obj
                    cached_results[ck] = obj
            if cache_to_set:
                cache.set_many(cache_to_set, self.cache_timeout)

        # Results in the caller's original key order; None for absent rows.
        return [cached_results.get(ck) for ck in cache_keys]

    def _load_many_from_db(self, keys):
        """Load all existing rows whose pk is in `keys`."""
        return list(self.model_class.objects.filter(pk__in=keys))
# 使用示例
def read_through_usage():
    """Demonstrate single and batched reads through a ReadThroughCache."""
    from django.contrib.auth.models import User
    user_cache = ReadThroughCache(User, cache_timeout=1800)  # 30 minutes
    user = user_cache.get(1)                       # single lookup
    users = user_cache.get_many([1, 2, 3, 4, 5])   # batched lookup
    return users
# 带验证的Read-Through
class ValidatedReadThroughCache(ReadThroughCache):
    """Read-through cache that re-validates every value it hands out."""

    def __init__(self, model_class, cache_timeout=300, validator=None):
        super().__init__(model_class, cache_timeout)
        self.validator = validator or self._default_validator

    def _default_validator(self, obj):
        """Accept any non-None object that carries a primary key."""
        return obj is not None and hasattr(obj, 'pk')

    def get(self, key):
        """Fetch via the parent; on validation failure, refresh from the DB."""
        result = super().get(key)
        if self.validator(result):
            return result
        # Stale or invalid entry: evict it and reload straight from the DB.
        cache_key = f"{self.cache_prefix}:{key}"
        cache.delete(cache_key)
        result = self._load_from_db(key)
        if self.validator(result):
            cache.set(cache_key, result, self.cache_timeout)
        return result
# 使用示例
def validated_read_through_usage():
    """Demonstrate a read-through cache that only serves active users."""
    from django.contrib.auth.models import User

    def user_validator(candidate):
        """Only non-None, active users pass validation."""
        return candidate is not None and candidate.is_active

    cache_for_users = ValidatedReadThroughCache(
        User,
        cache_timeout=900,  # 15 minutes
        validator=user_validator,
    )
    return cache_for_users.get(1)  #Write-Through模式
return user#Write-Through模式
# Write-Through模式实现
class WriteThroughCache:
    """Write-Through cache: every DB mutation is mirrored into the cache.

    Reads fall back to Cache-Aside when an entry is missing.
    """

    def __init__(self, model_class, cache_timeout=300):
        self.model_class = model_class
        self.cache_timeout = cache_timeout
        self.cache_prefix = f"write_through:{model_class._meta.label}"

    def _key(self, pk):
        """Cache key for one primary key."""
        return f"{self.cache_prefix}:{pk}"

    def create(self, **data):
        """Create a row and immediately cache the new instance."""
        instance = self.model_class.objects.create(**data)
        cache.set(self._key(instance.pk), instance, self.cache_timeout)
        return instance

    def update(self, key, **data):
        """Update the row, then refresh (or drop) the cached instance."""
        updated_count = self.model_class.objects.filter(pk=key).update(**data)
        if updated_count > 0:
            try:
                # Re-read so the cache holds the post-update state.
                instance = self.model_class.objects.get(pk=key)
                cache.set(self._key(key), instance, self.cache_timeout)
            except self.model_class.DoesNotExist:
                # Row vanished between the update and the re-read.
                cache.delete(self._key(key))
        return updated_count

    def bulk_update(self, updates_list):
        """Apply many ``{'pk': ..., field: ...}`` updates; refreshes batched.

        Fix: the original popped 'pk' directly out of the caller's dicts,
        mutating its arguments; each dict is now copied first.
        """
        updated_count = 0
        cache_updates = {}
        for update_data in updates_list:
            fields = dict(update_data)  # defensive copy — don't mutate caller data
            pk = fields.pop('pk')
            count = self.model_class.objects.filter(pk=pk).update(**fields)
            updated_count += count
            if count > 0:
                try:
                    cache_updates[self._key(pk)] = self.model_class.objects.get(pk=pk)
                except self.model_class.DoesNotExist:
                    cache.delete(self._key(pk))
        # Push all refreshed instances to the cache in one call.
        if cache_updates:
            cache.set_many(cache_updates, self.cache_timeout)
        return updated_count

    def delete(self, key):
        """Delete the row and its cache entry."""
        deleted_count = self.model_class.objects.filter(pk=key).delete()[0]
        if deleted_count > 0:
            cache.delete(self._key(key))
        return deleted_count

    def get(self, key):
        """Read-only lookup: cache first, then DB (Cache-Aside fallback)."""
        result = cache.get(self._key(key))
        if result is None:
            try:
                result = self.model_class.objects.get(pk=key)
                cache.set(self._key(key), result, self.cache_timeout)
            except self.model_class.DoesNotExist:
                result = None
        return result
# 使用示例
def write_through_usage():
    """Walk through create / update / get against a WriteThroughCache."""
    from django.contrib.auth.models import User

    users = WriteThroughCache(User, cache_timeout=1800)
    created = users.create(
        username='newuser',
        email='newuser@example.com',
        first_name='New',
        last_name='User',
    )
    users.update(created.pk, last_name='Updated')
    return users.get(created.pk)
# 事务安全的Write-Through
from django.db import transaction
class TransactionSafeWriteThroughCache(WriteThroughCache):
    """Write-through cache whose DB writes run inside atomic transactions.

    Rows are locked with ``select_for_update`` so concurrent writers cannot
    interleave between the database write and the cache refresh.
    """

    def update(self, key, **data):
        """Atomically update row *key* and refresh (or evict) its cache entry."""
        with transaction.atomic():
            updated_count = self.model_class.objects.filter(pk=key).update(**data)
            if updated_count > 0:
                cache_key = f"{self.cache_prefix}:{key}"
                try:
                    # Lock the row until commit so the cached copy is consistent.
                    updated_instance = self.model_class.objects.select_for_update().get(pk=key)
                    cache.set(cache_key, updated_instance, self.cache_timeout)
                except self.model_class.DoesNotExist:
                    cache.delete(cache_key)
            return updated_count

    def bulk_update(self, updates_list):
        """Atomically apply many updates (each dict holds 'pk' plus fields).

        Bug fix: each input dict is copied before 'pk' is removed, so the
        caller's data is no longer mutated. Returns the rows updated.
        """
        with transaction.atomic():
            updated_count = 0
            cache_updates = {}
            for update_data in updates_list:
                update_data = dict(update_data)  # keep the caller's dict intact
                pk = update_data.pop('pk')
                try:
                    # Lock each row before mutating it.
                    instance = self.model_class.objects.select_for_update().get(pk=pk)
                except self.model_class.DoesNotExist:
                    continue
                for field, value in update_data.items():
                    setattr(instance, field, value)
                instance.save()
                updated_count += 1
                cache_updates[f"{self.cache_prefix}:{pk}"] = instance
            # One round trip for all cache refreshes.
            if cache_updates:
                cache.set_many(cache_updates, self.cache_timeout)
            return updated_count
#缓存失效策略
# 缓存失效策略
import time
from typing import List, Callable
class CacheInvalidationStrategy:
    """Decorator factories implementing cache-invalidation policies.

    NOTE: the method names keep the original (misspelled) "invalidaton"
    spelling for backward compatibility with existing callers.
    """

    @staticmethod
    def time_based_invalidaton(timeout: int):
        """Cache a function's result for *timeout* seconds (pure TTL policy)."""
        def strategy(func):
            def wrapper(*args, **kwargs):
                key = f"{func.__module__}:{func.__name__}:{hash(str(args) + str(kwargs))}"
                cached = cache.get(key)
                if cached is not None:
                    return cached
                fresh = func(*args, **kwargs)
                cache.set(key, fresh, timeout)
                return fresh
            return wrapper
        return strategy

    @staticmethod
    def event_based_invalidaton(invalidate_on: List[str]):
        """Cache for 1 hour and register listeners that evict on given events."""
        def decorator(func):
            def wrapper(*args, **kwargs):
                key = f"event_cache:{func.__name__}:{hash(str(args) + str(kwargs))}"
                value = cache.get(key)
                if value is None:
                    value = func(*args, **kwargs)
                    cache.set(key, value, timeout=3600)
                    # NOTE(review): register_invalidation_listener is not
                    # defined in this module — confirm it exists elsewhere.
                    for event in invalidate_on:
                        register_invalidation_listener(event, key)
                return value
            return wrapper
        return decorator
# 缓存预热策略
class CacheWarmingStrategy:
    """Pre-populates the cache with data that is likely to be requested."""

    @staticmethod
    def warm_common_queries():
        """Warm per-product entries and per-category product counts."""
        from django.contrib.auth.models import User  # noqa: F401 (kept from original)
        from .models import Product, Category

        # Top 50 most-viewed active products, cached individually for 1 hour.
        hot_items = Product.objects.filter(is_active=True).order_by('-view_count')[:50]
        for item in hot_items:
            cache.set(f"product:{item.pk}", item, 3600)

        # Active-product counts per category, cached for 2 hours.
        for category in Category.objects.all():
            active_count = category.product_set.filter(is_active=True).count()
            cache.set(f"category:{category.pk}:product_count", active_count, 7200)

    @staticmethod
    def scheduled_warming():
        """Run warm_common_queries() hourly on a background daemon thread."""
        import threading

        def warm_cache():
            while True:
                try:
                    CacheWarmingStrategy.warm_common_queries()
                    time.sleep(3600)   # re-warm every hour
                except Exception as e:
                    import logging
                    logging.getLogger(__name__).error(f"Cache warming failed: {e}")
                    time.sleep(300)    # retry after 5 minutes on failure

        worker = threading.Thread(target=warm_cache, daemon=True)
        worker.start()
# 缓存穿透防护
class CachePenetrationProtection:
    """Guards against cache penetration (repeated lookups of missing rows)."""

    @staticmethod
    def protect_null_values(func):
        """Cache None results (as a sentinel) and serialize concurrent misses."""
        def wrapper(*args, **kwargs):
            cache_key = f"protected:{func.__name__}:{hash(str(args) + str(kwargs))}"
            protection_key = f"protection:{cache_key}"

            # A concurrent caller is already querying: bail out with the
            # function's first default argument (or None).
            if cache.get(protection_key):
                return func.__defaults__[0] if func.__defaults__ else None

            cached = cache.get(cache_key)
            if cached is not None:
                return None if cached == "__null__" else cached

            # Claim the key for up to 30s so only one caller hits the DB.
            cache.set(protection_key, 1, 30)
            try:
                value = func(*args, **kwargs)
                # Store even empty results (as "__null__") for 5 minutes.
                cache.set(cache_key, value if value is not None else "__null__", 300)
                cache.delete(protection_key)
                return value
            except Exception:
                # Query failed: release the claim so someone else may retry.
                cache.delete(protection_key)
                raise
        return wrapper
# 使用示例
@CachePenetrationProtection.protect_null_values
def get_user_by_username(username):
    """Look up a user by name; returns None (not an exception) when absent."""
    from django.contrib.auth.models import User
    try:
        return User.objects.get(username=username)
    except User.DoesNotExist:
        return None
# 缓存雪崩防护
class CacheAvalancheProtection:
    """Guards against cache avalanche (many keys expiring at the same moment)."""

    @staticmethod
    def add_random_timeout(base_timeout: int, jitter_range: int = 60):
        """Cache results with base_timeout ± a random jitter in seconds."""
        import random

        def decorator(func):
            def wrapper(*args, **kwargs):
                key = f"avalanche_protected:{func.__name__}:{hash(str(args) + str(kwargs))}"
                value = cache.get(key)
                if value is None:
                    value = func(*args, **kwargs)
                    # Spread expirations so entries do not all lapse at once.
                    ttl = base_timeout + random.randint(-jitter_range, jitter_range)
                    cache.set(key, value, ttl)
                return value
            return wrapper
        return decorator
# 使用示例
@CacheAvalancheProtection.add_random_timeout(3600, 300)  # 1h ± 5min
def get_popular_articles():
    """Top 10 published articles by view count (avalanche-protected)."""
    from .models import Article
    top_ten = Article.objects.filter(is_published=True).order_by('-view_count')[:10]
    return list(top_ten)
#缓存性能优化
#缓存压缩
# 缓存压缩优化
import pickle
import zlib
import gzip
import json
from django.core.cache import cache
class CompressedCache:
    """Static helpers that compress values before handing them to the cache."""

    @staticmethod
    def set_compressed(key: str, value, timeout: int = 300, compression_level: int = 6):
        """Pickle *value*, zlib-compress it, and store the resulting bytes."""
        payload = zlib.compress(pickle.dumps(value), compression_level)
        cache.set(key, payload, timeout)

    @staticmethod
    def get_compressed(key: str, default=None):
        """Inverse of set_compressed; returns *default* on a miss."""
        payload = cache.get(key, default)
        if payload is default:
            return default
        if payload is None:
            return None
        return pickle.loads(zlib.decompress(payload))

    @staticmethod
    def set_json_compressed(key: str, value, timeout: int = 300):
        """Serialize *value* as JSON (str() fallback), gzip it, and store it."""
        raw = json.dumps(value, default=str).encode('utf-8')
        cache.set(key, gzip.compress(raw), timeout)

    @staticmethod
    def get_json_compressed(key: str, default=None):
        """Inverse of set_json_compressed; returns *default* on a miss."""
        payload = cache.get(key, default)
        if payload is default:
            return default
        if payload is None:
            return None
        return json.loads(gzip.decompress(payload).decode('utf-8'))
# 压缩装饰器
def compressed_cache(timeout=300, compression_type='zlib'):
    """Decorator that memoizes a function's result in compressed form.

    compression_type: 'zlib' (pickle+zlib), 'json' (json+gzip); any other
    value falls back to the plain, uncompressed cache.
    """
    def decorator(func):
        def wrapper(*args, **kwargs):
            key = f"compressed:{func.__name__}:{hash(str(args) + str(kwargs))}"

            if compression_type == 'zlib':
                value = CompressedCache.get_compressed(key)
            elif compression_type == 'json':
                value = CompressedCache.get_json_compressed(key)
            else:
                value = cache.get(key)

            if value is None:
                value = func(*args, **kwargs)
                if compression_type == 'zlib':
                    CompressedCache.set_compressed(key, value, timeout)
                elif compression_type == 'json':
                    CompressedCache.set_json_compressed(key, value, timeout)
                else:
                    cache.set(key, value, timeout)
            return value
        return wrapper
    return decorator
# 使用示例
@compressed_cache(timeout=1800, compression_type='zlib')
def get_large_dataset():
    """Build a 10k-row demo dataset (large enough to benefit from compression)."""
    return [{'id': i, 'data': f'data_{i}'} for i in range(10000)]
# 批量压缩缓存
class BatchCompressedCache:
    """Bulk set/get of pickled + zlib-compressed cache entries."""

    @staticmethod
    def set_many_compressed(data_dict: dict, timeout: int = 300):
        """Compress every value and store them with a single set_many call."""
        packed = {
            key: zlib.compress(pickle.dumps(value))
            for key, value in data_dict.items()
        }
        cache.set_many(packed, timeout)

    @staticmethod
    def get_many_compressed(keys: list):
        """Fetch many keys, decompressing each found value (falsy pass through)."""
        raw = cache.get_many(keys)
        unpacked = {}
        for key, blob in raw.items():
            if blob:
                unpacked[key] = pickle.loads(zlib.decompress(blob))
            else:
                unpacked[key] = blob
        return unpacked
#缓存分片
# 缓存分片策略
import hashlib
from typing import Any, List, Dict
class ShardedCache:
    """Distributes keys over a fixed number of shard namespaces via MD5."""

    def __init__(self, num_shards: int = 4):
        self.num_shards = num_shards
        self.shard_keys = [f"shard_{i}" for i in range(num_shards)]

    def _get_shard_index(self, key: str) -> int:
        """Map *key* to a shard number in [0, num_shards)."""
        digest = hashlib.md5(key.encode()).hexdigest()
        return int(digest, 16) % self.num_shards

    def _get_shard_key(self, key: str) -> str:
        """Prefix *key* with its shard namespace."""
        return f"{self.shard_keys[self._get_shard_index(key)]}:{key}"

    def set(self, key: str, value, timeout: int = 300):
        """Store *value* under the sharded key."""
        cache.set(self._get_shard_key(key), value, timeout)

    def get(self, key: str, default=None):
        """Fetch *key* from its shard, or *default* on a miss."""
        return cache.get(self._get_shard_key(key), default)

    def delete(self, key: str):
        """Remove *key* from its shard."""
        return cache.delete(self._get_shard_key(key))

    def set_many(self, data_dict: Dict[str, Any], timeout: int = 300):
        """Group keys by shard, then issue one set_many per non-empty shard."""
        per_shard = {i: {} for i in range(self.num_shards)}
        for key, value in data_dict.items():
            per_shard[self._get_shard_index(key)][self._get_shard_key(key)] = value
        for bucket in per_shard.values():
            if bucket:
                cache.set_many(bucket, timeout)
# 一致性哈希分片
class ConsistentHashShardCache:
    """Routes keys to named cache backends on a consistent-hash ring.

    Each node is placed on the ring *replicas* times (virtual nodes) so keys
    spread evenly, and only ~1/N of keys move when a node is added/removed.
    """

    def __init__(self, nodes: List[str], replicas: int = 150):
        self.nodes = nodes
        self.replicas = replicas
        self.ring = {}          # ring position -> node name
        self.sorted_keys = []   # sorted ring positions for bisection
        for node in self.nodes:
            for i in range(self.replicas):
                position = self._hash(f"{node}:{i}")
                self.ring[position] = node
                self.sorted_keys.append(position)
        self.sorted_keys.sort()

    def _hash(self, key: str) -> int:
        """SHA-256-based ring position for *key*."""
        return int(hashlib.sha256(key.encode()).hexdigest(), 16)

    def _get_node(self, key: str) -> str:
        """First node clockwise from *key*'s ring position (wraps around)."""
        if not self.ring:
            return None
        import bisect
        position = self._hash(key)
        idx = bisect.bisect(self.sorted_keys, position)
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

    def get_cache_backend(self, key: str):
        """Django cache backend for *key*'s node (default cache when none)."""
        from django.core.cache import caches
        node = self._get_node(key)
        return caches[node] if node else cache

    def set(self, key: str, value, timeout: int = 300):
        """Write through to the owning node's backend."""
        self.get_cache_backend(key).set(key, value, timeout)

    def get(self, key: str, default=None):
        """Read from the owning node's backend."""
        return self.get_cache_backend(key).get(key, default)
# 使用示例
def sharded_cache_example():
    """Spread 100 demo users over a 4-shard cache, then read one back."""
    store = ShardedCache(num_shards=4)
    for i in range(100):
        store.set(f"user:{i}", {"id": i, "name": f"user_{i}"})
    return store.get("user:50")
#缓存预取和懒加载
# 缓存预取和懒加载
import asyncio
import concurrent.futures
from typing import List, Dict, Any
class LazyLoadCache:
    """In-process lazy-loading cache layered in front of the Django cache.

    Values are kept both in a local dict (``_cache``) and mirrored into the
    shared Django cache. ``_loading`` tracks keys whose load is in flight so
    concurrent callers busy-wait instead of duplicating the work.

    NOTE(review): ``_cache``/``_loading`` are mutated without a lock —
    presumably relying on CPython's GIL for these simple operations; confirm
    before depending on it. ``_cache`` also grows without bound (no eviction).
    """

    def __init__(self):
        self._cache = {}        # key -> loaded value (never evicted)
        self._loading = set()   # keys with a load currently in flight

    def get_lazy(self, key: str, loader_func, timeout: int = 300):
        """Return the value for *key*, invoking *loader_func* once on first use."""
        if key in self._cache:
            return self._cache[key]
        if key in self._loading:
            # Another caller is loading this key: poll for up to ~5 seconds,
            # then return whatever is available (possibly None).
            import time
            start_time = time.time()
            while key in self._loading and time.time() - start_time < 5:  # 5s timeout
                time.sleep(0.1)
            return self._cache.get(key) if key in self._cache else None
        # We are the first caller: mark the key as loading and compute it.
        self._loading.add(key)
        try:
            value = loader_func()
            self._cache[key] = value
            # Mirror into the shared Django cache as well.
            cache.set(key, value, timeout)
            return value
        finally:
            self._loading.discard(key)

    def prefetch(self, keys: List[str], loader_func, timeout: int = 300):
        """Warm many keys in parallel; *loader_func* is called as loader_func(key)."""
        def load_single(key):
            # Skip keys already cached or being loaded elsewhere
            # (those submissions resolve to None).
            if key not in self._cache and key not in self._loading:
                self._loading.add(key)
                try:
                    value = loader_func(key)
                    self._cache[key] = value
                    cache.set(key, value, timeout)
                    return value
                finally:
                    self._loading.discard(key)
        # Load in parallel on a bounded thread pool.
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(load_single, key) for key in keys]
            results = [future.result() for future in futures]
        return results
# 智能预取策略
class IntelligentPrefetch:
    """Tracks key-access patterns and predicts which keys to prefetch next.

    Bug fix: the list comprehension in ``predict_next_accesses`` was missing
    its closing bracket in the original, which made the class a syntax error.
    """

    def __init__(self):
        # key -> {'count': times accessed, 'last_access': unix timestamp}
        self.access_patterns = {}
        self.prefetch_queue = []

    def record_access(self, key: str):
        """Bump the access counter and last-access timestamp for *key*."""
        if key not in self.access_patterns:
            self.access_patterns[key] = {'count': 0, 'last_access': None}
        self.access_patterns[key]['count'] += 1
        self.access_patterns[key]['last_access'] = time.time()

    def predict_next_accesses(self, current_key: str, n: int = 5) -> List[str]:
        """Return up to *n* keys ranked by recency of access (most recent first).

        NOTE: *current_key* is currently unused — the heuristic is pure
        recency; swap in a smarter model here if needed.
        """
        ranked = sorted(
            self.access_patterns.items(),
            key=lambda item: item[1]['last_access'] or 0,
            reverse=True,
        )
        return [key for key, _ in ranked[:n]]
# 预取装饰器
def prefetch_on_access(predict_func=None, prefetch_count=3):
    """Decorator: after each call, prefetch predicted keys in the background.

    predict_func(current_key, count) -> list of keys; when omitted, no
    prefetching happens and the decorator is a transparent wrapper.
    """
    def decorator(func):
        def wrapper(*args, **kwargs):
            outcome = func(*args, **kwargs)
            if predict_func:
                first_arg = args[0] if args else ''
                predicted = predict_func(first_arg, prefetch_count)
                # Fire-and-forget: warm the predicted keys off-thread.
                import threading
                worker = threading.Thread(
                    target=lambda: prefetch_data(predicted)
                )
                worker.daemon = True
                worker.start()
            return outcome
        return wrapper
    return decorator
def prefetch_data(keys):
    """Warm any of *keys* not yet cached (actual loading logic is a stub)."""
    for key in keys:
        if cache.get(key) is None:
            # TODO: plug in the concrete data-loading logic here.
            pass
# 使用示例
@prefetch_on_access(prefetch_count=5)
def get_user_data(user_id):
    """Fetch a user row (decorated for background prefetching)."""
    from django.contrib.auth.models import User
    return User.objects.get(id=user_id)
#缓存监控和诊断
# 缓存监控和诊断工具
import time
import psutil
import redis
from django.core.cache import cache
from django.conf import settings
from collections import defaultdict, deque
import threading
from datetime import datetime, timedelta
class CacheMonitor:
    """Thread-safe collector of cache hit/miss statistics.

    Keeps the most recent 1000 accesses as (kind, timestamp, duration)
    triples plus the most recent 100 aggregated stat snapshots.

    Fixes vs. the original: the bare ``except:`` clauses in
    ``get_cache_size``/``get_memory_usage`` were narrowed to
    ``except Exception:`` so KeyboardInterrupt/SystemExit propagate, and
    unused locals were removed.
    """

    def __init__(self):
        self.hit_count = 0
        self.miss_count = 0
        self.access_times = deque(maxlen=1000)   # recent (kind, ts, duration)
        self.lock = threading.Lock()
        self.stats_history = deque(maxlen=100)   # recent get_stats() snapshots

    def record_hit(self, duration):
        """Record a cache hit that took *duration* seconds."""
        with self.lock:
            self.hit_count += 1
            self.access_times.append(('hit', time.time(), duration))

    def record_miss(self, duration):
        """Record a cache miss that took *duration* seconds."""
        with self.lock:
            self.miss_count += 1
            self.access_times.append(('miss', time.time(), duration))

    def get_stats(self):
        """Return a snapshot dict of lifetime/recent hit rates and latency."""
        with self.lock:
            total_requests = self.hit_count + self.miss_count
            hit_rate = self.hit_count / total_requests if total_requests > 0 else 0
            # Rates over the retained (last 1000) accesses only.
            recent_hits = sum(1 for t in self.access_times if t[0] == 'hit')
            recent_total = len(self.access_times)
            recent_hit_rate = recent_hits / recent_total if recent_total > 0 else 0
            if self.access_times:
                avg_duration = sum(t[2] for t in self.access_times) / len(self.access_times)
            else:
                avg_duration = 0
            stats = {
                'total_requests': total_requests,
                'hit_count': self.hit_count,
                'miss_count': self.miss_count,
                'hit_rate': hit_rate,
                'recent_hit_rate': recent_hit_rate,
                'avg_response_time': avg_duration,
                'cache_size': self.get_cache_size(),
                'memory_usage': self.get_memory_usage(),
            }
            # Keep a timestamped history of snapshots for trend views.
            self.stats_history.append({
                'timestamp': datetime.now(),
                'stats': stats.copy(),
            })
            return stats

    def get_cache_size(self):
        """Number of keys in the backend (Redis only); 0 when unavailable."""
        try:
            if hasattr(settings, 'CACHES') and 'default' in settings.CACHES:
                cache_config = settings.CACHES['default']
                if cache_config['BACKEND'] == 'django.core.cache.backends.redis.RedisCache':
                    redis_client = redis.Redis.from_url(cache_config['LOCATION'])
                    return redis_client.dbsize()
        except Exception:
            pass
        return 0

    def get_memory_usage(self):
        """Resident memory (bytes) of this process; 0 when psutil fails."""
        try:
            process = psutil.Process()
            return process.memory_info().rss
        except Exception:
            return 0

    def get_trend_analysis(self, minutes=10):
        """Hit-rate / request-rate summary over the trailing *minutes* window."""
        with self.lock:
            cutoff_time = time.time() - (minutes * 60)
            recent_accesses = [t for t in self.access_times if t[1] > cutoff_time]
            if not recent_accesses:
                return {}
            hits = sum(1 for t in recent_accesses if t[0] == 'hit')
            return {
                'period': f'last_{minutes}_minutes',
                'requests': len(recent_accesses),
                'hit_rate': hits / len(recent_accesses) if recent_accesses else 0,
                'requests_per_minute': len(recent_accesses) / minutes,
            }
# Global monitor instance shared by the decorators and diagnostics below.
cache_monitor = CacheMonitor()
# 监控装饰器
def monitored_cache(timeout=300):
    """Decorator: memoize via the cache and report hits/misses to cache_monitor."""
    def decorator(func):
        def wrapper(*args, **kwargs):
            started = time.time()
            key = f"monitored:{func.__name__}:{hash(str(args) + str(kwargs))}"
            value = cache.get(key)
            elapsed = time.time() - started  # time spent in the cache lookup
            if value is None:
                value = func(*args, **kwargs)
                cache.set(key, value, timeout)
                cache_monitor.record_miss(elapsed)
            else:
                cache_monitor.record_hit(elapsed)
            return value
        return wrapper
    return decorator
# 缓存诊断工具
class CacheDiagnostic:
    """Read-only diagnostics built on the global ``cache_monitor`` stats."""

    @staticmethod
    def diagnose_performance():
        """Grade cache health ('good'/'fair'/'poor') with warnings and advice."""
        stats = cache_monitor.get_stats()
        diagnosis = {
            'overall_health': 'good',
            'recommendations': [],
            'warnings': []
        }
        # Hit-rate thresholds: below 70% is poor, below 85% merely fair.
        if stats['hit_rate'] < 0.7:
            diagnosis['overall_health'] = 'poor'
            diagnosis['warnings'].append(f"缓存命中率过低: {stats['hit_rate']:.2%}")
            diagnosis['recommendations'].append("考虑增加缓存时间或调整缓存策略")
        elif stats['hit_rate'] < 0.85:
            diagnosis['overall_health'] = 'fair'
            diagnosis['recommendations'].append("缓存命中率可以进一步优化")
        # Latency check: flag averages above 100ms.
        if stats['avg_response_time'] > 0.1:  # 100ms
            diagnosis['warnings'].append(f"平均响应时间过长: {stats['avg_response_time']:.3f}s")
            diagnosis['recommendations'].append("检查缓存后端性能")
        # A size of 0 usually means the backend is not Redis (or unreachable).
        if stats['cache_size'] == 0:
            diagnosis['warnings'].append("无法获取缓存大小信息")
        return diagnosis

    @staticmethod
    def analyze_hot_keys(sample_size=1000):
        """Sample Redis keys and rank them by memory usage (largest first).

        NOTE(review): relies on KEYS '*' — O(keyspace); avoid on busy
        production instances. Returns [] for non-Redis backends or on error.
        """
        try:
            if hasattr(settings, 'CACHES') and 'default' in settings.CACHES:
                cache_config = settings.CACHES['default']
                if cache_config['BACKEND'] == 'django.core.cache.backends.redis.RedisCache':
                    redis_client = redis.Redis.from_url(cache_config['LOCATION'])
                    # Full keyspace scan (use cautiously in production).
                    all_keys = redis_client.keys('*')
                    # Per-key size/TTL stats over a bounded sample.
                    hot_keys = []
                    for key in all_keys[:sample_size]:  # sample only
                        try:
                            size = redis_client.memory_usage(key)
                            ttl = redis_client.ttl(key)
                            hot_keys.append({
                                'key': key.decode() if isinstance(key, bytes) else key,
                                'size': size,
                                'ttl': ttl
                            })
                        except redis.ResponseError:
                            # Older Redis versions lack MEMORY USAGE.
                            hot_keys.append({
                                'key': key.decode() if isinstance(key, bytes) else key,
                                'size': 'unknown',
                                'ttl': redis_client.ttl(key)
                            })
                    # Largest entries first ('unknown' sizes sort as 0).
                    hot_keys.sort(key=lambda x: x['size'] if isinstance(x['size'], int) else 0, reverse=True)
                    return hot_keys[:50]  # top 50 by size
        except Exception as e:
            print(f"分析热点键失败: {e}")
        return []

    @staticmethod
    def get_cache_efficiency_report():
        """Bundle current stats, 5-minute trend, diagnosis, and suggestions."""
        stats = cache_monitor.get_stats()
        trend = cache_monitor.get_trend_analysis(minutes=5)
        report = {
            'timestamp': datetime.now().isoformat(),
            'current_stats': stats,
            'trend_analysis': trend,
            'diagnosis': CacheDiagnostic.diagnose_performance(),
            'suggestions': CacheDiagnostic.get_improvement_suggestions(stats)
        }
        return report

    @staticmethod
    def get_improvement_suggestions(stats):
        """Translate a stats dict into a list of concrete tuning suggestions."""
        suggestions = []
        if stats['hit_rate'] < 0.8:
            suggestions.append("考虑增加常用数据的缓存时间")
            suggestions.append("分析缓存未命中的原因,优化缓存键策略")
        if stats['avg_response_time'] > 0.05:  # 50ms
            suggestions.append("考虑使用更快的缓存后端(如内存缓存)")
            suggestions.append("检查网络延迟,考虑使用本地缓存")
        if stats['total_requests'] > 10000 and stats['hit_rate'] < 0.9:
            suggestions.append("对于高频访问的数据,考虑使用多级缓存策略")
        return suggestions
# 缓存性能测试
class CachePerformanceTester:
    """Micro-benchmark for raw cache set/get throughput."""

    @staticmethod
    def test_cache_speed(iterations=1000):
        """Time *iterations* random set/get pairs; return aggregate stats.

        NOTE(review): the 'hit_rates' list is collected but never filled —
        presumably left over from an earlier version.
        """
        import random
        import string

        def random_string(length=10):
            return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
        results = {
            'set_times': [],
            'get_times': [],
            'hit_rates': []
        }
        for i in range(iterations):
            key = f"test_key_{random_string()}"
            value = random_string(100)
            # Time a cache write.
            start = time.time()
            cache.set(key, value, 300)
            set_time = time.time() - start
            results['set_times'].append(set_time)
            # Time the read of the key just written.
            start = time.time()
            retrieved = cache.get(key)
            get_time = time.time() - start
            results['get_times'].append(get_time)
        # Aggregate into averages and operations per second.
        avg_set_time = sum(results['set_times']) / len(results['set_times'])
        avg_get_time = sum(results['get_times']) / len(results['get_times'])
        performance_stats = {
            'iterations': iterations,
            'avg_set_time': avg_set_time,
            'avg_get_time': avg_get_time,
            'total_test_time': sum(results['set_times']) + sum(results['get_times']),
            'sets_per_second': iterations / sum(results['set_times']) if sum(results['set_times']) > 0 else float('inf'),
            'gets_per_second': iterations / sum(results['get_times']) if sum(results['get_times']) > 0 else float('inf'),
        }
        return performance_stats
# 使用示例
def cache_monitoring_example():
    """Generate monitored traffic, then print stats and a health report."""

    @monitored_cache(timeout=600)
    def expensive_function(x, y):
        """Simulated slow computation."""
        time.sleep(0.1)
        return x * y

    # Drive some traffic so the monitor has data.
    for i in range(100):
        expensive_function(i, i + 1)

    stats = cache_monitor.get_stats()
    print(f"命中率: {stats['hit_rate']:.2%}")
    print(f"平均响应时间: {stats['avg_response_time']:.3f}s")

    diagnostic = CacheDiagnostic.get_cache_efficiency_report()
    print(f"健康状况: {diagnostic['diagnosis']['overall_health']}")
    return stats
#常见问题与解决方案
#问题1:缓存雪崩
症状:大量缓存同时过期,导致数据库压力骤增
解决方案:
# 1. 使用随机过期时间
def set_with_jitter(key, value, base_timeout=300, jitter_range=60):
    """Cache *value* with a randomized TTL to avoid synchronized expiry."""
    import random
    ttl = base_timeout + random.randint(-jitter_range, jitter_range)
    cache.set(key, value, ttl)
# 2. 使用永不过期的缓存 + 后台更新
def set_persistent_cache(key, value, refresh_func, interval=300):
    """Store *value* forever, tagged with the time it should next refresh.

    NOTE(review): *refresh_func* is accepted for signature compatibility but
    unused here — refreshing is driven by get_with_background_refresh.
    """
    payload = {
        'value': value,
        'refresh_time': time.time() + interval,
    }
    cache.set(key, payload, timeout=None)  # never expires
def get_with_background_refresh(key, refresh_func, interval=300):
    """Return the cached value, refreshing it in the background when stale."""
    entry = cache.get(key)
    if entry is None:
        # First access: load synchronously and persist.
        fresh = refresh_func()
        set_persistent_cache(key, fresh, refresh_func, interval)
        return fresh
    if time.time() > entry['refresh_time']:
        # Stale: serve the old value now, refresh off-thread.
        import threading
        refresher = threading.Thread(
            target=lambda: _refresh_cache_background(key, refresh_func, interval)
        )
        refresher.daemon = True
        refresher.start()
    return entry['value']
def _refresh_cache_background(key, refresh_func, interval):
    """Worker: recompute *key*'s value and re-store it (errors are logged)."""
    try:
        fresh = refresh_func()
        cache.set(key, {
            'value': fresh,
            'refresh_time': time.time() + interval,
        }, timeout=None)
    except Exception as e:
        import logging
        logging.error(f"Background cache refresh failed: {e}")
# 3. 分级缓存策略
class TieredCache:
    """Two-tier cache: a short-lived 'default' tier over a long-lived tier."""

    def __init__(self):
        from django.core.cache import caches
        self.short_term = caches['default']    # hot tier
        self.long_term = caches['temporary']   # warm tier

    def get(self, key):
        """Check the hot tier first; on a warm-tier hit, promote the value."""
        hot = self.short_term.get(key)
        if hot is not None:
            return hot
        warm = self.long_term.get(key)
        if warm is not None:
            # Promote into the hot tier for subsequent reads.
            self.short_term.set(key, warm, timeout=300)
            return warm
        return None

    def set(self, key, value, short_timeout=300, long_timeout=3600):
        """Write to both tiers with their respective TTLs."""
        self.short_term.set(key, value, timeout=short_timeout)
        self.long_term.set(key, value, timeout=long_timeout)
#问题2:缓存穿透
症状:查询不存在的数据,缓存中也没有,导致每次查询都打到数据库
解决方案:
# 1. 缓存空值
def get_with_null_cache(model_class, pk, timeout=300):
    """Fetch a row by pk, caching misses as "__null__" to stop penetration."""
    cache_key = f"model:{model_class._meta.label}:{pk}"
    cached = cache.get(cache_key)
    if cached is not None:
        return None if cached == "__null__" else cached
    try:
        instance = model_class.objects.get(pk=pk)
    except model_class.DoesNotExist:
        # Cache the absence briefly (1 minute) so repeat misses skip the DB.
        cache.set(cache_key, "__null__", timeout=60)
        return None
    cache.set(cache_key, instance, timeout)
    return instance
# 2. 布隆过滤器(简化版)
class BloomFilterCache:
    """Minimal Bloom filter: probabilistic membership, no false negatives."""

    def __init__(self, capacity=10000, error_rate=0.1):
        import math
        self.capacity = capacity
        self.error_rate = error_rate
        # Derive the optimal bit-array size and hash count for the target rate.
        self.bit_array_size = self._get_size(capacity, error_rate)
        self.hash_count = self._get_hash_count(self.bit_array_size, capacity)
        self.bit_array = [0] * self.bit_array_size

    def _get_size(self, n, p):
        """m = -n·ln(p) / (ln 2)²  — standard Bloom-filter sizing formula."""
        import math
        return int(-(n * math.log(p)) / (math.log(2) ** 2))

    def _get_hash_count(self, m, n):
        """k = (m/n)·ln 2  — optimal number of hash functions."""
        import math
        return int((m / n) * math.log(2))

    def _hash(self, item, seed):
        """Seeded MD5 hash of *item* mapped onto the bit array."""
        import hashlib
        digest = hashlib.md5((str(item) + str(seed)).encode()).hexdigest()
        return int(digest, 16) % self.bit_array_size

    def add(self, item):
        """Set all k bit positions for *item*."""
        for seed in range(self.hash_count):
            self.bit_array[self._hash(item, seed)] = 1

    def contains(self, item):
        """False means definitely absent; True means probably present."""
        return all(
            self.bit_array[self._hash(item, seed)] == 1
            for seed in range(self.hash_count)
        )
# Module-level Bloom filter shared by safe_get_with_bloom_filter.
bloom_filter = BloomFilterCache()
def safe_get_with_bloom_filter(model_class, pk):
    """Bloom-filter-guarded fetch: skip cache/DB for keys known to be absent.

    NOTE(review): the module-level filter starts empty, so rows never
    add()-ed (e.g. pre-existing data) are reported absent — it must be
    pre-warmed with existing keys to be useful; confirm intended usage.
    """
    bloom_key = f"{model_class._meta.label}:{pk}"
    if not bloom_filter.contains(bloom_key):
        # Filter says definitely absent: skip cache and database entirely.
        return None

    cache_key = f"bloom:{bloom_key}"
    cached = cache.get(cache_key)
    if cached is not None:
        return None if cached == "__null__" else cached

    try:
        instance = model_class.objects.get(pk=pk)
    except model_class.DoesNotExist:
        cache.set(cache_key, "__null__", 60)
        return None
    cache.set(cache_key, instance, 300)
    bloom_filter.add(bloom_key)  # remember that this key exists
    return instance
#问题3:缓存击穿
症状:热点数据过期时,大量并发请求同时查询数据库
解决方案:
import threading
class LockingCache:
    """Per-key mutex wrapper to stop cache stampede (dog-pile) on hot keys.

    NOTE(review): ``self.locks`` grows by one Lock per distinct key and is
    never pruned — an unbounded key space will leak memory here.
    """

    def __init__(self):
        self.locks = {}                 # key -> threading.Lock
        self.lock = threading.Lock()    # guards self.locks itself

    def get_with_mutex(self, key, fetch_func, timeout=300):
        """Return the cached value, computing it under a per-key lock on miss."""
        # Lazily create the lock dedicated to this key.
        with self.lock:
            if key not in self.locks:
                self.locks[key] = threading.Lock()
            key_lock = self.locks[key]
        with key_lock:
            # Double-check: another thread may have filled the cache while
            # we waited for the lock.
            result = cache.get(key)
            if result is not None:
                if result == "__null__":
                    return None
                return result
            # We hold the lock: perform the expensive fetch exactly once.
            result = fetch_func()
            # Store even None (as the "__null__" sentinel) so misses cache too.
            cache_result = result if result is not None else "__null__"
            cache.set(key, cache_result, timeout)
            return result
# Global per-key-locking cache instance used by get_with_mutex_protection.
locking_cache = LockingCache()
def get_with_mutex_protection(model_class, pk):
    """Stampede-safe row fetch via the global locking_cache."""
    def fetch_func():
        try:
            return model_class.objects.get(pk=pk)
        except model_class.DoesNotExist:
            return None

    mutex_key = f"mutex:{model_class._meta.label}:{pk}"
    return locking_cache.get_with_mutex(mutex_key, fetch_func, timeout=300)
# 2. 读写分离缓存
class ReadWriteSeparatedCache:
    """Cache with separate read/write backends plus per-key guarded updates.

    Both backends default to 'default'; point them at different cache
    aliases to actually separate read and write traffic.
    """

    def __init__(self):
        from django.core.cache import caches
        self.read_cache = caches['default']    # serving reads
        self.write_cache = caches['default']   # receiving writes
        self.update_locks = {}
        self.global_lock = threading.Lock()

    def get(self, key):
        """Read from the read backend."""
        return self.read_cache.get(key)

    def set(self, key, value, timeout=300):
        """Write to the write backend and mirror into the read backend."""
        self.write_cache.set(key, value, timeout)
        self.read_cache.set(key, value, timeout)

    def update_with_protection(self, key, update_func, timeout=300):
        """Apply update_func(current) -> new under a per-key mutex."""
        with self.global_lock:
            if key not in self.update_locks:
                self.update_locks[key] = threading.Lock()
            guard = self.update_locks[key]
        with guard:
            new_value = update_func(self.get(key))
            self.set(key, new_value, timeout)
            return new_value
# Shared read/write-separated cache used by the counter example below.
rw_cache = ReadWriteSeparatedCache()
def increment_counter_safely(counter_key):
    """Atomically add 1 to a cached counter (a missing counter counts as 0)."""
    def bump(current):
        return (current or 0) + 1
    return rw_cache.update_with_protection(counter_key, bump, 3600)
#问题4:内存泄漏
症状:缓存占用内存持续增长,导致内存不足
解决方案:
# 1. 使用LRU缓存
from functools import lru_cache

@lru_cache(maxsize=1000)  # keep at most 1000 distinct results
def lru_cached_function(param):
    """Simulated expensive computation, memoized by functools.lru_cache."""
    time.sleep(0.01)  # pretend this is costly
    return f"result for {param}"
# 2. 自定义容量限制缓存
class CapacityLimitedCache:
    """In-process LRU cache capped at *max_size* entries.

    ``access_order`` holds keys from least- to most-recently used; the head
    is evicted when the cap is reached. The *timeout* argument of set() is
    accepted for API compatibility but ignored (entries never expire).
    """

    def __init__(self, max_size=1000):
        self.max_size = max_size
        self.cache = {}
        self.access_order = []          # LRU order: oldest key first
        self.lock = threading.RLock()

    def get(self, key, default=None):
        """Return the value for *key* (marking it most recently used)."""
        with self.lock:
            if key not in self.cache:
                return default
            if key in self.access_order:
                self.access_order.remove(key)
            self.access_order.append(key)
            return self.cache[key]

    def set(self, key, value, timeout=None):
        """Insert or overwrite *key*, evicting the LRU entry when at capacity."""
        with self.lock:
            if key in self.cache:
                self.cache[key] = value
                if key in self.access_order:
                    self.access_order.remove(key)
                self.access_order.append(key)
                return
            if len(self.cache) >= self.max_size:
                evicted = self.access_order.pop(0)
                del self.cache[evicted]
            self.cache[key] = value
            self.access_order.append(key)

    def evict_least_recent(self):
        """Manually drop the least-recently-used entry, if any."""
        with self.lock:
            if self.access_order:
                stale = self.access_order.pop(0)
                if stale in self.cache:
                    del self.cache[stale]
# 3. 定期清理过期缓存
class SelfCleaningCache:
    """Periodically sweeps the Redis backend for keys reporting no TTL.

    NOTE(review): KEYS '*' scans the whole keyspace — use with care in
    production. Redis also reclaims expired keys on its own.
    """

    def __init__(self, cleanup_interval=3600):  # sweep at most once per hour
        self.cleanup_interval = cleanup_interval
        self.last_cleanup = time.time()
        self.lock = threading.Lock()

    def cleanup_expired(self):
        """Delete keys with negative TTL once the cleanup interval has elapsed."""
        with self.lock:
            if time.time() - self.last_cleanup > self.cleanup_interval:
                # Backend-specific sweep; only implemented for Redis here.
                try:
                    if hasattr(settings, 'CACHES') and 'default' in settings.CACHES:
                        cache_config = settings.CACHES['default']
                        if cache_config['BACKEND'] == 'django.core.cache.backends.redis.RedisCache':
                            import redis
                            redis_client = redis.Redis.from_url(cache_config['LOCATION'])
                            # Scan and delete keys whose TTL is negative.
                            all_keys = redis_client.keys('*')
                            for key in all_keys:
                                # NOTE(review): Redis TTL returns -1 for keys
                                # WITHOUT an expiry (and -2 for missing keys),
                                # so this also deletes persistent keys —
                                # confirm that is intended.
                                if redis_client.ttl(key) < 0:
                                    redis_client.delete(key)
                except Exception as e:
                    import logging
                    logging.error(f"Cache cleanup failed: {e}")
                self.last_cleanup = time.time()
# Global self-cleaning cache instance driven by the cleanup thread below.
self_cleaning_cache = SelfCleaningCache()
# 在应用启动时启动清理线程
def start_cache_cleanup_thread():
    """Spawn a daemon thread that triggers an expiry sweep every hour."""
    import threading

    def cleanup_worker():
        while True:
            try:
                time.sleep(3600)  # hourly check
                self_cleaning_cache.cleanup_expired()
            except Exception as e:
                import logging
                logging.error(f"Cache cleanup worker error: {e}")

    worker = threading.Thread(target=cleanup_worker, daemon=True)
    worker.start()
# 启动清理线程
start_cache_cleanup_thread()
#本章小结
在本章中,我们深入学习了Django缓存策略:
- 缓存基础概念:理解了缓存的工作原理和重要性
- 缓存架构:掌握了Django缓存系统的整体架构
- 缓存后端配置:学会了Redis、Memcached等缓存后端的配置
- 缓存类型与使用:了解了不同层级的缓存使用方法
- 缓存策略模式:学习了Cache-Aside、Read-Through等模式
- 性能优化:掌握了压缩、分片、预取等优化技术
- 监控诊断:学会了缓存监控和性能分析方法
- 问题解决方案:了解了雪崩、穿透、击穿等问题的解决方法
#核心要点回顾
"""
本章核心要点:
1. 缓存是提升应用性能的关键技术
2. 选择合适的缓存后端和策略很重要
3. 需要注意缓存一致性问题
4. 监控缓存性能是必要的
5. 合理设置缓存过期时间
6. 防范缓存雪崩、穿透、击穿问题
7. 定期清理过期缓存避免内存泄漏
8. 根据业务场景选择合适的缓存策略
"""💡 核心要点:缓存策略需要根据具体的应用场景和性能需求来设计,合理的缓存策略能够显著提升应用性能,但也要注意避免缓存带来的复杂性和潜在问题。
#SEO优化策略
- 关键词布局: 在标题、内容中合理布局"Django缓存", "缓存策略", "性能优化", "Redis缓存", "缓存层级"等关键词
- 内容结构: 使用清晰的标题层级(H1-H3),便于搜索引擎理解内容结构
- 内部链接: 建立与其他相关教程的内部链接,提升页面权重
- 元数据优化: 在页面头部包含描述性的标题、描述和标签
🏷️ 标签云: Django缓存 缓存策略 性能优化 Redis缓存 缓存层级 缓存模式 缓存监控 缓存优化 缓存安全

