Django性能优化 - 提升应用响应速度与并发能力

📂 所属阶段:第三部分 — 高级主题
🎯 难度等级:高级
⏰ 预计学习时间:6-8小时
🎒 前置知识:缓存策略, 数据库查询

目录

性能优化概述

性能优化是Web应用开发中至关重要的环节,直接影响用户体验和系统可扩展性。

性能指标

"""
关键性能指标:

1. 响应时间 (Response Time)
   - 首字节时间 (TTFB)
   - 页面加载时间
   - API响应时间

2. 吞吐量 (Throughput)
   - 每秒请求数 (RPS/QPS)
   - 并发用户数
   - 数据处理量

3. 资源利用率
   - CPU使用率
   - 内存占用
   - 数据库连接数

4. 可扩展性指标
   - 水平扩展能力
   - 垂直扩展能力
   - 负载均衡效果
"""

Django性能瓶颈

"""
Django常见性能瓶颈:

1. 数据库查询
   - N+1查询问题
   - 未优化的JOIN操作
   - 缺少索引的查询
   - 复杂的聚合查询

2. 缓存策略不当
   - 缓存命中率低
   - 缓存雪崩
   - 缓存穿透

3. 模板渲染
   - 复杂的模板逻辑
   - 未优化的循环
   - 过多的模板继承

4. 静态文件
   - 未压缩的资源
   - 缺少CDN加速
   - 缓存策略不当

5. 网络延迟
   - API调用延迟
   - 外部服务依赖
   - DNS解析时间
"""

性能优化原则

"""
性能优化核心原则:

1. 测量驱动 (Measure First)
   - 先测量,再优化
   - 基于数据做决策
   - 建立基准线

2. 80/20法则
   - 优化20%的关键路径
   - 重点关注高频操作
   - 优先解决主要瓶颈

3. 渐进式优化
   - 小步快跑
   - 持续改进
   - 避免过度优化

4. 成本效益
   - 权衡优化成本
   - 考虑维护复杂度
   - 评估ROI
"""

数据库查询优化

查询优化基础

# Django ORM查询优化技巧
from django.db import models
from django.contrib.auth.models import User
from django.db.models import Prefetch, Q, Count, Sum
from django.core.paginator import Paginator

class QueryOptimization:
    """查询优化工具类"""
    
    @staticmethod
    def avoid_n_plus_one_queries():
        """避免N+1查询问题"""
        
        # ❌ 错误做法 - N+1查询
        """
        posts = Post.objects.all()
        for post in posts:
            print(post.author.username)  # 每次查询都会触发数据库查询
        """
        
        # ✅ 正确做法 - 使用select_related
        posts = Post.objects.select_related('author').all()
        for post in posts:
            print(post.author.username)  # 不会触发额外查询
        
        # ✅ 使用prefetch_related处理多对多关系
        posts = Post.objects.prefetch_related('tags', 'comments').all()
        for post in posts:
            for tag in post.tags.all():  # 不会触发额外查询
                print(tag.name)
    
    @staticmethod
    def optimize_with_prefetch():
        """使用Prefetch进行精细控制"""
        from myapp.models import Author, Book
        
        # 复杂的prefetch优化
        authors = Author.objects.prefetch_related(
            Prefetch(
                'books',
                queryset=Book.objects.select_related('publisher').filter(
                    published_date__year__gte=2020
                ),
                to_attr='recent_books'
            )
        ).all()
        
        for author in authors:
            # 访问recent_books不会触发额外查询
            for book in author.recent_books:
                print(book.title, book.publisher.name)
    
    @staticmethod
    def use_only_defer():
        """使用only和defer优化字段加载"""
        # 只加载需要的字段
        users = User.objects.only('username', 'email', 'date_joined')
        
        # 排除不需要的大字段
        articles = Article.objects.defer('content', 'large_image')
    
    @staticmethod
    def optimize_aggregation():
        """优化聚合查询"""
        from django.db.models import Count, Avg, Max, Min
        
        # 使用annotate添加聚合字段
        categories = Category.objects.annotate(
            post_count=Count('posts'),
            avg_rating=Avg('posts__rating'),
            max_views=Max('posts__views')
        )
        
        # 使用values进行分组查询
        monthly_stats = Post.objects.values('created_at__date').annotate(
            count=Count('id'),
            avg_views=Avg('views')
        ).order_by('-created_at__date')
    
    @staticmethod
    def optimize_filters():
        """优化过滤查询"""
        # 使用Q对象组合复杂条件
        posts = Post.objects.filter(
            Q(status='published') & 
            (Q(author__is_staff=True) | Q(views__gte=1000))
        )
        
        # 使用exists替代count进行存在性检查
        has_published_posts = Post.objects.filter(
            author=user, status='published'
        ).exists()
        
        # 使用bulk操作进行批量操作
        Post.objects.filter(status='draft').update(status='archived')
    
    @staticmethod
    def use_raw_sql_when_needed():
        """必要时使用原生SQL"""
        from django.db import connection
        
        # 复杂查询使用原生SQL
        with connection.cursor() as cursor:
            cursor.execute("""
                SELECT u.username, COUNT(p.id) as post_count
                FROM auth_user u
                LEFT JOIN myapp_post p ON u.id = p.author_id
                WHERE u.date_joined > %s
                GROUP BY u.id, u.username
                ORDER BY post_count DESC
                LIMIT 10
            """, ['2024-01-01'])
            
            results = cursor.fetchall()
        
        return results

