#Django性能优化 - 提升应用响应速度与并发能力
📂 所属阶段:第三部分 — 高级主题
🎯 难度等级:高级
⏰ 预计学习时间:6-8小时
🎒 前置知识:缓存策略, 数据库查询
#目录
#性能优化概述
性能优化是Web应用开发中至关重要的环节,直接影响用户体验和系统可扩展性。
#性能指标
"""
关键性能指标:
1. 响应时间 (Response Time)
- 首字节时间 (TTFB)
- 页面加载时间
- API响应时间
2. 吞吐量 (Throughput)
- 每秒请求数 (RPS/QPS)
- 并发用户数
- 数据处理量
3. 资源利用率
- CPU使用率
- 内存占用
- 数据库连接数
4. 可扩展性指标
- 水平扩展能力
- 垂直扩展能力
- 负载均衡效果
"""#Django性能瓶颈
"""
Django常见性能瓶颈:
1. 数据库查询
- N+1查询问题
- 未优化的JOIN操作
- 缺少索引的查询
- 复杂的聚合查询
2. 缓存策略不当
- 缓存命中率低
- 缓存雪崩
- 缓存穿透
3. 模板渲染
- 复杂的模板逻辑
- 未优化的循环
- 过多的模板继承
4. 静态文件
- 未压缩的资源
- 缺少CDN加速
- 缓存策略不当
5. 网络延迟
- API调用延迟
- 外部服务依赖
- DNS解析时间
"""#性能优化原则
"""
性能优化核心原则:
1. 测量驱动 (Measure First)
- 先测量,再优化
- 基于数据做决策
- 建立基准线
2. 80/20法则
- 优化20%的关键路径
- 重点关注高频操作
- 优先解决主要瓶颈
3. 渐进式优化
- 小步快跑
- 持续改进
- 避免过度优化
4. 成本效益
- 权衡优化成本
- 考虑维护复杂度
- 评估ROI
"""#数据库查询优化
#查询优化基础
# Django ORM查询优化技巧
from django.db import models
from django.contrib.auth.models import User
from django.db.models import Prefetch, Q, Count, Sum
from django.core.paginator import Paginator
class QueryOptimization:
"""查询优化工具类"""
@staticmethod
def avoid_n_plus_one_queries():
"""避免N+1查询问题"""
# ❌ 错误做法 - N+1查询
"""
posts = Post.objects.all()
for post in posts:
print(post.author.username) # 每次查询都会触发数据库查询
"""
# ✅ 正确做法 - 使用select_related
posts = Post.objects.select_related('author').all()
for post in posts:
print(post.author.username) # 不会触发额外查询
# ✅ 使用prefetch_related处理多对多关系
posts = Post.objects.prefetch_related('tags', 'comments').all()
for post in posts:
for tag in post.tags.all(): # 不会触发额外查询
print(tag.name)
@staticmethod
def optimize_with_prefetch():
"""使用Prefetch进行精细控制"""
from myapp.models import Author, Book
# 复杂的prefetch优化
authors = Author.objects.prefetch_related(
Prefetch(
'books',
queryset=Book.objects.select_related('publisher').filter(
published_date__year__gte=2020
),
to_attr='recent_books'
)
).all()
for author in authors:
# 访问recent_books不会触发额外查询
for book in author.recent_books:
print(book.title, book.publisher.name)
@staticmethod
def use_only_defer():
"""使用only和defer优化字段加载"""
# 只加载需要的字段
users = User.objects.only('username', 'email', 'date_joined')
# 排除不需要的大字段
articles = Article.objects.defer('content', 'large_image')
@staticmethod
def optimize_aggregation():
"""优化聚合查询"""
from django.db.models import Count, Avg, Max, Min
# 使用annotate添加聚合字段
categories = Category.objects.annotate(
post_count=Count('posts'),
avg_rating=Avg('posts__rating'),
max_views=Max('posts__views')
)
# 使用values进行分组查询
monthly_stats = Post.objects.values('created_at__date').annotate(
count=Count('id'),
avg_views=Avg('views')
).order_by('-created_at__date')
@staticmethod
def optimize_filters():
"""优化过滤查询"""
# 使用Q对象组合复杂条件
posts = Post.objects.filter(
Q(status='published') &
(Q(author__is_staff=True) | Q(views__gte=1000))
)
# 使用exists替代count进行存在性检查
has_published_posts = Post.objects.filter(
author=user, status='published'
).exists()
# 使用bulk操作进行批量操作
Post.objects.filter(status='draft').update(status='archived')
@staticmethod
def use_raw_sql_when_needed():
"""必要时使用原生SQL"""
from django.db import connection
# 复杂查询使用原生SQL
with connection.cursor() as cursor:
cursor.execute("""
SELECT u.username, COUNT(p.id) as post_count
FROM auth_user u
LEFT JOIN myapp_post p ON u.id = p.author_id
WHERE u.date_joined > %s
GROUP BY u.id, u.username
ORDER BY post_count DESC
LIMIT 10
""", ['2024-01-01'])
results = cursor.fetchall()
return results
# 查询优化示例
class OptimizedPostQuerySet(models.QuerySet):
"""优化的帖子查询集"""
def published(self):
"""已发布帖子"""
return self.filter(status='published')
def with_author_and_category(self):
"""预加载作者和分类信息"""
return self.select_related('author', 'category')
def with_popular_comments(self):
"""预加载热门评论"""
return self.prefetch_related(
Prefetch(
'comments',
queryset=Comment.objects.filter(
likes__gte=10
).select_related('author').order_by('-likes')
)
)
def with_tags(self):
"""预加载标签"""
return self.prefetch_related('tags')
def optimized_list(self):
"""优化的列表查询"""
return self.published().with_author_and_category().with_tags()
class PostManager(models.Manager):
"""优化的帖子管理器"""
def get_queryset(self):
return OptimizedPostQuerySet(self.model, using=self._db)
def get_optimized_posts(self):
"""获取优化的帖子列表"""
return self.get_queryset().optimized_list()
class Post(models.Model):
"""帖子模型"""
title = models.CharField(max_length=200)
content = models.TextField()
author = models.ForeignKey(User, on_delete=models.CASCADE)
category = models.ForeignKey('Category', on_delete=models.CASCADE)
status = models.CharField(max_length=20, default='draft')
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
views = models.IntegerField(default=0)
objects = PostManager()
class Meta:
db_table = 'posts'
indexes = [
models.Index(fields=['status', 'created_at']),
models.Index(fields=['author', 'status']),
models.Index(fields=['category', 'created_at']),
]
# 使用示例
def optimized_post_list_view(request):
"""优化的帖子列表视图"""
posts = Post.objects.get_optimized_posts()
# 分页优化
paginator = Paginator(posts, 20) # 每页20篇
page_number = request.GET.get('page')
page_obj = paginator.get_page(page_number)
return render(request, 'posts/list.html', {'page_obj': page_obj})#数据库索引优化
# 数据库索引优化
class IndexOptimization:
"""索引优化类"""
@staticmethod
def create_efficient_indexes():
"""创建高效的索引"""
# 单列索引
class User(models.Model):
username = models.CharField(max_length=150, unique=True) # 自动创建唯一索引
email = models.EmailField(db_index=True) # 创建普通索引
created_at = models.DateTimeField(db_index=True)
class Meta:
db_table = 'users'
# 复合索引
indexes = [
models.Index(fields=['username', 'email']),
models.Index(fields=['created_at', 'username']),
models.Index(fields=['-created_at']), # 降序索引
]
@staticmethod
def use_partial_indexes():
"""使用部分索引(PostgreSQL)"""
class Order(models.Model):
status = models.CharField(max_length=20)
customer = models.ForeignKey('Customer', on_delete=models.CASCADE)
created_at = models.DateTimeField()
class Meta:
db_table = 'orders'
indexes = [
# 只为特定状态的订单创建索引
models.Index(
fields=['customer', 'created_at'],
name='idx_active_orders',
condition=Q(status='active')
),
]
@staticmethod
def use_function_based_indexes():
"""使用函数索引(PostgreSQL)"""
class Product(models.Model):
name = models.CharField(max_length=200)
slug = models.SlugField()
class Meta:
db_table = 'products'
indexes = [
# 大小写不敏感的索引
models.Index(
fields=['name'],
name='idx_product_name_ci',
opclasses=['varchar_pattern_ops']
),
# 基于表达式的索引
models.Index(
models.F('name').upper(),
name='idx_product_name_upper'
),
]
# 数据库查询分析
class QueryAnalyzer:
"""查询分析器"""
@staticmethod
def analyze_query_performance(queryset):
"""分析查询性能"""
import time
from django.db import connection
# 记录查询前的SQL数量
initial_queries = len(connection.queries)
initial_time = time.time()
# 执行查询
results = list(queryset)
# 记录查询后的时间和SQL数量
final_time = time.time()
final_queries = len(connection.queries)
print(f"执行时间: {final_time - initial_time:.4f}秒")
print(f"SQL查询数量: {final_queries - initial_queries}")
# 打印最后几个查询(调试用)
if connection.queries:
for query in connection.queries[-5:]: # 显示最后5个查询
print(f"SQL: {query['sql']}")
print(f"时间: {query['time']}")
return results
@staticmethod
def explain_query(queryset):
"""解释查询计划"""
from django.db import connection
# 获取查询的SQL
sql = str(queryset.query)
# 使用EXPLAIN分析(PostgreSQL/MySQL)
with connection.cursor() as cursor:
if connection.vendor == 'postgresql':
cursor.execute(f"EXPLAIN ANALYZE {sql}")
elif connection.vendor == 'mysql':
cursor.execute(f"EXPLAIN FORMAT=JSON {sql}")
explanation = cursor.fetchall()
return explanation
# 使用django-debug-toolbar进行性能分析
"""
# settings.py 配置
INSTALLED_APPS = [
# ...
'debug_toolbar',
]
MIDDLEWARE = [
# ...
'debug_toolbar.middleware.DebugToolbarMiddleware',
]
INTERNAL_IPS = [
'127.0.0.1',
]
# urls.py
if settings.DEBUG:
import debug_toolbar
urlpatterns = [
path('__debug__/', include(debug_toolbar.urls)),
] + urlpatterns
"""#连接池优化
# 数据库连接池优化
"""
# settings.py 数据库配置
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'myproject',
'USER': 'myprojectuser',
'PASSWORD': 'mypass',
'HOST': 'localhost',
'PORT': '5432',
'CONN_MAX_AGE': 60, # 连接复用时间(秒)
'OPTIONS': {
'MAX_CONNS': 20, # 最大连接数
'MIN_CONNS': 5, # 最小连接数
},
}
}
# 使用django-db-pool进行连接池管理
# pip install django-db-pool
DATABASES = {
'default': {
'ENGINE': 'dj_db_conn_pool.backends.postgresql',
'POOL_OPTIONS': {
'POOL_SIZE': 10,
'MAX_OVERFLOW': 10,
'RECYCLE': 24 * 60 * 60, # 24小时回收连接
},
# ... 其他配置
}
}
"""#缓存策略优化
#缓存层级策略
# 多层级缓存策略
from django.core.cache import cache
from django.core.cache.utils import make_template_fragment_key
from django.template import loader
from django.conf import settings
import time
import hashlib
import json
class MultiLevelCache:
"""多层级缓存策略"""
@staticmethod
def cache_levels():
"""缓存层级说明"""
levels = {
'L1': {
'type': '内存缓存',
'storage': 'Redis/Memcached',
'speed': '最快',
'capacity': '中等',
'use_case': '热点数据、会话缓存'
},
'L2': {
'type': '数据库缓存',
'storage': 'Django Cache Table',
'speed': '快',
'capacity': '大',
'use_case': '中等频率数据'
},
'L3': {
'type': '文件缓存',
'storage': '本地文件系统',
'speed': '慢',
'capacity': '很大',
'use_case': '静态内容、生成的文件'
}
}
return levels
@staticmethod
def smart_cache_get(key, callback, timeout=300):
"""智能缓存获取"""
# 尝试从缓存获取
result = cache.get(key)
if result is not None:
return result
# 缓存未命中,执行回调函数
result = callback()
# 存储到缓存
cache.set(key, result, timeout)
return result
@staticmethod
def cache_with_fallback(primary_key, fallback_key, callback, timeout=300):
"""带降级的缓存策略"""
# 首先尝试主缓存
result = cache.get(primary_key)
if result is not None:
return result
# 主缓存未命中,尝试备用缓存
result = cache.get(fallback_key)
if result is not None:
# 异步更新主缓存
from django.core.cache import cache
cache.set(primary_key, result, timeout)
return result
# 都未命中,执行回调
result = callback()
# 同时设置两个缓存
cache.set(primary_key, result, timeout)
cache.set(fallback_key, result, timeout * 2) # 备用缓存时间更长
return result
# 缓存装饰器
def cached_method(timeout=300, key_prefix=''):
"""方法缓存装饰器"""
def decorator(func):
from functools import wraps
@wraps(func)
def wrapper(self, *args, **kwargs):
# 生成缓存键
cache_key_parts = [key_prefix, func.__name__, str(self.__class__.__name__)]
cache_key_parts.extend([str(arg) for arg in args])
cache_key_parts.extend([f"{k}:{v}" for k, v in sorted(kwargs.items())])
cache_key = ":".join(cache_key_parts)
cache_key = hashlib.md5(cache_key.encode()).hexdigest()
# 尝试从缓存获取
result = cache.get(cache_key)
if result is not None:
return result
# 执行方法并缓存结果
result = func(self, *args, **kwargs)
cache.set(cache_key, result, timeout)
return result
return wrapper
return decorator
# 视图缓存装饰器
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
from django.views.generic import ListView
@cache_page(60 * 15) # 缓存15分钟
def product_list_view(request):
"""产品列表视图(已缓存)"""
products = Product.objects.all()
return render(request, 'products/list.html', {'products': products})
# 类视图缓存
@method_decorator(cache_page(60 * 15), name='dispatch')
class ProductListView(ListView):
"""产品列表类视图(已缓存)"""
model = Product
template_name = 'products/list.html'
paginate_by = 20
# 模板片段缓存
"""
{% load cache %}
{% cache 500 sidebar request.user.username %}
<!-- 昂贵的查询或处理 -->
{% for item in expensive_list %}
<div>{{ item.name }}</div>
{% endfor %}
{% endcache %}
"""#缓存失效策略
# 缓存失效策略
class CacheInvalidationStrategy:
"""缓存失效策略"""
@staticmethod
def time_based_invalidatoin():
"""基于时间的失效策略"""
# 设置合理的TTL
cache.set('user_profile', user_data, timeout=300) # 5分钟
# 不同数据类型使用不同TTL
cache.set('hot_news', news_data, timeout=60) # 1分钟(热点新闻)
cache.set('static_config', config_data, timeout=3600) # 1小时(配置)
cache.set('historical_data', data, timeout=86400) # 24小时(历史数据)
@staticmethod
def event_based_invalidation():
"""基于事件的失效策略"""
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
@receiver(post_save, sender=User)
def invalidate_user_cache(sender, instance, **kwargs):
"""用户保存时失效相关缓存"""
# 失效用户相关缓存
cache.delete(f'user_profile_{instance.id}')
cache.delete(f'user_permissions_{instance.id}')
# 如果是重要字段更新,失效更多缓存
if kwargs.get('update_fields'):
if 'username' in kwargs['update_fields']:
# 清除所有涉及用户名的缓存
cache.delete_many([
f'user_by_username_{instance.username}',
f'user_search_{instance.username}',
])
@staticmethod
def cache_warming_strategy():
"""缓存预热策略"""
from celery import shared_task
@shared_task
def warm_cache():
"""缓存预热任务"""
# 预热热门数据
popular_products = Product.objects.filter(
views__gte=1000
).select_related('category').prefetch_related('tags')[:100]
# 批量缓存
cache_data = {}
for product in popular_products:
cache_data[f'product_detail_{product.id}'] = {
'id': product.id,
'name': product.name,
'category': product.category.name,
'tags': [tag.name for tag in product.tags.all()],
}
cache.set_many(cache_data, timeout=3600)
@staticmethod
def cache_dependency_management():
"""缓存依赖管理"""
# 使用版本号管理缓存依赖
def get_cache_version_key(dependency_type, dependency_id):
return f'cache_version:{dependency_type}:{dependency_id}'
def invalidate_dependency(dependency_type, dependency_id):
"""失效依赖"""
version_key = get_cache_version_key(dependency_type, dependency_id)
current_version = cache.get(version_key, 0)
cache.set(version_key, current_version + 1, timeout=None)
def get_versioned_cache_key(base_key, dependency_type, dependency_id):
"""获取版本化缓存键"""
version_key = get_cache_version_key(dependency_type, dependency_id)
version = cache.get(version_key, 0)
return f'{base_key}:v{version}'
# 使用示例
def get_user_products_with_cache(user_id):
cache_key = get_versioned_cache_key(
f'user_products_{user_id}',
'user',
user_id
)
products = cache.get(cache_key)
if products is None:
products = Product.objects.filter(owner_id=user_id)
cache.set(cache_key, products, timeout=300)
return products
# Redis高级缓存模式
"""
# 使用Redis实现分布式锁
import redis
import time
import uuid
class DistributedLock:
def __init__(self, redis_client, lock_name, acquire_timeout=10, lock_timeout=10):
self.redis = redis_client
self.lock_name = f'lock:{lock_name}'
self.acquire_timeout = acquire_timeout
self.lock_timeout = lock_timeout
self.identifier = str(uuid.uuid4())
def __enter__(self):
acquired = self.acquire()
if not acquired:
raise TimeoutError("Could not acquire lock")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
def acquire(self):
end = time.time() + self.acquire_timeout
while time.time() < end:
if self.redis.set(self.lock_name, self.identifier, nx=True, ex=self.lock_timeout):
return True
time.sleep(0.001)
return False
def release(self):
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
self.redis.eval(lua_script, 1, self.lock_name, self.identifier)
# 使用分布式锁
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def critical_section():
with DistributedLock(redis_client, 'critical_operation', acquire_timeout=5):
# 执行关键操作
pass
"""#模板性能优化
#模板渲染优化
# 模板渲染优化
from django.template import Context, Template
from django.template.loader import get_template, render_to_string
from django.utils.safestring import mark_safe
import time
class TemplateOptimizer:
"""模板优化器"""
@staticmethod
def minimize_template_logic():
"""最小化模板逻辑"""
# ❌ 避免在模板中进行复杂计算
"""
<!-- templates/bad_example.html -->
{% for item in items %}
{% if item.price > 100 and item.quantity > 10 and item.category == 'electronics' %}
<div class="expensive-item">{{ item.name }}</div>
{% endif %}
{% endfor %}
"""
# ✅ 在视图中预处理数据
"""
# views.py
def product_list_view(request):
expensive_items = Item.objects.filter(
price__gt=100,
quantity__gt=10,
category='electronics'
)
context = {
'expensive_items': expensive_items,
'regular_items': Item.objects.exclude(
pk__in=expensive_items.values_list('pk', flat=True)
)
}
return render(request, 'products/list.html', context)
"""
@staticmethod
def optimize_template_inheritance():
"""优化模板继承"""
# 使用block.super提高效率
"""
<!-- base.html -->
<!DOCTYPE html>
<html>
<head>
{% block head %}
<title>{% block title %}Default Title{% endblock %}</title>
{% endblock %}
</head>
<body>
{% block content %}{% endblock %}
</body>
</html>
<!-- child.html -->
{% extends "base.html" %}
{% block title %}{{ block.super }} - Child Page{% endblock %}
{% block content %}
<!-- 具体内容 -->
{% endblock %}
"""
@staticmethod
def use_template_caching():
"""使用模板缓存"""
from django.core.cache import cache
def render_cached_template(template_name, context, cache_key, timeout=300):
"""渲染缓存模板"""
cached_result = cache.get(cache_key)
if cached_result:
return cached_result
# 渲染模板
result = render_to_string(template_name, context)
# 缓存结果
cache.set(cache_key, result, timeout)
return result
@staticmethod
def optimize_loop_performance():
"""优化循环性能"""
# 预计算循环中的复杂表达式
"""
<!-- 在视图中预处理 -->
def dashboard_view(request):
# 预计算用户权限
user_permissions = {
user.id: user.get_all_permissions()
for user in User.objects.all()
}
# 预计算统计数据
user_stats = {
user.id: {
'post_count': user.posts.count(),
'comment_count': user.comments.count(),
'last_login_ago': time_since(user.last_login)
}
for user in User.objects.all()
}
context = {
'users': User.objects.all(),
'user_permissions': user_permissions,
'user_stats': user_stats,
}
return render(request, 'dashboard.html', context)
"""
# 自定义模板标签优化
from django import template
register = template.Library()
@register.simple_tag(takes_context=True)
def user_permission_check(context, permission):
"""用户权限检查标签(已缓存)"""
user = context['user']
cache_key = f"user_perm_{user.id}_{permission}"
has_perm = cache.get(cache_key)
if has_perm is None:
has_perm = user.has_perm(permission)
cache.set(cache_key, has_perm, 300) # 缓存5分钟
return has_perm
@register.inclusion_tag('tags/user_badge.html', takes_context=True)
def user_badge(context, user):
"""用户徽章标签"""
request = context['request']
is_owner = request.user == user
user_level = 'vip' if user.profile.is_vip else 'regular'
return {
'user': user,
'is_owner': is_owner,
'user_level': user_level,
'request': request,
}
# 模板碎片缓存
@register.simple_tag
def cached_user_widget(user_id):
"""缓存的用户组件"""
cache_key = f"user_widget_{user_id}"
widget_html = cache.get(cache_key)
if widget_html is None:
user = User.objects.get(id=user_id)
widget_html = render_to_string('widgets/user_widget.html', {'user': user})
cache.set(cache_key, widget_html, 600) # 缓存10分钟
return mark_safe(widget_html)#静态文件优化
# 静态文件优化
"""
# settings.py 静态文件配置
STATIC_URL = '/static/'
STATIC_ROOT = os.path.join(BASE_DIR, 'staticfiles')
STATICFILES_DIRS = [
os.path.join(BASE_DIR, 'static'),
]
# 静态文件存储后端
STATICFILES_STORAGE = 'django.contrib.staticfiles.storage.ManifestStaticFilesStorage'
# 使用CDN
STATIC_URL = 'https://cdn.yourdomain.com/static/'
"""
# 静态文件版本控制
class StaticFileVersioning:
"""静态文件版本控制"""
@staticmethod
def add_cache_busting():
"""添加缓存破坏"""
# 在模板中使用版本参数
"""
<!-- 在base.html中 -->
{% load static %}
<link rel="stylesheet" type="text/css" href="{% static 'css/style.css' %}?v={% now 'U' %}">
<script src="{% static 'js/app.js' %}?v={% now 'U' %}"></script>
"""
@staticmethod
def use_manifest_static_files():
"""使用清单静态文件存储"""
"""
# settings.py
STATICFILES_STORAGE = 'django.contrib.staticfiles.storage.ManifestStaticFilesStorage'
# 这会自动为静态文件添加哈希后缀
# style.css -> style.a1b2c3d4.css
"""
@staticmethod
def compress_static_files():
"""压缩静态文件"""
"""
# 安装 django-compressor
pip install django-compressor
# settings.py
INSTALLED_APPS = [
# ...
'compressor',
]
STATICFILES_FINDERS = [
'django.contrib.staticfiles.finders.FileSystemFinder',
'django.contrib.staticfiles.finders.AppDirectoriesFinder',
'compressor.finders.CompressorFinder',
]
COMPRESS_ENABLED = True
COMPRESS_OFFLINE = True
# 在模板中使用
{% load compress %}
{% compress css %}
<link rel="stylesheet" type="text/css" href="{% static 'css/style1.css' %}">
<link rel="stylesheet" type="text/css" href="{% static 'css/style2.css' %}">
{% endcompress %}
{% compress js %}
<script src="{% static 'js/script1.js' %}"></script>
<script src="{% static 'js/script2.js' %}"></script>
{% endcompress %}
"""
# Webpack集成优化
"""
# webpack.config.js 示例
const path = require('path');
const MiniCssExtractPlugin = require('mini-css-extract-plugin');
module.exports = {
entry: './assets/js/main.js',
output: {
path: path.resolve(__dirname, 'static/dist'),
filename: 'js/[name].[contenthash].js',
clean: true,
},
module: {
rules: [
{
test: /\.css$/,
use: [MiniCssExtractPlugin.loader, 'css-loader'],
},
{
test: /\.js$/,
exclude: /node_modules/,
use: 'babel-loader',
},
],
},
plugins: [
new MiniCssExtractPlugin({
filename: 'css/[name].[contenthash].css',
}),
],
};
"""#异步处理优化
#Django异步支持
# Django异步处理
import asyncio
import aiohttp
from django.http import JsonResponse
from django.views import View
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
import json
class AsyncViewOptimization:
"""异步视图优化"""
@staticmethod
def async_view_example():
"""异步视图示例"""
# Django 3.1+ 支持原生异步视图
"""
# views.py
import asyncio
from django.http import JsonResponse
async def async_api_view(request):
# 模拟异步操作
await asyncio.sleep(1) # 实际场景中可能是数据库查询、API调用等
# 异步获取多个数据源
user_task = fetch_user_data()
post_task = fetch_post_data()
user_data, post_data = await asyncio.gather(user_task, post_task)
return JsonResponse({
'user': user_data,
'posts': post_data
})
async def fetch_user_data():
# 异步获取用户数据
await asyncio.sleep(0.1)
return {'id': 1, 'name': 'John'}
async def fetch_post_data():
# 异步获取帖子数据
await asyncio.sleep(0.2)
return [{'id': 1, 'title': 'Post 1'}, {'id': 2, 'title': 'Post 2'}]
"""
@staticmethod
def async_class_view():
"""异步类视图"""
"""
from django.views import View
class AsyncDataView(View):
async def get(self, request):
# 异步处理
data = await self.fetch_async_data()
return JsonResponse(data)
async def fetch_async_data(self):
# 异步数据获取
import asyncpg
conn = await asyncpg.connect(
host='localhost',
port=5432,
user='user',
password='password',
database='db'
)
result = await conn.fetch('SELECT * FROM users LIMIT 10')
await conn.close()
return [dict(row) for row in result]
"""
@staticmethod
def async_database_operations():
"""异步数据库操作"""
"""
# 需要使用支持异步的数据库驱动
# 如 asyncpg for PostgreSQL
import databases
import sqlalchemy
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
async def startup():
await database.connect()
async def shutdown():
await database.disconnect()
async def get_users():
query = "SELECT * FROM users"
return await database.fetch_all(query)
"""
# Celery异步任务
"""
# Celery配置
# celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# tasks.py
from celery import shared_task
import time
@shared_task
def send_email_task(recipient, subject, body):
# 模拟发送邮件
time.sleep(5) # 实际发送邮件的操作
return f"Email sent to {recipient}"
@shared_task
def process_large_dataset(dataset_id):
# 处理大数据集
dataset = Dataset.objects.get(id=dataset_id)
# 执行复杂的数据处理
result = complex_data_processing(dataset.data)
dataset.result = result
dataset.save()
return "Processing completed"
"""
# 异步信号处理器
"""
from asgiref.sync import sync_to_async
from django.db.models.signals import post_save
from django.dispatch import receiver
@receiver(post_save, sender=User)
def async_user_created_handler(sender, instance, created, **kwargs):
if created:
# 使用sync_to_async包装同步任务
async_task = sync_to_async(send_welcome_email)(instance.email)
# 或者使用Celery任务
send_welcome_email.delay(instance.email)
"""#WebSocket实时优化
# WebSocket优化
"""
# Django Channels 配置
# routing.py
from django.urls import path
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import myapp.consumers
application = ProtocolTypeRouter({
"websocket": AuthMiddlewareStack(
URLRouter([
path("ws/chat/<str:room_name>/", myapp.consumers.ChatConsumer.as_asgi()),
])
),
})
# consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = f'chat_{self.room_name}'
# 加入房间组
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
# 离开房间组
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']
# 发送消息到房间组
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'chat_message',
'message': message
}
)
async def chat_message(self, event):
message = event['message']
# 发送消息到WebSocket
await self.send(text_data=json.dumps({
'message': message
}))
"""#部署优化
#生产环境配置
# 生产环境优化配置
"""
# production.py
from .base import *
DEBUG = False
ALLOWED_HOSTS = ['yourdomain.com', 'www.yourdomain.com']
# 数据库优化
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': os.environ.get('DB_NAME'),
'USER': os.environ.get('DB_USER'),
'PASSWORD': os.environ.get('DB_PASSWORD'),
'HOST': os.environ.get('DB_HOST', 'localhost'),
'PORT': os.environ.get('DB_PORT', '5432'),
'CONN_MAX_AGE': 60,
'OPTIONS': {
'MAX_CONNS': 20,
'MIN_CONNS': 5,
},
}
}
# 缓存配置
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/1',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'CONNECTION_POOL_KWARGS': {
'max_connections': 20,
'retry_on_timeout': True,
}
},
'KEY_PREFIX': 'myapp',
'TIMEOUT': 300,
}
}
# 静态文件配置
STATIC_URL = '/static/'
STATIC_ROOT = '/var/www/myapp/static/'
MEDIA_URL = '/media/'
MEDIA_ROOT = '/var/www/myapp/media/'
# 安全设置
SECURE_SSL_REDIRECT = True
SECURE_HSTS_SECONDS = 31536000
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
SECURE_CONTENT_TYPE_NOSNIFF = True
SECURE_BROWSER_XSS_FILTER = True
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
# 性能优化
USE_TZ = True
TIME_ZONE = 'UTC'
LANGUAGE_CODE = 'en-us'
# 压缩
COMPRESS_ENABLED = True
COMPRESS_OFFLINE = True
"""
# Gunicorn配置
"""
# gunicorn.conf.py
bind = "127.0.0.1:8000"
workers = 4 # 根据CPU核心数调整
worker_class = "sync" # 或 "gevent", "eventlet" 用于异步
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
timeout = 30
keepalive = 5
preload_app = True
"""#Docker优化
# Dockerfile优化示例
"""
FROM python:3.11-slim
# 设置环境变量
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV DJANGO_SETTINGS_MODULE=myproject.settings.production
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# 创建应用目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建非root用户
RUN useradd --create-home --shell /bin/bash app \
&& chown -R app:app /app
USER app
# 收集静态文件
RUN python manage.py collectstatic --noinput
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["gunicorn", "--config", "gunicorn.conf.py", "myproject.wsgi:application"]
"""#性能监控与分析
#性能监控工具
# 性能监控实现
import time
import psutil
import logging
from django.db import connection
from django.core.handlers.wsgi import WSGIRequest
from functools import wraps
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.logger = logging.getLogger('performance')
def monitor_view_performance(self, view_func):
"""监控视图性能的装饰器"""
@wraps(view_func)
def wrapper(request, *args, **kwargs):
start_time = time.time()
start_queries = len(connection.queries)
start_memory = psutil.Process().memory_info().rss
try:
response = view_func(request, *args, **kwargs)
except Exception as e:
self.logger.error(f"View error: {view_func.__name__}, Error: {str(e)}")
raise
end_time = time.time()
end_queries = len(connection.queries)
end_memory = psutil.Process().memory_info().rss
# 计算性能指标
duration = end_time - start_time
query_count = end_queries - start_queries
memory_delta = end_memory - start_memory
# 记录性能数据
self.logger.info(
f"View: {view_func.__name__}, "
f"Duration: {duration:.3f}s, "
f"Queries: {query_count}, "
f"Memory Delta: {memory_delta / 1024 / 1024:.2f}MB, "
f"Method: {request.method}, "
f"Path: {request.path}"
)
# 性能告警
if duration > 2.0: # 超过2秒的请求
self.logger.warning(f"SLOW REQUEST: {request.path}, Duration: {duration:.3f}s")
if query_count > 50: # 超过50个查询
self.logger.warning(f"HIGH QUERY COUNT: {request.path}, Queries: {query_count}")
return response
return wrapper
# 使用性能监控装饰器
monitor = PerformanceMonitor()
@monitor.monitor_view_performance
def expensive_view(request):
"""耗时的视图函数"""
# 模拟耗时操作
time.sleep(1)
# 模拟多个数据库查询
users = User.objects.all()[:100]
return JsonResponse({'users': [user.username for user in users]})
# 中间件形式的性能监控
class PerformanceMonitoringMiddleware:
"""性能监控中间件"""
def __init__(self, get_response):
self.get_response = get_response
self.monitor = PerformanceMonitor()
def __call__(self, request):
start_time = time.time()
start_queries = len(connection.queries)
response = self.get_response(request)
end_time = time.time()
end_queries = len(connection.queries)
duration = end_time - start_time
query_count = end_queries - start_queries
# 记录性能数据
logging.info(
f"Request: {request.method} {request.path}, "
f"Duration: {duration:.3f}s, "
f"Queries: {query_count}, "
f"Status: {response.status_code}"
)
return response
# 性能分析工具集成
"""
# 安装 django-silk 进行性能分析
pip install django-silk
# settings.py
INSTALLED_APPS = [
# ...
'silk',
]
MIDDLEWARE = [
# ...
'silk.middleware.SilkyMiddleware',
# ...
]
# urls.py
if settings.DEBUG:
urlpatterns += [path('silk/', include('silk.urls', namespace='silk'))]
# 安装 django-debug-toolbar
pip install django-debug-toolbar
# settings.py
INSTALLED_APPS = [
# ...
'debug_toolbar',
]
MIDDLEWARE = [
# ...
'debug_toolbar.middleware.DebugToolbarMiddleware',
]
INTERNAL_IPS = ['127.0.0.1']
"""#APM监控
# APM监控集成示例
"""
# 使用 New Relic
pip install newrelic
# newrelic.ini
[newrelic]
license_key = your_license_key
app_name = My Django App
monitor_mode = true
# 启动应用时使用
NEW_RELIC_CONFIG_FILE=newrelic.ini newrelic-admin run-program python manage.py runserver
# 使用 Datadog
pip install datadog
# 在 settings.py 中配置
DATADOG_API_KEY = 'your_datadog_api_key'
# 使用 Sentry 进行错误监控
pip install sentry-sdk
import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
sentry_sdk.init(
dsn="your_sentry_dsn",
integrations=[DjangoIntegration()],
traces_sample_rate=1.0,
send_default_pii=True
)
"""#本章小结
在本章中,我们深入学习了Django性能优化:
- 性能优化概述:了解了性能指标和优化原则
- 数据库查询优化:掌握了ORM优化、索引优化和连接池管理
- 缓存策略优化:学习了多层级缓存和失效策略
- 模板性能优化:掌握了模板渲染和静态文件优化
- 异步处理优化:了解了异步视图和任务队列
- 部署优化:学习了生产环境配置和容器化
- 性能监控与分析:掌握了监控工具和APM集成
#核心要点回顾
"""
本章核心要点:
1. 性能优化需要基于测量数据,不能盲目优化
2. 数据库查询优化是最重要的优化方向
3. 合理的缓存策略能显著提升性能
4. 异步处理适合I/O密集型任务
5. 生产环境配置对性能影响巨大
6. 持续监控是保持高性能的关键
7. 性能优化是一个持续的过程
8. 避免过早优化,关注主要瓶颈
"""#下一步学习
现在您已经掌握了Django性能优化,可以继续学习:
💡 核心要点:性能优化需要平衡多个因素,包括响应时间、吞吐量、资源利用率和可维护性。建立完善的监控体系,持续观察和优化,是保持应用高性能的关键。
#SEO优化策略
- 关键词布局: 在标题、内容中合理布局"Django性能优化", "数据库优化", "缓存策略", "异步处理", "性能调优", "Web性能"等关键词
- 内容结构: 使用清晰的标题层级(H1-H3),便于搜索引擎理解内容结构
- 内部链接: 建立与其他相关教程的内部链接,提升页面权重
- 代码示例: 提供丰富的实际代码示例,增加页面价值
- 元数据优化: 在页面头部包含描述性的标题、描述和标签
🔗 相关教程推荐
🏷️ 标签云: Django性能优化 数据库优化 缓存策略 异步处理 性能调优 Web性能 查询优化 APM监控