# 查询优化示例
class OptimizedPostQuerySet(models.QuerySet):
    """优化的帖子查询集"""
    
    def published(self):
        """已发布帖子"""
        return self.filter(status='published')
    
    def with_author_and_category(self):
        """预加载作者和分类信息"""
        return self.select_related('author', 'category')
    
    def with_popular_comments(self):
        """预加载热门评论"""
        return self.prefetch_related(
            Prefetch(
                'comments',
                queryset=Comment.objects.filter(
                    likes__gte=10
                ).select_related('author').order_by('-likes')
            )
        )
    
    def with_tags(self):
        """预加载标签"""
        return self.prefetch_related('tags')
    
    def optimized_list(self):
        """优化的列表查询"""
        return self.published().with_author_and_category().with_tags()

class PostManager(models.Manager):
    """优化的帖子管理器"""
    
    def get_queryset(self):
        return OptimizedPostQuerySet(self.model, using=self._db)
    
    def get_optimized_posts(self):
        """获取优化的帖子列表"""
        return self.get_queryset().optimized_list()

class Post(models.Model):
    """帖子模型"""
    title = models.CharField(max_length=200)
    content = models.TextField()
    author = models.ForeignKey(User, on_delete=models.CASCADE)
    category = models.ForeignKey('Category', on_delete=models.CASCADE)
    status = models.CharField(max_length=20, default='draft')
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    views = models.IntegerField(default=0)
    
    objects = PostManager()
    
    class Meta:
        db_table = 'posts'
        indexes = [
            models.Index(fields=['status', 'created_at']),
            models.Index(fields=['author', 'status']),
            models.Index(fields=['category', 'created_at']),
        ]

# 使用示例
def optimized_post_list_view(request):
    """优化的帖子列表视图"""
    posts = Post.objects.get_optimized_posts()
    
    # 分页优化
    paginator = Paginator(posts, 20)  # 每页20篇
    page_number = request.GET.get('page')
    page_obj = paginator.get_page(page_number)
    
    return render(request, 'posts/list.html', {'page_obj': page_obj})

数据库索引优化

# 数据库索引优化
class IndexOptimization:
    """索引优化类"""
    
    @staticmethod
    def create_efficient_indexes():
        """创建高效的索引"""
        # 单列索引
        class User(models.Model):
            username = models.CharField(max_length=150, unique=True)  # 自动创建唯一索引
            email = models.EmailField(db_index=True)  # 创建普通索引
            created_at = models.DateTimeField(db_index=True)
            
            class Meta:
                db_table = 'users'
                # 复合索引
                indexes = [
                    models.Index(fields=['username', 'email']),
                    models.Index(fields=['created_at', 'username']),
                    models.Index(fields=['-created_at']),  # 降序索引
                ]
    
    @staticmethod
    def use_partial_indexes():
        """使用部分索引(PostgreSQL)"""
        class Order(models.Model):
            status = models.CharField(max_length=20)
            customer = models.ForeignKey('Customer', on_delete=models.CASCADE)
            created_at = models.DateTimeField()
            
            class Meta:
                db_table = 'orders'
                indexes = [
                    # 只为特定状态的订单创建索引
                    models.Index(
                        fields=['customer', 'created_at'],
                        name='idx_active_orders',
                        condition=Q(status='active')
                    ),
                ]
    
    @staticmethod
    def use_function_based_indexes():
        """使用函数索引(PostgreSQL)"""
        class Product(models.Model):
            name = models.CharField(max_length=200)
            slug = models.SlugField()
            
            class Meta:
                db_table = 'products'
                indexes = [
                    # 大小写不敏感的索引
                    models.Index(
                        fields=['name'],
                        name='idx_product_name_ci',
                        opclasses=['varchar_pattern_ops']
                    ),
                    # 基于表达式的索引
                    models.Index(
                        models.F('name').upper(),
                        name='idx_product_name_upper'
                    ),
                ]

# 数据库查询分析
class QueryAnalyzer:
    """查询分析器"""
    
    @staticmethod
    def analyze_query_performance(queryset):
        """分析查询性能"""
        import time
        from django.db import connection
        
        # 记录查询前的SQL数量
        initial_queries = len(connection.queries)
        initial_time = time.time()
        
        # 执行查询
        results = list(queryset)
        
        # 记录查询后的时间和SQL数量
        final_time = time.time()
        final_queries = len(connection.queries)
        
        print(f"执行时间: {final_time - initial_time:.4f}秒")
        print(f"SQL查询数量: {final_queries - initial_queries}")
        
        # 打印最后几个查询(调试用)
        if connection.queries:
            for query in connection.queries[-5:]:  # 显示最后5个查询
                print(f"SQL: {query['sql']}")
                print(f"时间: {query['time']}")
        
        return results
    
    @staticmethod
    def explain_query(queryset):
        """解释查询计划"""
        from django.db import connection
        
        # 获取查询的SQL
        sql = str(queryset.query)
        
        # 使用EXPLAIN分析(PostgreSQL/MySQL)
        with connection.cursor() as cursor:
            if connection.vendor == 'postgresql':
                cursor.execute(f"EXPLAIN ANALYZE {sql}")
            elif connection.vendor == 'mysql':
                cursor.execute(f"EXPLAIN FORMAT=JSON {sql}")
            
            explanation = cursor.fetchall()
            
        return explanation

# 使用django-debug-toolbar进行性能分析
"""
# settings.py 配置
INSTALLED_APPS = [
    # ...
    'debug_toolbar',
]

MIDDLEWARE = [
    # ...
    'debug_toolbar.middleware.DebugToolbarMiddleware',
]

INTERNAL_IPS = [
    '127.0.0.1',
]

# urls.py
if settings.DEBUG:
    import debug_toolbar
    urlpatterns = [
        path('__debug__/', include(debug_toolbar.urls)),
    ] + urlpatterns
"""

连接池优化

# 数据库连接池优化
"""
# settings.py 数据库配置
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'myproject',
        'USER': 'myprojectuser',
        'PASSWORD': 'mypass',
        'HOST': 'localhost',
        'PORT': '5432',
        'CONN_MAX_AGE': 60,  # 连接复用时间(秒)
        'OPTIONS': {
            'MAX_CONNS': 20,  # 最大连接数
            'MIN_CONNS': 5,   # 最小连接数
        },
    }
}

# 使用django-db-pool进行连接池管理
# pip install django-db-pool

DATABASES = {
    'default': {
        'ENGINE': 'dj_db_conn_pool.backends.postgresql',
        'POOL_OPTIONS': {
            'POOL_SIZE': 10,
            'MAX_OVERFLOW': 10,
            'RECYCLE': 24 * 60 * 60,  # 24小时回收连接
        },
        # ... 其他配置
    }
}
"""

缓存策略优化

缓存层级策略

# 多层级缓存策略
from django.core.cache import cache
from django.core.cache.utils import make_template_fragment_key
from django.template import loader
from django.conf import settings
import time
import hashlib
import json

class MultiLevelCache:
    """多层级缓存策略"""
    
    @staticmethod
    def cache_levels():
        """缓存层级说明"""
        levels = {
            'L1': {
                'type': '内存缓存',
                'storage': 'Redis/Memcached',
                'speed': '最快',
                'capacity': '中等',
                'use_case': '热点数据、会话缓存'
            },
            'L2': {
                'type': '数据库缓存',
                'storage': 'Django Cache Table',
                'speed': '快',
                'capacity': '大',
                'use_case': '中等频率数据'
            },
            'L3': {
                'type': '文件缓存',
                'storage': '本地文件系统',
                'speed': '慢',
                'capacity': '很大',
                'use_case': '静态内容、生成的文件'
            }
        }
        return levels
    
    @staticmethod
    def smart_cache_get(key, callback, timeout=300):
        """智能缓存获取"""
        # 尝试从缓存获取
        result = cache.get(key)
        if result is not None:
            return result
        
        # 缓存未命中,执行回调函数
        result = callback()
        
        # 存储到缓存
        cache.set(key, result, timeout)
        
        return result
    
    @staticmethod
    def cache_with_fallback(primary_key, fallback_key, callback, timeout=300):
        """带降级的缓存策略"""
        # 首先尝试主缓存
        result = cache.get(primary_key)
        if result is not None:
            return result
        
        # 主缓存未命中,尝试备用缓存
        result = cache.get(fallback_key)
        if result is not None:
            # 异步更新主缓存
            from django.core.cache import cache
            cache.set(primary_key, result, timeout)
            return result
        
        # 都未命中,执行回调
        result = callback()
        
        # 同时设置两个缓存
        cache.set(primary_key, result, timeout)
        cache.set(fallback_key, result, timeout * 2)  # 备用缓存时间更长
        
        return result

# 缓存装饰器
def cached_method(timeout=300, key_prefix=''):
    """方法缓存装饰器"""
    def decorator(func):
        from functools import wraps
        
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # 生成缓存键
            cache_key_parts = [key_prefix, func.__name__, str(self.__class__.__name__)]
            cache_key_parts.extend([str(arg) for arg in args])
            cache_key_parts.extend([f"{k}:{v}" for k, v in sorted(kwargs.items())])
            
            cache_key = ":".join(cache_key_parts)
            cache_key = hashlib.md5(cache_key.encode()).hexdigest()
            
            # 尝试从缓存获取
            result = cache.get(cache_key)
            if result is not None:
                return result
            
            # 执行方法并缓存结果
            result = func(self, *args, **kwargs)
            cache.set(cache_key, result, timeout)
            
            return result
        return wrapper
    return decorator

# 视图缓存装饰器
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
from django.views.generic import ListView

@cache_page(60 * 15)  # 缓存15分钟
def product_list_view(request):
    """产品列表视图(已缓存)"""
    products = Product.objects.all()
    return render(request, 'products/list.html', {'products': products})

# 类视图缓存
@method_decorator(cache_page(60 * 15), name='dispatch')
class ProductListView(ListView):
    """产品列表类视图(已缓存)"""
    model = Product
    template_name = 'products/list.html'
    paginate_by = 20

# 模板片段缓存
"""
{% load cache %}
{% cache 500 sidebar request.user.username %}
    <!-- 昂贵的查询或处理 -->
    {% for item in expensive_list %}
        <div>{{ item.name }}</div>
    {% endfor %}
{% endcache %}
"""

缓存失效策略

# 缓存失效策略
class CacheInvalidationStrategy:
    """缓存失效策略"""
    
    @staticmethod
    def time_based_invalidatoin():
        """基于时间的失效策略"""
        # 设置合理的TTL
        cache.set('user_profile', user_data, timeout=300)  # 5分钟
        
        # 不同数据类型使用不同TTL
        cache.set('hot_news', news_data, timeout=60)      # 1分钟(热点新闻)
        cache.set('static_config', config_data, timeout=3600)  # 1小时(配置)
        cache.set('historical_data', data, timeout=86400) # 24小时(历史数据)
    
    @staticmethod
    def event_based_invalidation():
        """基于事件的失效策略"""
        from django.db.models.signals import post_save, post_delete
        from django.dispatch import receiver
        
        @receiver(post_save, sender=User)
        def invalidate_user_cache(sender, instance, **kwargs):
            """用户保存时失效相关缓存"""
            # 失效用户相关缓存
            cache.delete(f'user_profile_{instance.id}')
            cache.delete(f'user_permissions_{instance.id}')
            
            # 如果是重要字段更新,失效更多缓存
            if kwargs.get('update_fields'):
                if 'username' in kwargs['update_fields']:
                    # 清除所有涉及用户名的缓存
                    cache.delete_many([
                        f'user_by_username_{instance.username}',
                        f'user_search_{instance.username}',
                    ])
    
    @staticmethod
    def cache_warming_strategy():
        """缓存预热策略"""
        from celery import shared_task
        
        @shared_task
        def warm_cache():
            """缓存预热任务"""
            # 预热热门数据
            popular_products = Product.objects.filter(
                views__gte=1000
            ).select_related('category').prefetch_related('tags')[:100]
            
            # 批量缓存
            cache_data = {}
            for product in popular_products:
                cache_data[f'product_detail_{product.id}'] = {
                    'id': product.id,
                    'name': product.name,
                    'category': product.category.name,
                    'tags': [tag.name for tag in product.tags.all()],
                }
            
            cache.set_many(cache_data, timeout=3600)
    
    @staticmethod
    def cache_dependency_management():
        """缓存依赖管理"""
        # 使用版本号管理缓存依赖
        def get_cache_version_key(dependency_type, dependency_id):
            return f'cache_version:{dependency_type}:{dependency_id}'
        
        def invalidate_dependency(dependency_type, dependency_id):
            """失效依赖"""
            version_key = get_cache_version_key(dependency_type, dependency_id)
            current_version = cache.get(version_key, 0)
            cache.set(version_key, current_version + 1, timeout=None)
        
        def get_versioned_cache_key(base_key, dependency_type, dependency_id):
            """获取版本化缓存键"""
            version_key = get_cache_version_key(dependency_type, dependency_id)
            version = cache.get(version_key, 0)
            return f'{base_key}:v{version}'
        
        # 使用示例
        def get_user_products_with_cache(user_id):
            cache_key = get_versioned_cache_key(
                f'user_products_{user_id}', 
                'user', 
                user_id
            )
            
            products = cache.get(cache_key)
            if products is None:
                products = Product.objects.filter(owner_id=user_id)
                cache.set(cache_key, products, timeout=300)
            
            return products

# Redis高级缓存模式
"""
# 使用Redis实现分布式锁
import redis
import time
import uuid

class DistributedLock:
    def __init__(self, redis_client, lock_name, acquire_timeout=10, lock_timeout=10):
        self.redis = redis_client
        self.lock_name = f'lock:{lock_name}'
        self.acquire_timeout = acquire_timeout
        self.lock_timeout = lock_timeout
        self.identifier = str(uuid.uuid4())
    
    def __enter__(self):
        acquired = self.acquire()
        if not acquired:
            raise TimeoutError("Could not acquire lock")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
    
    def acquire(self):
        end = time.time() + self.acquire_timeout
        while time.time() < end:
            if self.redis.set(self.lock_name, self.identifier, nx=True, ex=self.lock_timeout):
                return True
            time.sleep(0.001)
        return False
    
    def release(self):
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self.redis.eval(lua_script, 1, self.lock_name, self.identifier)

# 使用分布式锁
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def critical_section():
    with DistributedLock(redis_client, 'critical_operation', acquire_timeout=5):
        # 执行关键操作
        pass
"""

模板性能优化

模板渲染优化

# 模板渲染优化
from django.template import Context, Template
from django.template.loader import get_template, render_to_string
from django.utils.safestring import mark_safe
import time

class TemplateOptimizer:
    """模板优化器"""
    
    @staticmethod
    def minimize_template_logic():
        """最小化模板逻辑"""
        # ❌ 避免在模板中进行复杂计算
        """
        <!-- templates/bad_example.html -->
        {% for item in items %}
            {% if item.price > 100 and item.quantity > 10 and item.category == 'electronics' %}
                <div class="expensive-item">{{ item.name }}</div>
            {% endif %}
        {% endfor %}
        """
        
        # ✅ 在视图中预处理数据
        """
        # views.py
        def product_list_view(request):
            expensive_items = Item.objects.filter(
                price__gt=100,
                quantity__gt=10,
                category='electronics'
            )
            
            context = {
                'expensive_items': expensive_items,
                'regular_items': Item.objects.exclude(
                    pk__in=expensive_items.values_list('pk', flat=True)
                )
            }
            return render(request, 'products/list.html', context)
        """
    
    @staticmethod
    def optimize_template_inheritance():
        """优化模板继承"""
        # 使用block.super提高效率
        """
        <!-- base.html -->
        <!DOCTYPE html>
        <html>
        <head>
            {% block head %}
                <title>{% block title %}Default Title{% endblock %}</title>
            {% endblock %}
        </head>
        <body>
            {% block content %}{% endblock %}
        </body>
        </html>
        
        <!-- child.html -->
        {% extends "base.html" %}
        
        {% block title %}{{ block.super }} - Child Page{% endblock %}
        
        {% block content %}
            <!-- 具体内容 -->
        {% endblock %}
        """
    
    @staticmethod
    def use_template_caching():
        """使用模板缓存"""
        from django.core.cache import cache
        
        def render_cached_template(template_name, context, cache_key, timeout=300):
            """渲染缓存模板"""
            cached_result = cache.get(cache_key)
            if cached_result:
                return cached_result
            
            # 渲染模板
            result = render_to_string(template_name, context)
            
            # 缓存结果
            cache.set(cache_key, result, timeout)
            return result
    
    @staticmethod
    def optimize_loop_performance():
        """优化循环性能"""
        # 预计算循环中的复杂表达式
        """
        <!-- 在视图中预处理 -->
        def dashboard_view(request):
            # 预计算用户权限
            user_permissions = {
                user.id: user.get_all_permissions() 
                for user in User.objects.all()
            }
            
            # 预计算统计数据
            user_stats = {
                user.id: {
                    'post_count': user.posts.count(),
                    'comment_count': user.comments.count(),
                    'last_login_ago': time_since(user.last_login)
                }
                for user in User.objects.all()
            }
            
            context = {
                'users': User.objects.all(),
                'user_permissions': user_permissions,
                'user_stats': user_stats,
            }
            return render(request, 'dashboard.html', context)
        """

# 自定义模板标签优化
from django import template

register = template.Library()

@register.simple_tag(takes_context=True)
def user_permission_check(context, permission):
    """用户权限检查标签(已缓存)"""
    user = context['user']
    cache_key = f"user_perm_{user.id}_{permission}"
    
    has_perm = cache.get(cache_key)
    if has_perm is None:
        has_perm = user.has_perm(permission)
        cache.set(cache_key, has_perm, 300)  # 缓存5分钟
    
    return has_perm

@register.inclusion_tag('tags/user_badge.html', takes_context=True)
def user_badge(context, user):
    """用户徽章标签"""
    request = context['request']
    is_owner = request.user == user
    user_level = 'vip' if user.profile.is_vip else 'regular'
    
    return {
        'user': user,
        'is_owner': is_owner,
        'user_level': user_level,
        'request': request,
    }

# 模板碎片缓存
@register.simple_tag
def cached_user_widget(user_id):
    """缓存的用户组件"""
    cache_key = f"user_widget_{user_id}"
    widget_html = cache.get(cache_key)
    
    if widget_html is None:
        user = User.objects.get(id=user_id)
        widget_html = render_to_string('widgets/user_widget.html', {'user': user})
        cache.set(cache_key, widget_html, 600)  # 缓存10分钟
    
    return mark_safe(widget_html)

静态文件优化

# 静态文件优化
"""
# settings.py 静态文件配置
STATIC_URL = '/static/'
STATIC_ROOT = os.path.join(BASE_DIR, 'staticfiles')

STATICFILES_DIRS = [
    os.path.join(BASE_DIR, 'static'),
]

# 静态文件存储后端
STATICFILES_STORAGE = 'django.contrib.staticfiles.storage.ManifestStaticFilesStorage'

# 使用CDN
STATIC_URL = 'https://cdn.yourdomain.com/static/'
"""

# 静态文件版本控制
class StaticFileVersioning:
    """静态文件版本控制"""
    
    @staticmethod
    def add_cache_busting():
        """添加缓存破坏"""
        # 在模板中使用版本参数
        """
        <!-- 在base.html中 -->
        {% load static %}
        <link rel="stylesheet" type="text/css" href="{% static 'css/style.css' %}?v={% now 'U' %}">
        <script src="{% static 'js/app.js' %}?v={% now 'U' %}"></script>
        """
    
    @staticmethod
    def use_manifest_static_files():
        """使用清单静态文件存储"""
        """
        # settings.py
        STATICFILES_STORAGE = 'django.contrib.staticfiles.storage.ManifestStaticFilesStorage'
        
        # 这会自动为静态文件添加哈希后缀
        # style.css -> style.a1b2c3d4.css
        """
    
    @staticmethod
    def compress_static_files():
        """压缩静态文件"""
        """
        # 安装 django-compressor
        pip install django-compressor
        
        # settings.py
        INSTALLED_APPS = [
            # ...
            'compressor',
        ]
        
        STATICFILES_FINDERS = [
            'django.contrib.staticfiles.finders.FileSystemFinder',
            'django.contrib.staticfiles.finders.AppDirectoriesFinder',
            'compressor.finders.CompressorFinder',
        ]
        
        COMPRESS_ENABLED = True
        COMPRESS_OFFLINE = True
        
        # 在模板中使用
        {% load compress %}
        
        {% compress css %}
            <link rel="stylesheet" type="text/css" href="{% static 'css/style1.css' %}">
            <link rel="stylesheet" type="text/css" href="{% static 'css/style2.css' %}">
        {% endcompress %}
        
        {% compress js %}
            <script src="{% static 'js/script1.js' %}"></script>
            <script src="{% static 'js/script2.js' %}"></script>
        {% endcompress %}
        """

# Webpack集成优化
"""
# webpack.config.js 示例
const path = require('path');
const MiniCssExtractPlugin = require('mini-css-extract-plugin');

module.exports = {
    entry: './assets/js/main.js',
    output: {
        path: path.resolve(__dirname, 'static/dist'),
        filename: 'js/[name].[contenthash].js',
        clean: true,
    },
    module: {
        rules: [
            {
                test: /\.css$/,
                use: [MiniCssExtractPlugin.loader, 'css-loader'],
            },
            {
                test: /\.js$/,
                exclude: /node_modules/,
                use: 'babel-loader',
            },
        ],
    },
    plugins: [
        new MiniCssExtractPlugin({
            filename: 'css/[name].[contenthash].css',
        }),
    ],
};
"""

异步处理优化

Django异步支持

# Django异步处理
import asyncio
import aiohttp
from django.http import JsonResponse
from django.views import View
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
import json

class AsyncViewOptimization:
    """异步视图优化"""
    
    @staticmethod
    def async_view_example():
        """异步视图示例"""
        # Django 3.1+ 支持原生异步视图
        """
        # views.py
        import asyncio
        from django.http import JsonResponse
        
        async def async_api_view(request):
            # 模拟异步操作
            await asyncio.sleep(1)  # 实际场景中可能是数据库查询、API调用等
            
            # 异步获取多个数据源
            user_task = fetch_user_data()
            post_task = fetch_post_data()
            
            user_data, post_data = await asyncio.gather(user_task, post_task)
            
            return JsonResponse({
                'user': user_data,
                'posts': post_data
            })
        
        async def fetch_user_data():
            # 异步获取用户数据
            await asyncio.sleep(0.1)
            return {'id': 1, 'name': 'John'}
        
        async def fetch_post_data():
            # 异步获取帖子数据
            await asyncio.sleep(0.2)
            return [{'id': 1, 'title': 'Post 1'}, {'id': 2, 'title': 'Post 2'}]
        """
    
    @staticmethod
    def async_class_view():
        """异步类视图"""
        """
        from django.views import View
        
        class AsyncDataView(View):
            async def get(self, request):
                # 异步处理
                data = await self.fetch_async_data()
                return JsonResponse(data)
            
            async def fetch_async_data(self):
                # 异步数据获取
                import asyncpg
                
                conn = await asyncpg.connect(
                    host='localhost',
                    port=5432,
                    user='user',
                    password='password',
                    database='db'
                )
                
                result = await conn.fetch('SELECT * FROM users LIMIT 10')
                await conn.close()
                
                return [dict(row) for row in result]
        """
    
    @staticmethod
    def async_database_operations():
        """异步数据库操作"""
        """
        # 需要使用支持异步的数据库驱动
        # 如 asyncpg for PostgreSQL
        
        import databases
        import sqlalchemy
        
        DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"
        database = databases.Database(DATABASE_URL)
        metadata = sqlalchemy.MetaData()
        
        async def startup():
            await database.connect()
        
        async def shutdown():
            await database.disconnect()
        
        async def get_users():
            query = "SELECT * FROM users"
            return await database.fetch_all(query)
        """

# Celery异步任务
"""
# Celery配置
# celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# tasks.py
from celery import shared_task
import time

@shared_task
def send_email_task(recipient, subject, body):
    # 模拟发送邮件
    time.sleep(5)  # 实际发送邮件的操作
    return f"Email sent to {recipient}"

@shared_task
def process_large_dataset(dataset_id):
    # 处理大数据集
    dataset = Dataset.objects.get(id=dataset_id)
    # 执行复杂的数据处理
    result = complex_data_processing(dataset.data)
    dataset.result = result
    dataset.save()
    return "Processing completed"
"""

# 异步信号处理器
"""
from asgiref.sync import sync_to_async
from django.db.models.signals import post_save
from django.dispatch import receiver

@receiver(post_save, sender=User)
def async_user_created_handler(sender, instance, created, **kwargs):
    if created:
        # 使用sync_to_async包装同步任务
        async_task = sync_to_async(send_welcome_email)(instance.email)
        # 或者使用Celery任务
        send_welcome_email.delay(instance.email)
"""

WebSocket实时优化

# WebSocket优化
"""
# Django Channels 配置
# routing.py
from django.urls import path
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import myapp.consumers

application = ProtocolTypeRouter({
    "websocket": AuthMiddlewareStack(
        URLRouter([
            path("ws/chat/<str:room_name>/", myapp.consumers.ChatConsumer.as_asgi()),
        ])
    ),
})

# consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'
        
        # 加入房间组
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        
        await self.accept()
    
    async def disconnect(self, close_code):
        # 离开房间组
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )
    
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']
        
        # 发送消息到房间组
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message
            }
        )
    
    async def chat_message(self, event):
        message = event['message']
        
        # 发送消息到WebSocket
        await self.send(text_data=json.dumps({
            'message': message
        }))
"""

部署优化

生产环境配置

# 生产环境优化配置
"""
# production.py
from .base import *

DEBUG = False
ALLOWED_HOSTS = ['yourdomain.com', 'www.yourdomain.com']

# 数据库优化
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': os.environ.get('DB_NAME'),
        'USER': os.environ.get('DB_USER'),
        'PASSWORD': os.environ.get('DB_PASSWORD'),
        'HOST': os.environ.get('DB_HOST', 'localhost'),
        'PORT': os.environ.get('DB_PORT', '5432'),
        'CONN_MAX_AGE': 60,
        'OPTIONS': {
            'MAX_CONNS': 20,
            'MIN_CONNS': 5,
        },
    }
}

# 缓存配置
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/1',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'CONNECTION_POOL_KWARGS': {
                'max_connections': 20,
                'retry_on_timeout': True,
            }
        },
        'KEY_PREFIX': 'myapp',
        'TIMEOUT': 300,
    }
}

# 静态文件配置
STATIC_URL = '/static/'
STATIC_ROOT = '/var/www/myapp/static/'
MEDIA_URL = '/media/'
MEDIA_ROOT = '/var/www/myapp/media/'

# 安全设置
SECURE_SSL_REDIRECT = True
SECURE_HSTS_SECONDS = 31536000
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
SECURE_CONTENT_TYPE_NOSNIFF = True
SECURE_BROWSER_XSS_FILTER = True
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True

# 性能优化
USE_TZ = True
TIME_ZONE = 'UTC'
LANGUAGE_CODE = 'en-us'

# 压缩
COMPRESS_ENABLED = True
COMPRESS_OFFLINE = True
"""

# Gunicorn配置
"""
# gunicorn.conf.py
bind = "127.0.0.1:8000"
workers = 4  # 根据CPU核心数调整
worker_class = "sync"  # 或 "gevent", "eventlet" 用于异步
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
timeout = 30
keepalive = 5
preload_app = True
"""

Docker优化

# Dockerfile优化示例
"""
FROM python:3.11-slim

# 设置环境变量
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV DJANGO_SETTINGS_MODULE=myproject.settings.production

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    gcc \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# 创建应用目录
WORKDIR /app

# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 创建非root用户
RUN useradd --create-home --shell /bin/bash app \
    && chown -R app:app /app
USER app

# 收集静态文件
RUN python manage.py collectstatic --noinput

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["gunicorn", "--config", "gunicorn.conf.py", "myproject.wsgi:application"]
"""

性能监控与分析

性能监控工具

# 性能监控实现
import time
import psutil
import logging
from django.db import connection
from django.core.handlers.wsgi import WSGIRequest
from functools import wraps

class PerformanceMonitor:
    """性能监控器"""
    
    def __init__(self):
        self.logger = logging.getLogger('performance')
    
    def monitor_view_performance(self, view_func):
        """监控视图性能的装饰器"""
        @wraps(view_func)
        def wrapper(request, *args, **kwargs):
            start_time = time.time()
            start_queries = len(connection.queries)
            start_memory = psutil.Process().memory_info().rss
            
            try:
                response = view_func(request, *args, **kwargs)
            except Exception as e:
                self.logger.error(f"View error: {view_func.__name__}, Error: {str(e)}")
                raise
            
            end_time = time.time()
            end_queries = len(connection.queries)
            end_memory = psutil.Process().memory_info().rss
            
            # 计算性能指标
            duration = end_time - start_time
            query_count = end_queries - start_queries
            memory_delta = end_memory - start_memory
            
            # 记录性能数据
            self.logger.info(
                f"View: {view_func.__name__}, "
                f"Duration: {duration:.3f}s, "
                f"Queries: {query_count}, "
                f"Memory Delta: {memory_delta / 1024 / 1024:.2f}MB, "
                f"Method: {request.method}, "
                f"Path: {request.path}"
            )
            
            # 性能告警
            if duration > 2.0:  # 超过2秒的请求
                self.logger.warning(f"SLOW REQUEST: {request.path}, Duration: {duration:.3f}s")
            
            if query_count > 50:  # 超过50个查询
                self.logger.warning(f"HIGH QUERY COUNT: {request.path}, Queries: {query_count}")
            
            return response
        
        return wrapper

# 使用性能监控装饰器
monitor = PerformanceMonitor()

@monitor.monitor_view_performance
def expensive_view(request):
    """耗时的视图函数"""
    # 模拟耗时操作
    time.sleep(1)
    
    # 模拟多个数据库查询
    users = User.objects.all()[:100]
    
    return JsonResponse({'users': [user.username for user in users]})

# 中间件形式的性能监控
class PerformanceMonitoringMiddleware:
    """性能监控中间件"""
    
    def __init__(self, get_response):
        self.get_response = get_response
        self.monitor = PerformanceMonitor()

    def __call__(self, request):
        start_time = time.time()
        start_queries = len(connection.queries)
        
        response = self.get_response(request)
        
        end_time = time.time()
        end_queries = len(connection.queries)
        
        duration = end_time - start_time
        query_count = end_queries - start_queries
        
        # 记录性能数据
        logging.info(
            f"Request: {request.method} {request.path}, "
            f"Duration: {duration:.3f}s, "
            f"Queries: {query_count}, "
            f"Status: {response.status_code}"
        )
        
        return response

# 性能分析工具集成
"""
# 安装 django-silk 进行性能分析
pip install django-silk

# settings.py
INSTALLED_APPS = [
    # ...
    'silk',
]

MIDDLEWARE = [
    # ...
    'silk.middleware.SilkyMiddleware',
    # ...
]

# urls.py
if settings.DEBUG:
    urlpatterns += [path('silk/', include('silk.urls', namespace='silk'))]

# 安装 django-debug-toolbar
pip install django-debug-toolbar

# settings.py
INSTALLED_APPS = [
    # ...
    'debug_toolbar',
]

MIDDLEWARE = [
    # ...
    'debug_toolbar.middleware.DebugToolbarMiddleware',
]

INTERNAL_IPS = ['127.0.0.1']
"""

APM监控

# APM监控集成示例
"""
# 使用 New Relic
pip install newrelic

# newrelic.ini
[newrelic]
license_key = your_license_key
app_name = My Django App
monitor_mode = true

# 启动应用时使用
NEW_RELIC_CONFIG_FILE=newrelic.ini newrelic-admin run-program python manage.py runserver

# 使用 Datadog
pip install datadog

# 在 settings.py 中配置
DATADOG_API_KEY = 'your_datadog_api_key'

# 使用 Sentry 进行错误监控
pip install sentry-sdk

import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration

sentry_sdk.init(
    dsn="your_sentry_dsn",
    integrations=[DjangoIntegration()],
    traces_sample_rate=1.0,
    send_default_pii=True
)
"""

本章小结

在本章中,我们深入学习了Django性能优化:

  1. 性能优化概述:了解了性能指标和优化原则
  2. 数据库查询优化:掌握了ORM优化、索引优化和连接池管理
  3. 缓存策略优化:学习了多层级缓存和失效策略
  4. 模板性能优化:掌握了模板渲染和静态文件优化
  5. 异步处理优化:了解了异步视图和任务队列
  6. 部署优化:学习了生产环境配置和容器化
  7. 性能监控与分析:掌握了监控工具和APM集成

核心要点回顾

"""
本章核心要点:

1. 性能优化需要基于测量数据,不能盲目优化
2. 数据库查询优化是最重要的优化方向
3. 合理的缓存策略能显著提升性能
4. 异步处理适合I/O密集型任务
5. 生产环境配置对性能影响巨大
6. 持续监控是保持高性能的关键
7. 性能优化是一个持续的过程
8. 避免过早优化,关注主要瓶颈
"""

下一步学习

现在您已经掌握了Django性能优化,可以继续学习:

💡 核心要点:性能优化需要平衡多个因素,包括响应时间、吞吐量、资源利用率和可维护性。建立完善的监控体系,持续观察和优化,是保持应用高性能的关键。

SEO优化策略

  1. 关键词布局: 在标题、内容中合理布局"Django性能优化", "数据库优化", "缓存策略", "异步处理", "性能调优", "Web性能"等关键词
  2. 内容结构: 使用清晰的标题层级(H1-H3),便于搜索引擎理解内容结构
  3. 内部链接: 建立与其他相关教程的内部链接,提升页面权重
  4. 代码示例: 提供丰富的实际代码示例,增加页面价值
  5. 元数据优化: 在页面头部包含描述性的标题、描述和标签

🔗 相关教程推荐

🏷️ 标签云: Django性能优化 数据库优化 缓存策略 异步处理 性能调优 Web性能 查询优化 APM监控