Django用户认证系统 - 安全登录注册与权限管理

📂 所属阶段:第三部分 — 高级主题
🎯 难度等级:高级
⏰ 预计学习时间:6-8小时
🎒 前置知识:会话管理

目录

认证系统基础概念

用户认证系统是Web应用的核心组件,负责验证用户身份并管理用户权限。

认证与授权

"""
认证(Authentication) vs 授权(Authorization):

认证(Authentication):
- 验证用户身份
- 证明"你是谁"
- 如:用户名密码验证
- Django内置认证系统

授权(Authorization):
- 验证用户权限
- 证明"你能做什么"
- 如:角色权限检查
- Django权限系统

认证流程:
1. 用户提供凭据(用户名/密码)
2. 系统验证凭据有效性
3. 创建用户会话
4. 标记用户为已认证

授权流程:
1. 检查用户认证状态
2. 验证用户权限
3. 允许或拒绝访问
4. 记录访问日志
"""

Django认证系统组成

"""
Django认证系统组成部分:

1. User模型 (django.contrib.auth.models.User)
   - 用户基本信息存储
   - 密码哈希存储
   - 权限字段

2. Authentication Backend (django.contrib.auth.backends)
   - 用户验证逻辑
   - 支持多种认证方式

3. Permission System
   - 用户权限管理
   - 内容级权限控制

4. Group System
   - 用户分组管理
   - 批量权限分配

5. Login/Logout Views
   - 内置登录/登出视图
   - 会话管理

6. Decorators & Mixins
   - 视图保护装饰器
   - 类视图权限检查
"""

认证核心组件

# Django认证核心组件
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.models import User, Group, Permission
from django.contrib.auth.backends import ModelBackend
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin

"""
主要功能模块:

1. 用户管理
   - 创建、修改、删除用户
   - 密码管理
   - 激活/禁用用户

2. 权限管理
   - 模型级权限
   - 对象级权限
   - 自定义权限

3. 组管理
   - 用户分组
   - 权限批量分配
   - 角色管理

4. 会话管理
   - 登录/登出
   - 会话安全
   - 多设备登录
"""

Django认证架构

认证中间件

# 认证中间件配置
from django.contrib.auth.middleware import AuthenticationMiddleware
from django.contrib.sessions.middleware import SessionMiddleware

class CustomAuthenticationMiddleware(AuthenticationMiddleware):
    """Authentication middleware with extra per-request account checks.

    After the parent middleware has run, an authenticated user gets
    ``last_login`` refreshed and is logged out when :meth:`is_account_valid`
    rejects the account.
    """

    def process_request(self, request):
        """Run the parent hook, then apply the additional account checks."""
        super().process_request(request)

        # Nothing more to do unless a user is attached and authenticated.
        if not hasattr(request, 'user'):
            return
        if not request.user.is_authenticated:
            return

        # The activity timestamp is refreshed before the validity check runs.
        self.update_last_activity(request.user)

        # An invalid account is forcibly logged out.
        if self.is_account_valid(request.user):
            return
        logout(request)

    def update_last_activity(self, user):
        """Set ``user.last_login`` to the current time and save only that field."""
        from django.utils import timezone

        stamp = timezone.now()
        user.last_login = stamp
        user.save(update_fields=['last_login'])

    def is_account_valid(self, user):
        """Return ``False`` for a disabled or expired account, ``True`` otherwise."""
        if not user.is_active:
            return False

        # ``expiry_date`` is optional: a missing or empty value never expires.
        if not (hasattr(user, 'expiry_date') and user.expiry_date):
            return True

        from django.utils import timezone
        return not (user.expiry_date < timezone.now())

# 认证后端接口
from django.contrib.auth.backends import BaseBackend

class CustomAuthBackend(BaseBackend):
    """Username/password authentication backend.

    Users are looked up on the project's active user model (as returned by
    ``get_user_model()``), so the backend keeps working when
    ``AUTH_USER_MODEL`` points at a custom model.
    """

    def authenticate(self, request, username=None, password=None, **kwargs):
        """Return the user matching ``username``/``password``, or ``None``.

        Args:
            request: The current request (unused here).
            username: Login name to look up.
            password: Raw password to verify against the stored hash.

        Returns:
            The user instance on success; ``None`` when credentials are
            missing, wrong, or an unexpected error occurs during lookup.
        """
        import logging

        if username is None or password is None:
            return None

        logger = logging.getLogger('auth.failed')
        try:
            user = self.get_user_by_username(username)
            if user is not None and self.check_password(password, user.password):
                return user
        except Exception:
            # Best effort: an unexpected backend error must not crash the
            # login flow, but it is logged with its traceback instead of
            # being reported as a plain credential failure.
            logger.exception("Authentication error for username: %s", username)
            return None

        # Unknown user or wrong password: this is the actual failure case.
        logger.warning("Authentication failed for username: %s", username)
        return None

    def get_user_by_username(self, username):
        """Return the user with the given ``username``, or ``None`` if absent."""
        from django.contrib.auth import get_user_model

        user_model = get_user_model()
        try:
            return user_model.objects.get(username=username)
        except user_model.DoesNotExist:
            return None

    def check_password(self, raw_password, hashed_password):
        """Return whether ``raw_password`` matches ``hashed_password``."""
        from django.contrib.auth.hashers import check_password
        return check_password(raw_password, hashed_password)

    def get_user(self, user_id):
        """Return the user with primary key ``user_id``, or ``None`` if absent."""
        from django.contrib.auth import get_user_model

        user_model = get_user_model()
        try:
            return user_model.objects.get(pk=user_id)
        except user_model.DoesNotExist:
            return None

# 认证处理器
class AuthHandler:
    """Runs credentials through every configured authentication backend."""

    def __init__(self):
        self.backends = self.get_backends()

    def get_backends(self):
        """Instantiate each backend listed in ``settings.AUTHENTICATION_BACKENDS``."""
        from django.conf import settings
        from django.utils.module_loading import import_string

        return [
            import_string(dotted_path)()
            for dotted_path in settings.AUTHENTICATION_BACKENDS
        ]

    def authenticate_user(self, **credentials):
        """Return the first user a backend accepts, tagged with that backend's path.

        Backends are tried in order with ``None`` as the request; when none
        of them returns a truthy user the result is ``None``.
        """
        for candidate in self.backends:
            authenticated = candidate.authenticate(None, **credentials)
            if not authenticated:
                continue
            # Record which backend accepted the credentials.
            backend_cls = candidate.__class__
            authenticated.backend = f"{backend_cls.__module__}.{backend_cls.__name__}"
            return authenticated
        return None

# 使用示例
def custom_authenticate(username, password):
    """Authenticate ``username``/``password`` through a fresh ``AuthHandler``."""
    return AuthHandler().authenticate_user(username=username, password=password)

用户会话管理

# 用户会话管理
from django.contrib.auth import login, logout
from django.contrib.sessions.models import Session
from django.contrib.auth.models import User
from django.utils import timezone

class UserSessionManager:
    """Helpers for creating and tearing down logged-in sessions with audit logs."""

    @staticmethod
    def create_user_session(request, user, remember_me=False):
        """Log ``user`` in, set the session lifetime and record the login.

        Args:
            request: The current request; its session is bound to ``user``.
            user: The already-authenticated user.
            remember_me: When falsy the session ends with the browser
                (expiry ``0``); otherwise it lasts 1209600 seconds (two weeks).
        """
        login(request, user)

        if not remember_me:
            request.session.set_expiry(0)  # expire when the browser closes
        else:
            request.session.set_expiry(1209600)  # expire after two weeks

        UserSessionManager.record_login_info(user, request)

    @staticmethod
    def record_login_info(user, request):
        """Update ``last_login``, write a login log row and prune old failures.

        Failed-attempt rows for this username that are older than one hour
        are deleted.
        """
        from datetime import timedelta

        now = timezone.now()

        user.last_login = now
        user.save(update_fields=['last_login'])

        UserLoginLog.objects.create(
            user=user,
            ip_address=request.META.get('REMOTE_ADDR'),
            user_agent=request.META.get('HTTP_USER_AGENT', ''),
            login_time=now,
        )

        # UserFailedLoginAttempt stores the attempted *username* (it has no
        # ``user`` relation), so the cleanup must filter on that column.
        UserFailedLoginAttempt.objects.filter(
            username=user.get_username(),
            attempt_time__lt=now - timedelta(hours=1),
        ).delete()

    @staticmethod
    def logout_user(request):
        """Record the logout for an authenticated user, then end the session."""
        if request.user.is_authenticated:
            # Must happen before logout(), which clears request.user.
            UserSessionManager.record_logout_info(request.user, request)

        logout(request)

    @staticmethod
    def record_logout_info(user, request):
        """Write a logout log row for ``user`` with the request's remote address."""
        UserLogoutLog.objects.create(
            user=user,
            ip_address=request.META.get('REMOTE_ADDR'),
            logout_time=timezone.now()
        )

# 登录日志模型
from django.db import models

class UserLoginLog(models.Model):
    """Audit row written for a user login."""
    # User the login belongs to; rows are deleted together with the user.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Address the login came from.
    ip_address = models.GenericIPAddressField()
    # Raw User-Agent header; may be empty.
    user_agent = models.TextField(blank=True)
    # Set automatically when the row is created.
    login_time = models.DateTimeField(auto_now_add=True)
    # Whether the login succeeded; defaults to True.
    success = models.BooleanField(default=True)
    
    class Meta:
        db_table = 'user_login_log'
        # Newest logins first.
        ordering = ['-login_time']

class UserLogoutLog(models.Model):
    """Audit row written for a user logout."""
    # User who logged out; rows are deleted together with the user.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Address the logout request came from.
    ip_address = models.GenericIPAddressField()
    # Set automatically when the row is created.
    logout_time = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        db_table = 'user_logout_log'
        # Newest logouts first.
        ordering = ['-logout_time']

class UserFailedLoginAttempt(models.Model):
    """Record of a failed login attempt, keyed by the attempted username."""
    # The username that was tried, stored as plain text (no relation to a user row).
    username = models.CharField(max_length=150)
    # Address the attempt came from.
    ip_address = models.GenericIPAddressField()
    # Set automatically when the row is created.
    attempt_time = models.DateTimeField(auto_now_add=True)
    # Raw User-Agent header; may be empty.
    user_agent = models.TextField(blank=True)
    
    class Meta:
        db_table = 'user_failed_login_attempts'
        # Newest attempts first.
        ordering = ['-attempt_time']

# 会话安全检查
class SessionSecurityChecker:
    """Brute-force detection based on recorded failed login attempts."""

    @staticmethod
    def check_brute_force_attack(username, ip_address, max_attempts=5):
        """Decide whether a login attempt should be blocked.

        Failed attempts inside the last ``settings.BRUTE_FORCE_TIME_WINDOW``
        seconds (default 900, i.e. 15 minutes) are counted per IP address and
        per username.

        Args:
            username: The username being tried.
            ip_address: The client address of the attempt.
            max_attempts: Per-username threshold inside the window.

        Returns:
            ``(blocked, message)``: ``(True, reason)`` when the IP reached
            ``settings.MAX_LOGIN_ATTEMPTS_PER_IP`` (default 10) or the
            username reached ``max_attempts``; otherwise ``(False, "")``.
        """
        from datetime import timedelta

        from django.conf import settings
        from django.utils import timezone

        # Compute the window once so both counts use the same cut-off.
        window_seconds = getattr(settings, 'BRUTE_FORCE_TIME_WINDOW', 900)
        window_start = timezone.now() - timedelta(seconds=window_seconds)
        recent_failures = UserFailedLoginAttempt.objects.filter(
            attempt_time__gte=window_start
        )

        # The per-IP limit is checked first.
        ip_attempts = recent_failures.filter(ip_address=ip_address).count()
        if ip_attempts >= getattr(settings, 'MAX_LOGIN_ATTEMPTS_PER_IP', 10):
            return True, "IP地址被临时封禁"

        # Then the per-username limit.
        user_attempts = recent_failures.filter(username=username).count()
        if user_attempts >= max_attempts:
            return True, "账户被临时锁定"

        return False, ""

    @staticmethod
    def log_failed_attempt(username, ip_address, user_agent):
        """Persist one failed login attempt."""
        UserFailedLoginAttempt.objects.create(
            username=username,
            ip_address=ip_address,
            user_agent=user_agent
        )

# 使用示例
def secure_login_view(request):
    """Handle a login submission with brute-force protection.

    Returns a plain dict describing the outcome: ``{'success': False}`` for
    non-POST requests, an ``error`` entry for blocked, failed or disabled
    logins, and ``success`` plus ``redirect`` after a successful login.
    """
    # Anything other than a form submission gets the neutral answer.
    if request.method != 'POST':
        return {'success': False}

    form = request.POST
    username = form.get('username')
    password = form.get('password')
    remember_me = form.get('remember_me', False)

    meta = request.META
    ip_address = meta.get('REMOTE_ADDR')
    user_agent = meta.get('HTTP_USER_AGENT', '')

    # Refuse early when too many recent failures were recorded.
    is_attack, attack_msg = SessionSecurityChecker.check_brute_force_attack(
        username, ip_address
    )
    if is_attack:
        return {'error': attack_msg, 'blocked': True}

    user = authenticate(request, username=username, password=password)
    if not user:
        # Bad credentials: remember the failure for later throttling.
        SessionSecurityChecker.log_failed_attempt(username, ip_address, user_agent)
        return {'error': '用户名或密码错误'}

    if not user.is_active:
        return {'error': '账户已被禁用'}

    UserSessionManager.create_user_session(request, user, remember_me)
    return {'success': True, 'redirect': '/dashboard/'}

认证装饰器和混入

# 认证装饰器
from functools import wraps
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin
from django.http import HttpResponseForbidden, HttpResponseRedirect
from django.urls import reverse
from django.contrib import messages

def custom_login_required(function=None, redirect_field_name='next', login_url=None):
    """Require an authenticated, active user before running the view.

    Usable bare (``@custom_login_required``) or with arguments
    (``@custom_login_required(login_url=...)``).

    Args:
        function: The view, when the decorator is applied without parentheses.
        redirect_field_name: Query parameter that carries the original URL.
        login_url: Login page URL; defaults to ``reverse('login')``.

    Behaviour of the wrapped view:
        * unauthenticated AJAX request (``X-Requested-With: XMLHttpRequest``)
          -> 403 response;
        * other unauthenticated request -> redirect to the login page with
          the current path in ``redirect_field_name``;
        * user exposing ``is_account_active()`` that returns falsy -> logged
          out and redirected to ``reverse('login')``;
        * otherwise the view is called.
    """
    def decorator(view_func):
        @wraps(view_func)
        def _wrapped_view(request, *args, **kwargs):
            if not request.user.is_authenticated:
                if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                    return HttpResponseForbidden("Authentication required")

                from urllib.parse import quote

                # Use a separate name: rebinding ``login_url`` here would make
                # it local to this function and raise UnboundLocalError.
                target_login_url = login_url or reverse('login')
                # Percent-encode so a query string in the current path cannot
                # leak into (or truncate) the ``next`` parameter.
                next_url = quote(request.get_full_path(), safe='/')
                redirect_url = f"{target_login_url}?{redirect_field_name}={next_url}"
                return HttpResponseRedirect(redirect_url)

            # Optional account-level check provided by custom user models.
            if hasattr(request.user, 'is_account_active') and not request.user.is_account_active():
                messages.error(request, '您的账户已被禁用')
                logout(request)
                return HttpResponseRedirect(reverse('login'))

            return view_func(request, *args, **kwargs)
        return _wrapped_view

    if function:
        return decorator(function)
    return decorator

def role_required(roles, login_url=None, redirect_field_name='next'):
    """Require the user to belong to at least one of the given groups.

    Args:
        roles: A single group name, or a list/tuple/set of group names.
        login_url: Login page URL; defaults to ``reverse('login')``.
        redirect_field_name: Query parameter that carries the original URL.

    Behaviour of the wrapped view: unauthenticated users are redirected to
    the login page; authenticated users holding none of ``roles`` get a 403
    response; everyone else reaches the view.
    """
    def decorator(view_func):
        @wraps(view_func)
        def _wrapped_view(request, *args, **kwargs):
            if not request.user.is_authenticated:
                from urllib.parse import quote

                # Use a separate name: rebinding ``login_url`` here would make
                # it local to this function and raise UnboundLocalError.
                target_login_url = login_url or reverse('login')
                next_url = quote(request.get_full_path(), safe='/')
                redirect_url = f"{target_login_url}?{redirect_field_name}={next_url}"
                return HttpResponseRedirect(redirect_url)

            # Roles are modelled as group names.
            user_roles = set(request.user.groups.values_list('name', flat=True))
            if isinstance(roles, (list, tuple, set, frozenset)):
                required_roles = set(roles)
            else:
                required_roles = {roles}

            if user_roles.isdisjoint(required_roles):
                messages.error(request, '您没有权限访问此页面')
                return HttpResponseForbidden("Access denied")

            return view_func(request, *args, **kwargs)
        return _wrapped_view
    return decorator

def multi_permission_required(permissions, logical='AND', login_url=None, redirect_field_name='next'):
    """Require several permissions, combined with AND or OR.

    Args:
        permissions: Iterable of permission strings passed to
            ``user.has_perm``; a single string is treated as one permission.
        logical: ``'OR'`` (case-insensitive) grants access when any
            permission is held; any other value requires all of them.
        login_url: Login page URL; defaults to ``reverse('login')``.
        redirect_field_name: Query parameter that carries the original URL.

    Behaviour of the wrapped view: unauthenticated users are redirected to
    the login page; users failing the permission check get a 403 response;
    everyone else reaches the view.
    """
    def decorator(view_func):
        @wraps(view_func)
        def _wrapped_view(request, *args, **kwargs):
            if not request.user.is_authenticated:
                from urllib.parse import quote

                # Use a separate name: rebinding ``login_url`` here would make
                # it local to this function and raise UnboundLocalError.
                target_login_url = login_url or reverse('login')
                next_url = quote(request.get_full_path(), safe='/')
                redirect_url = f"{target_login_url}?{redirect_field_name}={next_url}"
                return HttpResponseRedirect(redirect_url)

            # A bare string would otherwise be checked character by character.
            required = (permissions,) if isinstance(permissions, str) else permissions

            # Unknown operators fall back to AND.
            combine = any if logical.upper() == 'OR' else all
            has_access = combine(request.user.has_perm(perm) for perm in required)

            if not has_access:
                messages.error(request, '您没有足够的权限访问此页面')
                return HttpResponseForbidden("Access denied")

            return view_func(request, *args, **kwargs)
        return _wrapped_view
    return decorator

# 自定义混入类
from django.contrib.auth.mixins import AccessMixin

class RoleRequiredMixin(AccessMixin):
    """Class-based-view mixin that restricts access by group membership.

    Set ``role_required`` to one group name or a list/tuple of names; the
    user must belong to at least one of them. ``None`` disables the check.
    """
    role_required = None
    raise_exception = False
    permission_denied_message = '您没有权限访问此页面'

    def dispatch(self, request, *args, **kwargs):
        """Reject unauthenticated users and users without a required role."""
        allowed = request.user.is_authenticated and self.has_role()
        if not allowed:
            return self.handle_no_permission()
        return super().dispatch(request, *args, **kwargs)

    def has_role(self):
        """Return whether the current user holds at least one required role."""
        wanted = self.role_required
        if wanted is None:
            return True

        held = set(self.request.user.groups.values_list('name', flat=True))
        if isinstance(wanted, (list, tuple)):
            wanted_set = set(wanted)
        else:
            wanted_set = {wanted}

        return not held.isdisjoint(wanted_set)

    def handle_no_permission(self):
        """Raise ``PermissionDenied`` or answer 403, depending on ``raise_exception``."""
        denial = self.permission_denied_message
        if not self.raise_exception:
            messages.error(self.request, denial)
            return HttpResponseForbidden(denial)

        from django.core.exceptions import PermissionDenied
        raise PermissionDenied(denial)

class MultiPermissionRequiredMixin(AccessMixin):
    """Class-based-view mixin requiring several permissions (AND / OR).

    ``permissions_required`` lists the permission strings; ``None`` disables
    the check. ``permissions_logical`` selects how they are combined.
    """
    permissions_required = None
    permissions_logical = 'AND'  # 'AND' or 'OR'
    raise_exception = False
    permission_denied_message = '您没有足够的权限访问此页面'

    def dispatch(self, request, *args, **kwargs):
        """Reject unauthenticated users and users failing the permission check."""
        allowed = request.user.is_authenticated and self.has_permissions()
        if not allowed:
            return self.handle_no_permission()
        return super().dispatch(request, *args, **kwargs)

    def has_permissions(self):
        """Return whether the current user satisfies the configured permissions."""
        required = self.permissions_required
        if required is None:
            return True

        current_user = self.request.user
        outcomes = [current_user.has_perm(perm) for perm in required]

        # Only 'OR' switches to any(); every other value means AND.
        if self.permissions_logical.upper() == 'OR':
            return any(outcomes)
        return all(outcomes)

    def handle_no_permission(self):
        """Raise ``PermissionDenied`` or answer 403, depending on ``raise_exception``."""
        denial = self.permission_denied_message
        if not self.raise_exception:
            messages.error(self.request, denial)
            return HttpResponseForbidden(denial)

        from django.core.exceptions import PermissionDenied
        raise PermissionDenied(denial)

# 使用示例
@custom_login_required
@role_required(['admin', 'manager'])
@multi_permission_required(['app.view_report', 'app.edit_data'], logical='OR')
def admin_dashboard_view(request):
    """Example view: return the current user together with all of their permissions."""
    current_user = request.user
    granted = current_user.get_all_permissions()
    return {'user': current_user, 'permissions': granted}

# NOTE(review): only mixins are listed as bases. The mixins' dispatch() calls
# super().dispatch(), so a django.views.View subclass presumably has to be
# added at the end of the bases for this to work as a view. Confirm.
class AdminDashboardView(LoginRequiredMixin, RoleRequiredMixin, MultiPermissionRequiredMixin):
    """Example class-based dashboard restricted by login, role and permissions."""
    # The user must be in at least one of these groups.
    role_required = ['admin', 'manager']
    # Permissions checked via user.has_perm(), combined with OR (see below).
    permissions_required = ['app.view_report', 'app.edit_data']
    permissions_logical = 'OR'
    
    def get(self, request):
        # Returns a plain dict with the user and all of their permissions.
        return {'user': request.user, 'permissions': request.user.get_all_permissions()}

## 自定义用户模型 \{#自定义用户模型}

### 扩展内置用户模型

```python
# 扩展内置用户模型
from django.contrib.auth.models import AbstractUser
from django.db import models
from django.utils import timezone

class CustomUser(AbstractUser):
    """User model extending ``AbstractUser`` with profile, status and audit fields."""
    # Basic extension fields
    # Optional, but unique when present.
    phone_number = models.CharField(max_length=20, blank=True, null=True, unique=True)
    birth_date = models.DateField(blank=True, null=True)
    # Uploaded under the 'avatars/' directory.
    avatar = models.ImageField(upload_to='avatars/', blank=True, null=True)
    bio = models.TextField(max_length=500, blank=True)
    location = models.CharField(max_length=30, blank=True)
    website = models.URLField(blank=True)
    
    # Account status fields
    is_verified = models.BooleanField(default=False)
    is_premium = models.BooleanField(default=False)
    # Optional expiry moment; compared against the current time in is_account_active().
    expiry_date = models.DateTimeField(blank=True, null=True)
    
    # Audit fields
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    
    # Preference fields
    preferred_language = models.CharField(
        max_length=10, 
        choices=[
            ('en', 'English'),
            ('zh', '中文'),
            ('ja', '日本語'),
            ('ko', '한국어')
        ], 
        default='en'
    )
    # NOTE: this class attribute is named like the module-level ``timezone``
    # import; methods below still resolve ``timezone`` to the module because
    # class-body names are not visible from method scope.
    timezone = models.CharField(max_length=50, default='UTC')
    
    # Table name and secondary indexes
    class Meta:
        db_table = 'custom_user'
        indexes = [
            models.Index(fields=['phone_number']),
            models.Index(fields=['is_verified']),
            models.Index(fields=['is_premium']),
            models.Index(fields=['created_at']),
        ]
    
    def __str__(self) -> str:
        return f"{self.username} ({self.email})"
    
    def get_full_name(self) -> str:
        """Return "first last", falling back to the username when both are empty."""
        full_name = f"{self.first_name} {self.last_name}".strip()
        if not full_name:
            full_name = self.username
        return full_name
    
    def get_short_name(self) -> str:
        """Return the first name, or the username when no first name is set."""
        return self.first_name or self.username
    
    def is_account_active(self) -> bool:
        """Return False when the account is disabled or its expiry date has passed."""
        if not self.is_active:
            return False
        
        # An unset expiry_date means the account never expires.
        if self.expiry_date and self.expiry_date < timezone.now():
            return False
        
        return True
    
    def get_display_name(self) -> str:
        """Return the best available name: full name, first, last, then username."""
        if self.first_name and self.last_name:
            return f"{self.first_name} {self.last_name}"
        elif self.first_name:
            return self.first_name
        elif self.last_name:
            return self.last_name
        else:
            return self.username

# 用户配置文件模型
class UserProfile(models.Model):
    """Per-user profile row, reachable from the user as ``user.profile``."""
    # One profile per user; deleted together with the user.
    user = models.OneToOneField(CustomUser, on_delete=models.CASCADE, related_name='profile')
    
    # Personal details
    gender = models.CharField(
        max_length=10,
        choices=[
            ('male', '男'),
            ('female', '女'),
            ('other', '其他'),
            ('prefer_not_to_say', '不愿透露')
        ],
        blank=True
    )
    occupation = models.CharField(max_length=100, blank=True)
    company = models.CharField(max_length=100, blank=True)
    education = models.CharField(max_length=100, blank=True)
    
    # Social accounts
    wechat = models.CharField(max_length=50, blank=True)
    qq = models.CharField(max_length=20, blank=True)
    weibo = models.URLField(blank=True)
    
    # Notification settings
    email_notifications = models.BooleanField(default=True)
    sms_notifications = models.BooleanField(default=False)
    push_notifications = models.BooleanField(default=True)
    
    # Privacy settings
    profile_public = models.BooleanField(default=True)
    show_email = models.BooleanField(default=True)
    show_phone = models.BooleanField(default=False)
    
    # Counters (all start at 0)
    login_count = models.PositiveIntegerField(default=0)
    post_count = models.PositiveIntegerField(default=0)
    follower_count = models.PositiveIntegerField(default=0)
    following_count = models.PositiveIntegerField(default=0)
    
    # Audit timestamps
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    
    class Meta:
        db_table = 'user_profile'
        verbose_name = '用户配置文件'
        verbose_name_plural = '用户配置文件'

# 信号处理器 - 自动创建用户配置文件
from django.db.models.signals import post_save
from django.dispatch import receiver

@receiver(post_save, sender=CustomUser)
def create_user_profile(sender, instance, created, **kwargs):
    """Create the matching ``UserProfile`` the first time a user is saved."""
    if not created:
        return
    UserProfile.objects.create(user=instance)

@receiver(post_save, sender=CustomUser)
def save_user_profile(sender, instance, **kwargs):
    """Save the user's profile, if one exists, whenever the user is saved."""
    if not hasattr(instance, 'profile'):
        return
    related_profile = instance.profile
    related_profile.save()

# 用户验证器
from django.core.exceptions import ValidationError
from django.contrib.auth.validators import UnicodeUsernameValidator
import re

class UserValidator:
    """Stateless validators for user-supplied account fields.

    Every validator returns ``None`` on success, ignores empty values and
    raises ``ValidationError`` with a user-facing message on failure.
    """

    @staticmethod
    def validate_phone_number(phone_number):
        """Validate a mainland-China mobile number (11 digits, ``1[3-9]...``).

        Raises:
            ValidationError: If a non-empty value does not match.
        """
        if phone_number:
            pattern = r'^1[3-9]\d{9}$'
            # fullmatch: with re.match, ``$`` also matches just before a
            # trailing newline, which would let "138...\n" through.
            if not re.fullmatch(pattern, phone_number):
                raise ValidationError('请输入有效的手机号码')

    @staticmethod
    def validate_username(username):
        """Validate length (3-30) and character set (letters, digits, ``_``).

        Raises:
            ValidationError: If a non-empty value violates either rule.
        """
        if username:
            if len(username) < 3 or len(username) > 30:
                raise ValidationError('用户名长度应在3-30个字符之间')

            # fullmatch for the same trailing-newline reason as above.
            if not re.fullmatch(r'^[a-zA-Z0-9_]+$', username):
                raise ValidationError('用户名只能包含字母、数字和下划线')

    @staticmethod
    def validate_email(email):
        """Validate an e-mail address with Django's ``validate_email``.

        Raises:
            ValidationError: If a non-empty value is rejected; the original
                error is replaced by a localized message.
        """
        if email:
            from django.core.validators import validate_email
            try:
                validate_email(email)
            except ValidationError:
                raise ValidationError('请输入有效的邮箱地址')

# 自定义用户管理器
from django.contrib.auth.models import BaseUserManager

class CustomUserManager(BaseUserManager):
    """Manager for the custom user model: creation helpers and common queries."""

    def create_user(self, username, email=None, password=None, **extra_fields):
        """Create and save a regular user.

        Args:
            username: Required login name.
            email: Optional e-mail address; a missing value is stored as ``''``.
            password: Raw password, hashed via ``set_password``.
            **extra_fields: Additional model field values.

        Raises:
            ValueError: If ``username`` is empty.
        """
        if not username:
            raise ValueError('用户名不能为空')

        # normalize_email() turns None/'' into ''. Storing None instead would
        # violate the NOT NULL constraint of AbstractUser.email.
        email = self.normalize_email(email)
        user = self.model(username=username, email=email, **extra_fields)
        user.set_password(password)
        user.save(using=self._db)
        return user

    def create_superuser(self, username, email=None, password=None, **extra_fields):
        """Create and save a superuser (staff, superuser and verified by default).

        Raises:
            ValueError: If ``is_staff`` or ``is_superuser`` is explicitly not ``True``.
        """
        extra_fields.setdefault('is_staff', True)
        extra_fields.setdefault('is_superuser', True)
        extra_fields.setdefault('is_verified', True)

        if extra_fields.get('is_staff') is not True:
            raise ValueError('超级用户必须设置is_staff=True')
        if extra_fields.get('is_superuser') is not True:
            raise ValueError('超级用户必须设置is_superuser=True')

        return self.create_user(username, email, password, **extra_fields)

    def get_by_phone(self, phone_number):
        """Return the user with this phone number (raises ``DoesNotExist`` if none)."""
        return self.get(phone_number=phone_number)

    def get_active_users(self):
        """Return enabled users whose account has not expired.

        Uses the same rule as ``CustomUser.is_account_active``: an account
        counts as expired only when ``expiry_date`` is set and in the past.
        """
        from django.db.models import Q
        from django.utils import timezone

        not_expired = Q(expiry_date__isnull=True) | Q(expiry_date__gte=timezone.now())
        return self.filter(is_active=True).filter(not_expired).select_related('profile')

    def get_premium_users(self):
        """Return users flagged as premium, with their profile pre-fetched."""
        return self.filter(is_premium=True).select_related('profile')

# Attach the custom manager to the model under the name ``objects``.
CustomUser.add_to_class('objects', CustomUserManager())

# 用户服务类
class UserService:
    """Service helpers that combine user and profile operations."""

    @staticmethod
    def create_user_with_profile(username, email, password, **extra_fields):
        """Create a user and optionally fill in the related profile.

        Args:
            username: Login name for the new user.
            email: E-mail address for the new user.
            password: Raw password.
            **extra_fields: Extra user field values. The special key
                ``profile_data`` (a dict) is applied to the profile instead
                of being passed to the user model.

        Returns:
            The newly created user.
        """
        # ``profile_data`` is not a user field: remove it before creating the
        # user, otherwise the model constructor rejects the unknown keyword.
        profile_data = extra_fields.pop('profile_data', None) or {}

        user = CustomUser.objects.create_user(
            username=username,
            email=email,
            password=password,
            **extra_fields
        )

        if profile_data:
            UserService.update_user_profile(user, **profile_data)

        return user

    @staticmethod
    def update_user_profile(user, **profile_data):
        """Set the given attributes on ``user.profile`` and save it.

        Keys that are not existing attributes of the profile are ignored.

        Returns:
            The updated profile.
        """
        profile = user.profile
        for field, value in profile_data.items():
            if hasattr(profile, field):
                setattr(profile, field, value)
        profile.save()
        return profile

    @staticmethod
    def get_user_recommendations(user, limit=10):
        """Return up to ``limit`` other users sharing the profile location.

        NOTE(review): this reads ``profile.location`` and ``profile.following``,
        neither of which is declared on the ``UserProfile`` class body shown
        in this file (``location`` is declared on ``CustomUser``). They may be
        added elsewhere; confirm before relying on this query.
        """
        from django.db.models import Q

        recommendations = CustomUser.objects.filter(
            Q(profile__location=user.profile.location) &
            ~Q(id=user.id)  # exclude the user themself
        ).exclude(
            id__in=user.profile.following.all()  # exclude users already followed
        ).select_related('profile')[:limit]

        return recommendations

# 使用示例
def user_creation_example():
    """Example: create a user with profile data, then adjust two profile settings."""
    initial_profile = {
        'gender': 'male',
        'occupation': 'Software Engineer',
        'company': 'Tech Corp'
    }

    # Step 1: create the account together with its profile data.
    new_user = UserService.create_user_with_profile(
        username='john_doe',
        email='john@example.com',
        password='secure_password',
        first_name='John',
        last_name='Doe',
        phone_number='13812345678',
        profile_data=initial_profile,
    )

    # Step 2: switch off e-mail notifications and hide the e-mail address.
    UserService.update_user_profile(new_user, email_notifications=False, show_email=False)

    return new_user

完全自定义用户模型

# 完全自定义用户模型(替代AbstractUser)
from django.contrib.auth.models import AbstractBaseUser, PermissionsMixin
from django.contrib.auth.base_user import BaseUserManager
from django.db import models

class EmailUserManager(BaseUserManager):
    """Manager for user models that log in with an e-mail address."""

    def create_user(self, email, password=None, **extra_fields):
        """Create and save a user identified by ``email``.

        Raises ``ValueError`` when ``email`` is empty.
        """
        if not email:
            raise ValueError('邮箱地址不能为空')

        account = self.model(email=self.normalize_email(email), **extra_fields)
        account.set_password(password)
        account.save(using=self._db)
        return account

    def create_superuser(self, email, password=None, **extra_fields):
        """Create a superuser; staff, superuser and verified flags default to ``True``.

        Raises ``ValueError`` when ``is_staff`` or ``is_superuser`` is
        explicitly passed as anything other than ``True``.
        """
        for flag in ('is_staff', 'is_superuser', 'is_verified'):
            extra_fields.setdefault(flag, True)

        # Checked in this order: is_staff first, then is_superuser.
        mandatory = (
            ('is_staff', '超级用户必须设置is_staff=True'),
            ('is_superuser', '超级用户必须设置is_superuser=True'),
        )
        for flag, complaint in mandatory:
            if extra_fields.get(flag) is not True:
                raise ValueError(complaint)

        return self.create_user(email, password, **extra_fields)

class EmailUser(AbstractBaseUser, PermissionsMixin):
    """User model that authenticates by e-mail address instead of username."""
    # Unique login identifier (see USERNAME_FIELD below).
    email = models.EmailField(unique=True)
    # Optional display name; not unique.
    username = models.CharField(max_length=150, blank=True)
    first_name = models.CharField(max_length=30, blank=True)
    last_name = models.CharField(max_length=150, blank=True)
    is_staff = models.BooleanField(default=False)
    is_active = models.BooleanField(default=True)
    is_verified = models.BooleanField(default=False)
    # Set automatically when the row is created.
    date_joined = models.DateTimeField(auto_now_add=True)
    
    # Custom fields
    phone_number = models.CharField(max_length=20, blank=True)
    # Stored as a URL rather than an uploaded file.
    avatar = models.URLField(blank=True)
    bio = models.TextField(max_length=500, blank=True)
    
    USERNAME_FIELD = 'email'  # the e-mail address is the login field
    REQUIRED_FIELDS = ['username']  # extra fields requested when creating a superuser
    
    objects = EmailUserManager()
    
    class Meta:
        db_table = 'email_user'
        verbose_name = '邮箱用户'
        verbose_name_plural = '邮箱用户'
    
    def __str__(self) -> str:
        return self.email
    
    def get_full_name(self) -> str:
        """Return "first last", falling back to the username and then the e-mail."""
        full_name = f"{self.first_name} {self.last_name}".strip()
        if not full_name:
            full_name = self.username or self.email
        return full_name
    
    def get_short_name(self) -> str:
        """Return the first name, or the part of the e-mail before ``@``."""
        return self.first_name or self.email.split('@')[0]

# 使用完全自定义用户模型的配置
"""
# settings.py 配置
AUTH_USER_MODEL = 'your_app.EmailUser'

# 注意:使用自定义用户模型需要在项目初期就设置,
# 一旦有数据后很难更改
"""

认证后端配置

多因素认证后端

# 多因素认证后端
import pyotp
import qrcode
from io import BytesIO
import base64
from django.contrib.auth.backends import BaseBackend
from django.contrib.auth import get_user_model

User = get_user_model()

class TwoFactorAuthBackend(BaseBackend):
    """Password backend that additionally enforces TOTP when the profile enables it."""

    def authenticate(self, request, username=None, password=None, otp_code=None, **kwargs):
        """Authenticate by password and, when 2FA is enabled, by one-time code.

        Returns the user on success and ``None`` on any failed check. Raises
        ``ValueError`` when 2FA is enabled but no ``otp_code`` was supplied.
        """
        if any(value is None for value in (username, password)):
            return None

        # Step 1: username and password.
        user = self.authenticate_basic(username, password)
        if not user:
            return None

        # Step 2: only users whose profile enables 2FA need a code.
        two_factor_on = hasattr(user, 'profile') and user.profile.enable_2fa
        if not two_factor_on:
            return user

        if not otp_code:
            # A code is required but was not provided.
            raise ValueError('需要提供两因素认证码')

        return user if self.verify_otp(user, otp_code) else None

    def authenticate_basic(self, username, password):
        """Return the user when ``username`` exists and ``password`` matches, else ``None``."""
        try:
            candidate = User.objects.get(username=username)
        except User.DoesNotExist:
            return None
        return candidate if candidate.check_password(password) else None

    def verify_otp(self, user, otp_code):
        """Check ``otp_code`` against the secret stored on the user's profile."""
        secret = user.profile.totp_secret if hasattr(user, 'profile') else None
        if not secret:
            return False

        # valid_window=1 also accepts the neighbouring time steps.
        return pyotp.TOTP(secret).verify(otp_code, valid_window=1)

    def get_user(self, user_id):
        """Return the user with primary key ``user_id``, or ``None`` if absent."""
        try:
            found = User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None
        return found

# TOTP服务类
class TOTPService:
    """Helpers for creating TOTP secrets, provisioning QR codes and checking codes."""

    @staticmethod
    def generate_secret():
        """Return a new random base32 secret produced by ``pyotp.random_base32``."""
        return pyotp.random_base32()

    @staticmethod
    def generate_qr_code(user, issuer_name="YourApp"):
        """Build the provisioning QR code for ``user.profile.totp_secret``.

        Returns:
            A ``(png_base64, provisioning_uri)`` tuple: the QR image as a
            base64-encoded PNG string and the URI it encodes.
        """
        provisioning_uri = pyotp.totp.TOTP(user.profile.totp_secret).provisioning_uri(
            name=user.email,
            issuer_name=issuer_name
        )

        code = qrcode.QRCode(version=1, box_size=10, border=5)
        code.add_data(provisioning_uri)
        code.make(fit=True)

        png = BytesIO()
        code.make_image(fill_color="black", back_color="white").save(png, format='PNG')
        encoded_png = base64.b64encode(png.getvalue()).decode()

        return encoded_png, provisioning_uri

    @staticmethod
    def verify_code(secret, code):
        """Return True when ``code`` is valid for ``secret``.

        ``valid_window=1`` also accepts the previous and the next time step.
        """
        return pyotp.TOTP(secret).verify(code, valid_window=1)

# 更新用户配置文件模型以支持2FA
# Switch that turns the second factor on for a user.
UserProfile.add_to_class('enable_2fa', models.BooleanField(default=False))
# Secret read by the TOTP verification code; blank until 2FA is set up.
UserProfile.add_to_class('totp_secret', models.CharField(max_length=32, blank=True))
# Backup codes kept as text (intended to hold JSON).
UserProfile.add_to_class('backup_codes', models.TextField(blank=True))

# 社交认证后端
import requests
from social_core.backends.oauth import BaseOAuth2

class WeChatOAuthBackend(BaseBackend):
    """Authentication backend that logs users in with a WeChat OAuth ``code``.

    The code is exchanged for an access token, the WeChat profile is fetched,
    and a local user named ``wechat_<openid>`` is created on first login.
    """

    # Seconds to wait for each WeChat API call; without a timeout a stalled
    # connection would block the login request indefinitely.
    REQUEST_TIMEOUT = 10

    def authenticate(self, request, code=None, **kwargs):
        """Authenticate via WeChat OAuth.

        Args:
            request: The current request (unused).
            code: Authorization code returned by WeChat.
            **kwargs: Must carry ``appid`` and ``secret`` for the token call.

        Returns:
            The matching (or newly created) active user, or ``None`` when the
            code is missing, WeChat reports an error, the API cannot be
            reached, or the account is inactive.
        """
        if not code:
            return None

        # Exchange the authorization code for an access token.
        token_data = self._fetch_json(
            'https://api.weixin.qq.com/sns/oauth2/access_token',
            {
                'appid': kwargs.get('appid'),
                'secret': kwargs.get('secret'),
                'code': code,
                'grant_type': 'authorization_code'
            },
        )
        if not token_data or 'access_token' not in token_data or 'openid' not in token_data:
            return None

        # Fetch the WeChat profile of the token's owner.
        user_data = self._fetch_json(
            'https://api.weixin.qq.com/sns/userinfo',
            {
                'access_token': token_data['access_token'],
                'openid': token_data['openid'],
                'lang': 'zh_CN'
            },
        )
        if not user_data or 'openid' not in user_data:
            return None

        # NOTE(review): 'avatar' and 'is_verified' assume a custom user model
        # that declares these fields - confirm against the user model in use.
        user, created = User.objects.get_or_create(
            username=f"wechat_{user_data['openid']}",
            defaults={
                'first_name': user_data.get('nickname', ''),
                'avatar': user_data.get('headimgurl', ''),
                'is_verified': True
            }
        )

        # Deactivated accounts must not be able to log in.
        if not getattr(user, 'is_active', True):
            return None

        return user

    def _fetch_json(self, url, params):
        """GET ``url`` and return the decoded JSON object, or ``None`` on any failure.

        A failure is a network error, a body that is not a JSON object, or a
        payload that contains ``errcode``.
        """
        try:
            response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
            payload = response.json()
        except (requests.RequestException, ValueError):
            return None

        if not isinstance(payload, dict) or 'errcode' in payload:
            return None
        return payload

    def get_user(self, user_id):
        """Return the user with primary key ``user_id``, or ``None`` if missing."""
        try:
            return User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None

# LDAP认证后端(需要python-ldap)
"""
class LDAPAuthBackend(BaseBackend):
    # LDAP认证后端实现
    pass
"""

# 自定义认证后端配置
"""
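# settings.py 配置
# 注意:Django 会按列表顺序依次尝试各后端,任一后端认证成功即停止。
# 若保留 ModelBackend,仅凭用户名和密码即可通过认证,从而绕过两因素认证;
# 需要强制 2FA 时应移除 ModelBackend,只保留 TwoFactorAuthBackend。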
AUTHENTICATION_BACKENDS = [
    'django.contrib.auth.backends.ModelBackend',  # 默认后端
    'your_app.backends.TwoFactorAuthBackend',     # 两因素认证
    'your_app.backends.WeChatOAuthBackend',       # 微信OAuth
    # 'your_app.backends.LDAPAuthBackend',       # LDAP认证
]
"""

密码策略和安全

# 密码策略和安全
from django.contrib.auth.hashers import make_password, check_password
from django.core.exceptions import ValidationError
import re
from django.conf import settings

class PasswordPolicy:
    """Configurable password-strength rules, read from Django settings."""

    def __init__(self):
        """Load each rule from ``settings``, falling back to the defaults below."""
        self.min_length = getattr(settings, 'PASSWORD_MIN_LENGTH', 8)
        self.require_uppercase = getattr(settings, 'PASSWORD_REQUIRE_UPPERCASE', True)
        self.require_lowercase = getattr(settings, 'PASSWORD_REQUIRE_LOWERCASE', True)
        self.require_numbers = getattr(settings, 'PASSWORD_REQUIRE_NUMBERS', True)
        self.require_special = getattr(settings, 'PASSWORD_REQUIRE_SPECIAL', False)
        self.special_chars = getattr(settings, 'PASSWORD_SPECIAL_CHARS', '!@#$%^&*()_+-=[]{}|;:,.<>?')

    def validate_password(self, password, user=None):
        """Check ``password`` against every enabled rule.

        Args:
            password: The raw password to check.
            user: Optional user whose username, names and email must not
                appear inside the password.

        Returns:
            ``password`` unchanged when every rule passes.

        Raises:
            ValidationError: carrying one message per violated rule.
        """
        errors = []

        # Length
        if len(password) < self.min_length:
            errors.append(f'密码长度至少需要{self.min_length}个字符')

        # Upper-case letter
        if self.require_uppercase and not re.search(r'[A-Z]', password):
            errors.append('密码必须包含大写字母')

        # Lower-case letter
        if self.require_lowercase and not re.search(r'[a-z]', password):
            errors.append('密码必须包含小写字母')

        # Digit
        if self.require_numbers and not re.search(r'\d', password):
            errors.append('密码必须包含数字')

        # Special character
        if self.require_special and not re.search(f'[{re.escape(self.special_chars)}]', password):
            errors.append(f'密码必须包含特殊字符: {self.special_chars}')

        # Similarity to the user's personal data
        if user and self._resembles_user(password, user):
            errors.append('密码不能与您的个人信息相似')

        if errors:
            raise ValidationError(errors)

        return password

    @staticmethod
    def _resembles_user(password, user):
        """Return True when a non-empty personal attribute occurs in ``password``."""
        lowered = password.lower()
        for attribute in ('username', 'first_name', 'last_name', 'email'):
            value = getattr(user, attribute, None)
            # Empty values are skipped: '' is a substring of every string, so
            # a user without a first or last name would otherwise always fail.
            if value and value.lower() in lowered:
                return True
        return False

# 密码历史记录
class PasswordHistory(models.Model):
    """Hashes of a user's previous passwords, used to block password reuse."""
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    password_hash = models.CharField(max_length=128)  # hashed, never the raw password
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'password_history'
        ordering = ['-created_at']

    @classmethod
    def add_password(cls, user, raw_password):
        """Store the hash of ``raw_password`` for ``user`` and trim old entries."""
        password_hash = make_password(raw_password)
        cls.objects.create(user=user, password_hash=password_hash)

        # Keep the table bounded.
        cls.trim_history(user)

    @classmethod
    def trim_history(cls, user, max_history=5):
        """Delete all but the ``max_history`` most recent entries of ``user``."""
        # Newest first, so everything after the first ``max_history`` rows is
        # stale. The primary keys are collected first because Django refuses
        # to call delete() on a sliced queryset.
        stale_ids = list(
            cls.objects.filter(user=user)
            .order_by('-created_at', '-pk')
            .values_list('pk', flat=True)[max_history:]
        )
        if stale_ids:
            cls.objects.filter(pk__in=stale_ids).delete()

    @classmethod
    def is_password_used_before(cls, user, raw_password, check_count=5):
        """Return True when ``raw_password`` matches one of the last ``check_count`` hashes."""
        recent_history = cls.objects.filter(user=user).order_by('-created_at')[:check_count]
        return any(
            check_password(raw_password, record.password_hash)
            for record in recent_history
        )

# 密码重置令牌
from django.utils.crypto import get_random_string
from django.utils import timezone

class PasswordResetToken(models.Model):
    """Single-use token that authorises one password reset for a user."""
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    token = models.CharField(max_length=32, unique=True)
    created_at = models.DateTimeField(auto_now_add=True)
    is_used = models.BooleanField(default=False)

    class Meta:
        db_table = 'password_reset_token'

    @classmethod
    def generate_token(cls, user):
        """Create and return a fresh token, discarding the user's unused ones."""
        # Only one outstanding token per user.
        cls.objects.filter(user=user, is_used=False).delete()

        token = get_random_string(32)
        reset_token = cls.objects.create(user=user, token=token)
        return reset_token

    def is_valid(self):
        """Return True while the token is unused and within its validity period.

        The period comes from ``settings.PASSWORD_RESET_TOKEN_VALIDITY``
        (seconds, default 3600).
        """
        from django.conf import settings
        validity_period = getattr(settings, 'PASSWORD_RESET_TOKEN_VALIDITY', 3600)  # 1 hour

        # total_seconds() covers the whole age. ``.seconds`` drops full days,
        # so a token older than a day could look fresh again.
        age_seconds = (timezone.now() - self.created_at).total_seconds()
        return not self.is_used and age_seconds < validity_period

# 密码服务
class PasswordService:
    """Password operations: policy-checked set/change, token reset, reset links."""

    def __init__(self):
        self.policy = PasswordPolicy()

    def set_password(self, user, raw_password, check_history=True):
        """Validate and store a new password for ``user``.

        Raises:
            ValidationError: the password violates the policy or, when
                ``check_history`` is true, matches a recently used password.
        """
        from django.db import transaction

        # Policy check
        self.policy.validate_password(raw_password, user)

        # Reuse check
        if check_history and PasswordHistory.is_password_used_before(user, raw_password):
            raise ValidationError('新密码不能与最近使用的密码相同')

        # Save the new password and its history entry together, so a failure
        # cannot leave a password without a history record.
        with transaction.atomic():
            user.set_password(raw_password)
            user.save()
            PasswordHistory.add_password(user, raw_password)

    def change_password(self, user, old_password, new_password):
        """Change the password after verifying ``old_password``.

        Raises:
            ValidationError: ``old_password`` is wrong, or ``new_password``
                is rejected by ``set_password``.
        """
        if not user.check_password(old_password):
            raise ValidationError('旧密码不正确')

        self.set_password(user, new_password)

    def reset_password_with_token(self, token_str, new_password):
        """Reset a password using a reset token and mark the token as used.

        Raises:
            ValidationError: the token is unknown, expired or already used,
                or ``new_password`` is rejected by ``set_password``.
        """
        from django.db import transaction

        # Lock the token row for the whole operation so two concurrent
        # requests cannot both consume the same token.
        with transaction.atomic():
            try:
                reset_token = PasswordResetToken.objects.select_for_update().get(token=token_str)
            except PasswordResetToken.DoesNotExist:
                raise ValidationError('无效的重置令牌') from None

            if not reset_token.is_valid():
                raise ValidationError('重置令牌已过期或已被使用')

            self.set_password(reset_token.user, new_password)

            reset_token.is_used = True
            reset_token.save()

    def generate_password_reset_link(self, user):
        """Create a reset token for ``user`` and return its relative reset URL."""
        token = PasswordResetToken.generate_token(user)

        # A real application would build an absolute URL here and deliver it
        # by email or SMS.
        reset_link = f"/reset-password/{token.token}/"
        return reset_link

# 密码强度指示器
class PasswordStrengthIndicator:
    """Scores a password and produces feedback for a strength meter."""

    @staticmethod
    def calculate_strength(password):
        """Score ``password`` and return a strength report.

        One point each for: length >= 8, length >= 12, a lower-case letter,
        an upper-case letter, a digit, a special character. One point is
        deducted when fewer than 70% of the characters are distinct.

        Returns:
            A dict with ``score``, ``strength`` ('weak'/'medium'/'strong'),
            ``level`` (display label), ``feedback`` (list of hints) and
            ``percentage`` (0-100).
        """
        score = 0
        feedback = []

        # Length
        if len(password) >= 8:
            score += 1
        else:
            feedback.append("密码至少需要8个字符")

        if len(password) >= 12:
            score += 1

        # Character variety: one point per class present, one hint per class missing.
        character_checks = (
            (r'[a-z]', "添加小写字母"),
            (r'[A-Z]', "添加大写字母"),
            (r'\d', "添加数字"),
            (r'[!@#$%^&*(),.?":{}|<>]', "添加特殊字符"),
        )
        for pattern, hint in character_checks:
            if re.search(pattern, password):
                score += 1
            else:
                feedback.append(hint)

        # Repetition penalty. An empty password is skipped, since the ratio
        # would divide by zero.
        if password and len(set(password)) / len(password) < 0.7:  # 70% distinct characters
            score -= 1
            feedback.append("避免重复字符")

        # Rating
        if score <= 2:
            strength = 'weak'
            level = '弱'
        elif score <= 4:
            strength = 'medium'
            level = '中'
        else:
            strength = 'strong'
            level = '强'

        return {
            'score': score,
            'strength': strength,
            'level': level,
            'feedback': feedback,
            # Clamped to 0-100: the penalty can push the score below zero.
            'percentage': max(0, min(100, int((score / 6) * 100)))
        }

# 使用示例
def password_management_example():
    """Walk through the password helpers: strength check, set password, reset link.

    Returns the user created for the demonstration.
    """
    password_service = PasswordService()
    strength_meter = PasswordStrengthIndicator()

    # Rate a sample password.
    report = strength_meter.calculate_strength("MyPass123!")
    print(f"密码强度: {report}")

    # Create the demo account.
    demo_user = CustomUser.objects.create_user(
        username='testuser',
        email='test@example.com'
    )

    # Give it a password that satisfies the policy.
    try:
        password_service.set_password(demo_user, "SecurePass123!")
    except ValidationError as exc:
        print(f"密码验证失败: {exc}")
    else:
        print("密码设置成功")

    # Produce a reset link for the account.
    link = password_service.generate_password_reset_link(demo_user)
    print(f"重置链接: {link}")

    return demo_user

用户管理与操作

用户CRUD操作

# 用户CRUD操作
from django.contrib.auth import get_user_model
from django.db import transaction
from django.core.exceptions import ValidationError

User = get_user_model()

class UserManager:
    """Service-style helpers for creating, updating and removing users.

    This is a plain helper class, not a Django model manager.
    """

    @staticmethod
    def create_user_with_validation(username, email, password, **extra_fields):
        """Validate the input, check uniqueness and create the user.

        Raises:
            ValidationError: the username or email fails validation or is
                already taken.
        """
        validator = UserValidator()

        if username:
            validator.validate_username(username)
        if email:
            validator.validate_email(email)

        # Uniqueness checks
        if User.objects.filter(username=username).exists():
            raise ValidationError('用户名已存在')

        if email and User.objects.filter(email=email).exists():
            raise ValidationError('邮箱已被注册')

        with transaction.atomic():
            user = User.objects.create_user(
                username=username,
                email=email,
                password=password,
                **extra_fields
            )

        return user

    @staticmethod
    def update_user(user_id, **updates):
        """Apply ``updates`` to the user with id ``user_id`` and return it.

        Keys that are not attributes of the user are ignored. A ``password``
        key is hashed through ``set_password``.

        NOTE(review): any existing attribute can be set this way, including
        flags such as ``is_superuser`` - callers must filter untrusted input.

        Raises:
            ValidationError: the user does not exist, or the new username or
                email is invalid or already in use.
        """
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            raise ValidationError('用户不存在')

        validator = UserValidator()

        if 'username' in updates:
            validator.validate_username(updates['username'])
            # Uniqueness check that ignores the user being updated.
            if User.objects.filter(username=updates['username']).exclude(id=user_id).exists():
                raise ValidationError('用户名已被使用')

        if 'email' in updates:
            validator.validate_email(updates['email'])
            if User.objects.filter(email=updates['email']).exclude(id=user_id).exists():
                raise ValidationError('邮箱已被使用')

        for field, value in updates.items():
            if field == 'password':
                # A plain setattr would store the raw password in the
                # password column; it has to go through the hasher.
                user.set_password(value)
            elif hasattr(user, field):
                setattr(user, field, value)

        user.save()
        return user

    @staticmethod
    def deactivate_user(user_id, reason=''):
        """Deactivate a user and log the reason.

        Returns:
            True on success, False when the user does not exist.
        """
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            return False

        # Flag change and log entry succeed or fail together.
        with transaction.atomic():
            user.is_active = False
            user.save()

            UserDeactivationLog.objects.create(
                user=user,
                reason=reason,
                deactivated_by=None  # the acting user could be recorded here
            )

        return True

    @staticmethod
    def activate_user(user_id):
        """Re-activate a user. Returns False when the user does not exist."""
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            return False

        user.is_active = True
        user.save()
        return True

    @staticmethod
    def delete_user(user_id, hard_delete=False):
        """Delete a user, softly by default.

        A soft delete renames the account to ``deleted_<id>_<username>`` and
        deactivates it; ``hard_delete=True`` removes the row.

        Returns:
            True on success, False when the user does not exist.
        """
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            return False

        if hard_delete:
            user.delete()
        else:
            user.username = f"deleted_{user.id}_{user.username}"
            user.is_active = False
            user.save()

        return True

    @staticmethod
    def bulk_create_users(user_data_list):
        """Create one user per dict in ``user_data_list``.

        Returns:
            A ``(users, errors)`` tuple: the created users and one message
            per entry that failed validation.
        """
        users = []
        errors = []

        for i, user_data in enumerate(user_data_list):
            try:
                user = UserManager.create_user_with_validation(**user_data)
                users.append(user)
            except ValidationError as e:
                errors.append(f"用户 {i+1}: {str(e)}")

        return users, errors

# 用户停用日志
class UserDeactivationLog(models.Model):
    """Audit record written each time an account is deactivated."""
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    reason = models.TextField()
    # The acting user; the log row is kept (set to NULL) if that user is deleted.
    deactivated_by = models.ForeignKey(
        User, on_delete=models.SET_NULL, null=True, related_name='deactivated_users'
    )
    deactivated_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'user_deactivation_log'

# 用户搜索和筛选
class UserSearchFilter:
    """Query helpers for finding users and summarising the user base."""

    @staticmethod
    def search_users(query, filters=None):
        """Return users matching a text ``query`` and optional ``filters``.

        ``query`` is matched case-insensitively against username, first name,
        last name and email. ``filters`` may contain ``is_active``,
        ``is_verified`` (applied only if the user model has that attribute),
        ``date_joined_after``, ``date_joined_before`` and ``groups`` (a list
        of group names). The result is a distinct queryset.
        """
        from django.db.models import Q

        results = User.objects.all()

        # Free-text match across the name and contact columns.
        if query:
            text_match = Q(username__icontains=query)
            for column in ('first_name', 'last_name', 'email'):
                text_match = text_match | Q(**{f'{column}__icontains': query})
            results = results.filter(text_match)

        # Structured filters, applied in a fixed order: (filter key, ORM lookup).
        if filters:
            lookups = (
                ('is_active', 'is_active'),
                ('is_verified', 'is_verified'),
                ('date_joined_after', 'date_joined__gte'),
                ('date_joined_before', 'date_joined__lte'),
                ('groups', 'groups__name__in'),
            )
            for key, lookup in lookups:
                if key not in filters:
                    continue
                if key == 'is_verified' and not hasattr(User, 'is_verified'):
                    continue
                results = results.filter(**{lookup: filters[key]})

        return results.distinct()

    @staticmethod
    def get_user_statistics():
        """Return totals, the active percentage and sign-ups per day for the last week."""
        from datetime import timedelta
        from django.db.models import Count
        from django.utils import timezone

        def count_flagged(flag):
            # Optional boolean columns count as zero when the model lacks them.
            if hasattr(User, flag):
                return User.objects.filter(**{flag: True}).count()
            return 0

        total_users = User.objects.count()
        active_users = User.objects.filter(is_active=True).count()
        verified_users = count_flagged('is_verified')
        premium_users = count_flagged('is_premium')

        # Sign-ups per calendar day over the last seven days.
        today = timezone.now().date()
        week_start = today - timedelta(days=7)
        signups_per_day = (
            User.objects.filter(date_joined__date__range=[week_start, today])
            .values('date_joined__date')
            .annotate(count=Count('id'))
            .order_by('date_joined__date')
        )

        if total_users > 0:
            active_share = active_users / total_users * 100
        else:
            active_share = 0

        return {
            'total': total_users,
            'active': active_users,
            'verified': verified_users,
            'premium': premium_users,
            'active_percentage': active_share,
            'daily_signups': list(signups_per_day)
        }

# 用户导入导出
import csv
import json
from django.http import HttpResponse

class UserImportExport:
    """CSV export and import of user accounts."""

    @staticmethod
    def export_users_csv(users, fields=None):
        """Return an ``HttpResponse`` that downloads ``users`` as ``users.csv``.

        Args:
            users: Iterable of user objects.
            fields: Attribute names to export; defaults to the basic profile
                columns. Missing attributes are written as empty strings.

        NOTE(review): values are written verbatim. A value starting with
        ``=``, ``+``, ``-`` or ``@`` may be interpreted as a formula by
        spreadsheet software - consider neutralising such cells.
        """
        if fields is None:
            fields = ['username', 'email', 'first_name', 'last_name', 'is_active', 'date_joined']

        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = 'attachment; filename="users.csv"'

        writer = csv.writer(response)

        # Header row
        writer.writerow(fields)

        # Data rows
        for user in users:
            row = []
            for field in fields:
                value = getattr(user, field, '')
                if isinstance(value, bool):
                    value = 'Yes' if value else 'No'
                elif hasattr(value, 'strftime'):  # date/datetime values
                    value = value.strftime('%Y-%m-%d %H:%M:%S')
                row.append(str(value))
            writer.writerow(row)

        return response

    @staticmethod
    def import_users_from_csv(csv_file, required_fields=None):
        """Create users from an uploaded UTF-8 CSV file.

        Imported accounts get no usable password (``password=None``), so each
        user has to go through a password reset before the first login.

        Args:
            csv_file: Binary file-like object containing the CSV data.
            required_fields: Columns that must be non-empty in every row;
                defaults to ``['username', 'email']``.

        Returns:
            A ``(users_created, errors)`` tuple: the created users and one
            message per row that could not be imported.
        """
        import io

        if required_fields is None:
            required_fields = ['username', 'email']

        users_created = []
        errors = []

        # 'utf-8-sig' drops the BOM that spreadsheet exports often prepend,
        # which would otherwise corrupt the first header name. newline=''
        # lets the csv module handle line breaks inside quoted fields.
        text = csv_file.read().decode('utf-8-sig')
        reader = csv.DictReader(io.StringIO(text, newline=''))

        for row_num, row in enumerate(reader, start=2):  # row 1 is the header
            try:
                # Required columns
                for field in required_fields:
                    if not row.get(field):
                        raise ValidationError(f"第{row_num}行缺少必需字段: {field}")

                # DictReader fills the columns missing from a short row with
                # None; treat that like an absent column.
                active_flag = row.get('is_active')
                if active_flag is None:
                    active_flag = 'Yes'

                user_data = {
                    'username': row.get('username'),
                    'email': row.get('email', ''),
                    'first_name': row.get('first_name', ''),
                    'last_name': row.get('last_name', ''),
                    'is_active': active_flag.lower() in ['yes', 'true', '1'],
                }

                # No shared default password: a known password on every
                # imported account would let anyone log in as those users.
                user_data['password'] = None

                user = UserManager.create_user_with_validation(**user_data)
                users_created.append(user)

            except ValidationError as e:
                errors.append(f"第{row_num}行错误: {str(e)}")
            except Exception as e:
                # Best effort: one bad row must not abort the whole import.
                errors.append(f"第{row_num}行未知错误: {str(e)}")

        return users_created, errors

# 使用示例
def user_management_example():
    """Demonstrate user creation, search and statistics; returns the new user."""
    # Register a new account.
    account_fields = {
        'username': 'newuser',
        'email': 'newuser@example.com',
        'password': 'SecurePass123!',
        'first_name': 'New',
        'last_name': 'User',
    }
    new_user = UserManager.create_user_with_validation(**account_fields)

    # Look up active users matching "new".
    active_only = {'is_active': True}
    search_results = UserSearchFilter.search_users('new', active_only)

    # Print the aggregate figures.
    summary = UserSearchFilter.get_user_statistics()
    print(f"用户统计: {summary}")

    return new_user

用户活动追踪

# 用户活动追踪
class UserActivityTracker:
    """Records user activity and derives simple engagement metrics from it."""

    @staticmethod
    def log_user_activity(user, activity_type, details=None, ip_address=None):
        """Create and return a ``UserActivityLog`` row for ``user``.

        ``details`` defaults to an empty dict when omitted.
        """
        return UserActivityLog.objects.create(
            user=user,
            activity_type=activity_type,
            details=details or {},
            ip_address=ip_address,
            timestamp=timezone.now()
        )

    @staticmethod
    def get_user_activity(user, days=30):
        """Return the user's activity of the last ``days`` days, newest first."""
        from datetime import timedelta

        start_date = timezone.now() - timedelta(days=days)
        return UserActivityLog.objects.filter(
            user=user,
            timestamp__gte=start_date
        ).order_by('-timestamp')

    @staticmethod
    def get_user_engagement_score(user):
        """Return a weighted activity score for the last 30 days.

        Each activity adds the weight of its type; unknown types count as 1.
        """
        # Import timedelta from the standard library, as get_user_activity
        # does, instead of relying on django.utils.timezone re-exporting it.
        from datetime import timedelta

        activities = UserActivityLog.objects.filter(
            user=user,
            timestamp__gte=timezone.now() - timedelta(days=30)
        )
        activity_types = activities.values_list('activity_type', flat=True)

        # Weight per activity type
        weights = {
            'login': 5,
            'view_content': 2,
            'create_content': 10,
            'comment': 3,
            'share': 4,
            'purchase': 15
        }

        return sum(weights.get(activity_type, 1) for activity_type in activity_types)

# 用户活动日志模型
class UserActivityLog(models.Model):
    """One recorded user action, with optional request metadata."""
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    activity_type = models.CharField(max_length=50)
    details = models.JSONField(default=dict)  # free-form activity details
    ip_address = models.GenericIPAddressField(null=True, blank=True)
    user_agent = models.TextField(blank=True)
    timestamp = models.DateTimeField(auto_now_add=True)
    session_key = models.CharField(max_length=40, blank=True)
    
    class Meta:
        db_table = 'user_activity_log'
        # Newest entries first by default.
        ordering = ['-timestamp']
        indexes = [
            models.Index(fields=['user', 'timestamp']),
            models.Index(fields=['activity_type']),
            models.Index(fields=['timestamp']),
        ]

# 用户在线状态追踪
class UserOnlineStatus(models.Model):
    """Per-user online flag with the time and session of the last update."""
    user = models.OneToOneField(User, on_delete=models.CASCADE)
    is_online = models.BooleanField(default=False)
    # auto_now: refreshed on every save of the row.
    last_seen = models.DateTimeField(auto_now=True)
    session_key = models.CharField(max_length=40, blank=True)
    
    class Meta:
        db_table = 'user_online_status'

# 在线用户追踪中间件
class OnlineUserMiddleware:
    """Middleware that marks authenticated users as online on every request.

    Reads ``request.user`` and ``request.session``, so it has to run after
    the session and authentication middleware.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        """Update the user's online status, then pass the request on."""
        if request.user.is_authenticated:
            online_status, created = UserOnlineStatus.objects.get_or_create(
                user=request.user
            )
            online_status.is_online = True
            online_status.last_seen = timezone.now()
            # session_key is None until a session has been saved; the column
            # is a non-null CharField, so store an empty string instead.
            online_status.session_key = request.session.session_key or ''
            online_status.save()

        response = self.get_response(request)

        return response

# 用户行为分析
class UserBehaviorAnalyzer:
    """Aggregations over ``UserActivityLog`` describing a user's behaviour."""

    @staticmethod
    def analyze_user_behavior(user):
        """Summarise the user's activity of the last 30 days.

        Returns:
            A dict with ``total_activities``, ``activities_by_type``
            (queryset of type/count rows), ``most_active_day``,
            ``average_daily_activities`` and ``last_activity``.
        """
        from datetime import timedelta

        thirty_days_ago = timezone.now() - timedelta(days=30)

        activities = UserActivityLog.objects.filter(
            user=user,
            timestamp__gte=thirty_days_ago
        )

        # NOTE(review): the per-day groupings use QuerySet.extra() with the
        # raw SQL expression date(timestamp) - confirm that the target
        # database supports it.
        analysis = {
            'total_activities': activities.count(),
            'activities_by_type': activities.values('activity_type').annotate(
                count=models.Count('activity_type')
            ),
            'most_active_day': activities.extra(
                select={'day': 'date(timestamp)'}
            ).values('day').annotate(
                count=models.Count('id')
            ).order_by('-count').first(),
            'average_daily_activities': activities.extra(
                select={'day': 'date(timestamp)'}
            ).values('day').annotate(
                count=models.Count('id')
            ).aggregate(avg=models.Avg('count'))['avg'] or 0,
            'last_activity': activities.order_by('-timestamp').first()
        }

        return analysis

    @staticmethod
    def get_user_retention_cohort(user):
        """Report on which of the first 7 days since first activity the user was active.

        Returns:
            A dict mapping ``day_0`` .. ``day_6`` to booleans, or an empty
            dict when the user has no recorded activity.
        """
        # timedelta is what the loop below needs; importing only ``datetime``
        # left it undefined in this function.
        from datetime import timedelta

        # Date of the user's first recorded activity
        first_activity = UserActivityLog.objects.filter(
            user=user
        ).order_by('timestamp').first()

        if not first_activity:
            return {}

        cohort_date = first_activity.timestamp.date()

        # One existence check per day of the 7-day window
        retention_data = {}
        for i in range(7):
            check_date = cohort_date + timedelta(days=i)
            has_activity = UserActivityLog.objects.filter(
                user=user,
                timestamp__date=check_date
            ).exists()
            retention_data[f'day_{i}'] = has_activity

        return retention_data

# 使用示例
def user_tracking_example(request):
    """Log a page view for the current user and return their activity metrics.

    Returns an empty dict for unauthenticated requests; otherwise a dict with
    ``behavior`` and ``engagement_score``.
    """
    current_user = request.user
    if not current_user.is_authenticated:
        return {}

    # Record this page view.
    UserActivityTracker.log_user_activity(
        user=current_user,
        activity_type='page_view',
        details={'page': request.path},
        ip_address=request.META.get('REMOTE_ADDR')
    )

    # Compute both metrics for the same user.
    return {
        'behavior': UserBehaviorAnalyzer.analyze_user_behavior(current_user),
        'engagement_score': UserActivityTracker.get_user_engagement_score(current_user)
    }

权限与组管理

权限系统基础

# 权限系统基础
from django.contrib.auth.models import Permission, Group
from django.contrib.contenttypes.models import ContentType
from django.apps import apps

class PermissionManager:
    """Helpers for creating permissions and assigning them to users.

    Permission names are accepted either as a bare ``codename`` or as
    ``app_label.codename``.
    """

    @staticmethod
    def create_custom_permission(app_label, model_name, codename, name, content_type=None):
        """Get or create a permission attached to a model's content type.

        Args:
            app_label: App label of the model (used when ``content_type`` is not given).
            model_name: Model name (used when ``content_type`` is not given).
            codename: Permission codename.
            name: Human-readable name, used only when the permission is created.
            content_type: Optional content type that overrides the model lookup.

        Returns:
            A ``(permission, created)`` tuple.

        Raises:
            ValueError: the model ``app_label.model_name`` cannot be found.
        """
        if not content_type:
            try:
                model = apps.get_model(app_label, model_name)
                content_type = ContentType.objects.get_for_model(model)
            except LookupError:
                raise ValueError(f"找不到模型 {app_label}.{model_name}")

        permission, created = Permission.objects.get_or_create(
            codename=codename,
            content_type=content_type,
            defaults={
                'name': name
            }
        )

        return permission, created

    @staticmethod
    def _get_permission(permission_codename):
        """Resolve ``codename`` or ``app_label.codename`` to a ``Permission``.

        Raises ``Permission.DoesNotExist`` when nothing matches. A codename
        shared by several permissions makes ``get()`` raise
        ``Permission.MultipleObjectsReturned``, which is not handled here.
        """
        if '.' in permission_codename:
            app_label, codename = permission_codename.split('.', 1)
            return Permission.objects.get(
                content_type__app_label=app_label,
                codename=codename
            )
        return Permission.objects.get(codename=permission_codename)

    @staticmethod
    def assign_permission_to_user(user, permission_codename):
        """Grant one permission to ``user``. Returns False if it does not exist."""
        try:
            permission = PermissionManager._get_permission(permission_codename)
        except Permission.DoesNotExist:
            return False

        user.user_permissions.add(permission)
        return True

    @staticmethod
    def remove_permission_from_user(user, permission_codename):
        """Revoke one permission from ``user``. Returns False if it does not exist."""
        try:
            permission = PermissionManager._get_permission(permission_codename)
        except Permission.DoesNotExist:
            return False

        user.user_permissions.remove(permission)
        return True

    @staticmethod
    def bulk_assign_permissions_to_user(user, permission_list):
        """Make ``permission_list`` the user's complete set of direct permissions.

        Unknown permissions are skipped. Because this uses ``set()``, direct
        permissions that are not in the list are removed from the user.

        Returns:
            The number of permissions that were resolved and assigned.
        """
        permissions = []

        for perm_codename in permission_list:
            try:
                permissions.append(PermissionManager._get_permission(perm_codename))
            except Permission.DoesNotExist:
                continue  # skip permissions that do not exist

        user.user_permissions.set(permissions)
        return len(permissions)

# 自定义权限检查器
class CustomPermissionChecker:
    """Per-user permission and role checks with lazily cached lookups.

    Permission names may be given either as a bare ``codename`` or as
    ``app_label.codename``; both forms are recognised.
    """

    def __init__(self, user):
        self.user = user
        self.user_permissions = None  # cached set of permission names
        self.user_groups = None  # cached list of group names

    def get_user_permissions(self):
        """Return the set of permission names held directly or through groups.

        Each permission appears as ``codename`` and as ``app_label.codename``.
        The result is computed once and cached on the instance.
        """
        if self.user_permissions is None:
            pairs = list(
                self.user.user_permissions.values_list('content_type__app_label', 'codename')
            )
            # Permissions inherited from the user's groups
            pairs.extend(
                self.user.groups.values_list(
                    'permissions__content_type__app_label', 'permissions__codename'
                )
            )

            names = set()
            for app_label, codename in pairs:
                if codename is None:
                    # A group without permissions yields a row of NULLs.
                    continue
                names.add(codename)
                if app_label:
                    names.add(f"{app_label}.{codename}")
            self.user_permissions = names

        return self.user_permissions

    def _shortcut(self):
        """Return True/False when the answer needs no lookup, otherwise None."""
        # Inactive accounts hold no permissions, not even superusers; this
        # matches Django's own has_perm() behaviour.
        if not getattr(self.user, 'is_active', True):
            return False
        if self.user.is_superuser:
            return True
        if not self.user.is_authenticated:
            return False
        return None

    def has_permission(self, permission_codename):
        """Return True when the user holds ``permission_codename``."""
        decided = self._shortcut()
        if decided is not None:
            return decided
        return permission_codename in self.get_user_permissions()

    def has_any_permission(self, permission_list):
        """Return True when the user holds at least one of ``permission_list``."""
        decided = self._shortcut()
        if decided is not None:
            return decided
        return bool(set(permission_list) & self.get_user_permissions())

    def has_all_permissions(self, permission_list):
        """Return True when the user holds every permission in ``permission_list``."""
        decided = self._shortcut()
        if decided is not None:
            return decided
        return set(permission_list).issubset(self.get_user_permissions())

    def get_user_roles(self):
        """Return the names of the user's groups (cached after the first call)."""
        if self.user_groups is None:
            self.user_groups = list(
                self.user.groups.values_list('name', flat=True)
            )
        return self.user_groups

    def has_role(self, role_name):
        """Return True when the user belongs to the group ``role_name``."""
        return role_name in self.get_user_roles()

# 权限装饰器
def permission_required_or_403(perm, login_url=None, raise_exception=False):
    """Build a view decorator that answers 403 when a permission is missing.

    ``perm`` is a single permission string or a list/tuple of them; holding
    any one of them is sufficient.  When the check fails the wrapped view is
    not called: ``PermissionDenied`` is raised if ``raise_exception`` is
    true, otherwise an ``HttpResponseForbidden`` is returned.  ``login_url``
    is accepted but not used by this implementation.
    """
    def check_perms(user):
        # Normalise a single permission into a one-element tuple.
        wanted = perm if isinstance(perm, (list, tuple)) else (perm,)
        return CustomPermissionChecker(user).has_any_permission(wanted)

    def decorator(view_func):
        @wraps(view_func)
        def _wrapped_view(request, *args, **kwargs):
            if check_perms(request.user):
                return view_func(request, *args, **kwargs)
            if not raise_exception:
                return HttpResponseForbidden("Permission denied")
            from django.core.exceptions import PermissionDenied
            raise PermissionDenied
        return _wrapped_view

    return decorator

# 对象级权限
class ObjectPermissionManager:
    """Decide, and memoise, whether a user may act on a specific object.

    A user is allowed when the object's ``user`` or ``owner`` attribute equals
    that user, or when :meth:`check_extended_permissions` grants access.
    Results are kept in an in-memory dict until :meth:`clear_cache` is called.
    """

    def __init__(self):
        """Start with an empty result cache."""
        self.cache = {}

    def user_has_obj_perm(self, user, obj, permission_type):
        """Return whether ``user`` has ``permission_type`` on ``obj``.

        The answer is cached per (user id, model label, object pk,
        permission type).
        """
        cache_key = f"{user.id}_{obj._meta.label}_{obj.pk}_{permission_type}"

        if cache_key in self.cache:
            return self.cache[cache_key]

        # Ownership through either conventional attribute grants access.
        is_owner = any(
            hasattr(obj, attr) and getattr(obj, attr) == user
            for attr in ('user', 'owner')
        )
        result = is_owner or self.check_extended_permissions(
            user, obj, permission_type
        )

        self.cache[cache_key] = result
        return result

    def check_extended_permissions(self, user, obj, permission_type):
        """Return True if ``user`` is a member of the object's team.

        Returns False when the object has no team (attribute missing or
        None) or the user exposes no ``teams`` relation.  ``permission_type``
        is not consulted by this default implementation.
        """
        team = getattr(obj, 'team', None)
        if team is None or not hasattr(user, 'teams'):
            # An unset team must not be dereferenced for its id.
            return False
        return user.teams.filter(id=team.id).exists()

    def clear_cache(self):
        """Forget every cached decision."""
        self.cache.clear()

# 使用示例
def permission_usage_example(request):
    """Show the checker API by collecting a few answers for ``request.user``.

    Returns a dict with the results of a single-permission check, an
    any-of check, an all-of check, a role check and the list of roles.
    """
    checker = CustomPermissionChecker(request.user)

    # Dict entries are evaluated top to bottom: permissions first, then roles.
    return {
        'can_edit': checker.has_permission('app.change_model'),
        'can_delete': checker.has_any_permission(
            ['app.delete_model', 'app.super_delete']
        ),
        'has_all_perms': checker.has_all_permissions(
            ['app.view_model', 'app.change_model']
        ),
        'is_admin': checker.has_role('admin'),
        'roles': checker.get_user_roles(),
    }

组管理与角色系统

# 组管理与角色系统
class GroupManager:
    """Helpers for creating groups and managing their membership."""

    @staticmethod
    def _resolve_permission(perm_codename):
        """Return the Permission named by ``perm_codename``, or None.

        ``perm_codename`` is either ``"app_label.codename"`` or a bare
        codename.  None is returned when no permission matches or when the
        name matches more than one permission (bare codenames are not unique
        across apps), so an ambiguous name never aborts the whole operation.
        """
        if '.' in perm_codename:
            app_label, codename = perm_codename.split('.', 1)
            lookup = {
                'content_type__app_label': app_label,
                'codename': codename,
            }
        else:
            lookup = {'codename': perm_codename}

        try:
            return Permission.objects.get(**lookup)
        except Permission.DoesNotExist:
            return None
        except Permission.MultipleObjectsReturned:
            import logging
            logging.getLogger(__name__).warning(
                "Ambiguous permission name %r skipped; qualify it as "
                "'app_label.codename'.", perm_codename
            )
            return None

    @staticmethod
    def create_group_with_permissions(name, permissions=None, description=''):
        """Get or create the group ``name`` and assign ``permissions`` to it.

        When ``permissions`` is non-empty the group's permission set is
        replaced by the permissions that could be resolved; unknown or
        ambiguous names are skipped.  ``description`` is accepted for API
        compatibility but not stored by this implementation.

        Returns a ``(group, created)`` tuple.
        """
        group, created = Group.objects.get_or_create(name=name)

        if permissions:
            resolved = (
                GroupManager._resolve_permission(perm_codename)
                for perm_codename in permissions
            )
            group.permissions.set(
                [perm for perm in resolved if perm is not None]
            )

        return group, created

    @staticmethod
    def assign_user_to_group(user, group_name):
        """Add ``user`` to the named group; return False if it is missing."""
        try:
            group = Group.objects.get(name=group_name)
        except Group.DoesNotExist:
            return False
        user.groups.add(group)
        return True

    @staticmethod
    def remove_user_from_group(user, group_name):
        """Remove ``user`` from the named group; False if it is missing."""
        try:
            group = Group.objects.get(name=group_name)
        except Group.DoesNotExist:
            return False
        user.groups.remove(group)
        return True

    @staticmethod
    def bulk_assign_users_to_group(user_ids, group_name):
        """Add every user whose id is in ``user_ids`` to the named group.

        Returns False if the group does not exist, True otherwise.
        """
        try:
            group = Group.objects.get(name=group_name)
        except Group.DoesNotExist:
            return False
        users = User.objects.filter(id__in=user_ids)
        group.user_set.add(*users)
        return True

    @staticmethod
    def get_group_members(group_name):
        """Return the named group's users, or an empty queryset if missing."""
        try:
            group = Group.objects.get(name=group_name)
        except Group.DoesNotExist:
            return User.objects.none()
        return group.user_set.all()

# 预定义角色系统
class PredefinedRoles:
    """Fixed catalogue of role names and the permissions each one carries."""

    # Role name -> list of "app_label.codename" permission strings.
    ROLES_PERMISSIONS = {
        'admin': [
            'auth.add_user', 'auth.change_user', 'auth.delete_user',
            'auth.add_group', 'auth.change_group', 'auth.delete_group',
            'contenttypes.add_contenttype', 'contenttypes.change_contenttype',
            'contenttypes.delete_contenttype', 'admin.add_logentry',
            'admin.change_logentry', 'admin.delete_logentry',
            # add further administrative permissions here
        ],
        'manager': [
            'auth.change_user', 'auth.change_group',
            # manager-specific permissions
        ],
        'editor': [
            'app.add_article', 'app.change_article', 'app.delete_article',
            # editor permissions
        ],
        'viewer': [
            'app.view_article',
            # viewer permissions
        ],
        'moderator': [
            'app.change_article', 'app.delete_comment',
            # moderator permissions
        ]
    }

    @classmethod
    def setup_predefined_roles(cls):
        """Create (or refresh) one group per entry in ``ROLES_PERMISSIONS``."""
        for role_name in cls.ROLES_PERMISSIONS:
            GroupManager.create_group_with_permissions(
                name=role_name,
                permissions=cls.ROLES_PERMISSIONS[role_name]
            )

    @classmethod
    def assign_role_to_user(cls, user, role_name):
        """Put ``user`` into the group for a predefined role.

        Returns False for a role that is not in the catalogue; otherwise
        returns whatever ``GroupManager.assign_user_to_group`` returns.
        """
        if role_name not in cls.ROLES_PERMISSIONS:
            return False
        return GroupManager.assign_user_to_group(user, role_name)

    @classmethod
    def get_available_roles(cls):
        """Return the predefined role names as a list."""
        return [*cls.ROLES_PERMISSIONS]

# 动态角色系统
class DynamicRoleSystem:
    """Create roles at runtime and assign them subject to conditions."""

    # Sentinel that can never equal a caller-supplied expected value.
    _MISSING = object()

    def __init__(self):
        """Start with an empty role cache."""
        self.role_cache = {}

    @staticmethod
    def _resolve_permission(perm_codename):
        """Return the Permission named by ``perm_codename``, or None.

        Accepts ``"app_label.codename"`` or a bare codename.  Unknown and
        ambiguous names (several permissions match) both give None.
        """
        if '.' in perm_codename:
            app_label, codename = perm_codename.split('.', 1)
            lookup = {
                'content_type__app_label': app_label,
                'codename': codename,
            }
        else:
            lookup = {'codename': perm_codename}

        try:
            return Permission.objects.get(**lookup)
        except Permission.DoesNotExist:
            return None
        except Permission.MultipleObjectsReturned:
            import logging
            logging.getLogger(__name__).warning(
                "Ambiguous permission name %r skipped; qualify it as "
                "'app_label.codename'.", perm_codename
            )
            return None

    def create_dynamic_role(self, name, permissions, conditions=None):
        """Get or create the group ``name`` and set its permissions.

        The group's permission set is replaced by the permissions that could
        be resolved from ``permissions``.  ``conditions`` is accepted but not
        persisted by this implementation.

        Returns a ``(group, created)`` tuple.
        """
        group, created = Group.objects.get_or_create(name=name)

        resolved = (
            self._resolve_permission(perm_codename)
            for perm_codename in permissions
        )
        group.permissions.set([perm for perm in resolved if perm is not None])

        return group, created

    def assign_conditional_role(self, user, role_name, conditions=None):
        """Assign ``role_name`` to ``user`` if ``conditions`` are satisfied.

        With no conditions the role is assigned unconditionally.  Returns
        False when a condition fails, otherwise the result of
        ``GroupManager.assign_user_to_group``.
        """
        if conditions and not self.check_conditions(user, conditions):
            return False
        return GroupManager.assign_user_to_group(user, role_name)

    def check_conditions(self, user, conditions):
        """Return True only if every entry of ``conditions`` holds.

        Supported keys:

        * ``'user_field'``: ``(field_name, expected_value)``; the user's
          attribute must equal the expected value.  A missing attribute
          counts as a failed condition.
        * ``'time_limit'``: ``(start_time, end_time)``; the current time must
          lie within the inclusive range.  The current time is taken in the
          timezone of ``start_time`` so aware bounds compare cleanly; naive
          bounds are compared against naive local time.

        Any other key makes the check fail, so an unrecognised condition can
        never grant a role by accident.
        """
        for condition_type, condition_value in conditions.items():
            if condition_type == 'user_field':
                field_name, expected_value = condition_value
                actual = getattr(user, field_name, self._MISSING)
                if actual is self._MISSING or actual != expected_value:
                    return False
            elif condition_type == 'time_limit':
                from datetime import datetime
                start_time, end_time = condition_value
                current_time = datetime.now(tz=start_time.tzinfo)
                if not (start_time <= current_time <= end_time):
                    return False
            else:
                # Fail closed on conditions this class cannot evaluate.
                return False

        return True

# 角色继承系统
class RoleInheritanceSystem:
    """Build roles whose permissions are copied from parent roles."""

    def __init__(self):
        """Start with an empty inheritance cache."""
        self.inheritance_cache = {}

    def create_role_hierarchy(self, child_role, parent_roles):
        """Create ``child_role`` with the union of its parents' permissions.

        Parent roles that do not exist are ignored.  The child group's
        permission set is replaced by the collected permissions, and the
        relationship is passed to :meth:`record_inheritance`.

        Returns the child group.
        """
        child_group, _created = Group.objects.get_or_create(name=child_role)

        inherited = set()
        for parent_name in parent_roles:
            try:
                parent_group = Group.objects.get(name=parent_name)
            except Group.DoesNotExist:
                continue
            inherited.update(parent_group.permissions.all())

        child_group.permissions.set(inherited)
        self.record_inheritance(child_role, parent_roles)
        return child_group

    def record_inheritance(self, child_role, parent_roles):
        """Hook for persisting the relationship; currently does nothing."""
        # A dedicated model (e.g. a role-hierarchy table) could be written here.
        pass

    def get_effective_permissions(self, user):
        """Return the set of the user's direct and group permissions."""
        direct = user.user_permissions.all()
        via_groups = Permission.objects.filter(group__user=user)
        return set(direct) | set(via_groups)

# 使用示例
def role_system_example():
    """Walk through the role helpers end to end and return the new user.

    Creates the predefined role groups, creates a test user, gives it the
    ``viewer`` role, creates a ``project_editor`` dynamic role and adds the
    user to it.
    """
    PredefinedRoles.setup_predefined_roles()

    new_user_fields = {
        'username': 'testuser',
        'email': 'test@example.com',
        'password': 'password123',
    }
    user = CustomUser.objects.create_user(**new_user_fields)

    # Predefined role first, then a role created on the fly.
    PredefinedRoles.assign_role_to_user(user, 'viewer')

    role_system = DynamicRoleSystem()
    _group, _created = role_system.create_dynamic_role(
        name='project_editor',
        permissions=['app.add_project', 'app.change_project']
    )
    GroupManager.assign_user_to_group(user, 'project_editor')

    return user

# 权限缓存系统
class PermissionCache:
    """In-memory, time-limited cache of each user's permission list."""

    def __init__(self):
        """Create an empty cache whose entries stay fresh for 300 seconds."""
        self.cache = {}
        self.timeout = 300  # seconds (5 minutes)

    def get_user_permissions(self, user_id):
        """Return the permission list for ``user_id``.

        A cached list is returned while it is younger than ``self.timeout``
        seconds; otherwise the user is loaded, ``get_all_permissions()`` is
        called on it, and the result is stored with the current time.
        """
        cache_key = f"user_perms_{user_id}"

        try:
            cached_data, stored_at = self.cache[cache_key]
        except KeyError:
            pass
        else:
            if (time.time() - stored_at) < self.timeout:
                return cached_data

        # Miss or stale entry: rebuild from the database.
        from django.contrib.auth.models import User
        user = User.objects.get(id=user_id)
        permissions = list(user.get_all_permissions())

        self.cache[cache_key] = (permissions, time.time())
        return permissions

    def invalidate_user_cache(self, user_id):
        """Drop the cached entry for ``user_id`` if there is one."""
        self.cache.pop(f"user_perms_{user_id}", None)

    def clear_all_cache(self):
        """Drop every cached entry."""
        self.cache.clear()

# 权限审计系统
class PermissionAudit:
    """Write and read the permission change log."""

    @staticmethod
    def log_permission_change(user, permission, action, changed_by=None):
        """Create one ``PermissionChangeLog`` row for a permission change.

        ``action`` is one of ``'add'``, ``'remove'`` or ``'modify'``;
        ``changed_by`` is the user who made the change, if known.
        """
        entry_fields = dict(
            user=user,
            permission=permission,
            action=action,
            changed_by=changed_by,
            timestamp=timezone.now(),
        )
        PermissionChangeLog.objects.create(**entry_fields)

    @staticmethod
    def get_permission_audit_trail(user):
        """Return the log rows for ``user``, newest first."""
        entries = PermissionChangeLog.objects.filter(user=user)
        return entries.order_by('-timestamp')

class PermissionChangeLog(models.Model):
    """Audit row describing one change to a user's permissions."""

    # Stored value / display label pairs accepted by ``action``.
    ACTION_CHOICES = [
        ('add', '添加'),
        ('remove', '移除'),
        ('modify', '修改'),
    ]

    # The user whose permissions changed.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # The permission that was affected.
    permission = models.ForeignKey(Permission, on_delete=models.CASCADE)
    action = models.CharField(max_length=20, choices=ACTION_CHOICES)
    # Who performed the change; the row survives deletion of that user.
    changed_by = models.ForeignKey(
        User,
        null=True,
        on_delete=models.SET_NULL,
        related_name='performed_changes',
    )
    # Filled in automatically when the row is created.
    timestamp = models.DateTimeField(auto_now_add=True)
    details = models.TextField(blank=True)

    class Meta:
        db_table = 'permission_change_log'
        ordering = ['-timestamp']

## 认证安全实践 \{#认证安全实践}

### 密码安全

```python
# 密码安全实践
from django.contrib.auth.hashers import make_password, check_password, identify_hasher
from django.core.validators import validate_password
from django.core.exceptions import ValidationError

class PasswordSecurity:
    """Password generation, strength scoring and hashing helpers."""

    @staticmethod
    def generate_secure_password(length=12, include_symbols=True):
        """Return a random password of ``length`` characters.

        Characters are drawn from ASCII letters and digits, plus a fixed set
        of symbols when ``include_symbols`` is true.  The ``secrets`` module
        is used because the ``random`` module is not suitable for
        security-sensitive values.
        """
        import secrets
        import string

        chars = string.ascii_letters + string.digits
        if include_symbols:
            chars += "!@#$%^&*()_+-=[]{}|;:,.<>?"

        return ''.join(secrets.choice(chars) for _ in range(length))

    @staticmethod
    def check_password_strength(password):
        """Score ``password`` from 0 to 6 and explain what is missing.

        One point each for: length >= 8, length >= 12, a lowercase letter,
        an uppercase letter, a digit, a special character.  Two points are
        deducted for a well-known password and one when fewer than 70% of
        the characters are distinct.  An empty password scores 0.

        Returns a dict with ``score``, ``strength`` (a label),
        ``feedback`` (list of hints) and ``percentage``.
        """
        score = 0
        feedback = []

        # Length
        if len(password) >= 8:
            score += 1
        else:
            feedback.append("密码长度至少8位")

        if len(password) >= 12:
            score += 1

        # Character classes: (pattern, hint shown when the class is missing)
        character_checks = (
            (r'[a-z]', "需要包含小写字母"),
            (r'[A-Z]', "需要包含大写字母"),
            (r'\d', "需要包含数字"),
            (r'[!@#$%^&*()_+\-=\[\]{}|;:,.<>?]', "需要包含特殊字符"),
        )
        for pattern, hint in character_checks:
            if re.search(pattern, password):
                score += 1
            else:
                feedback.append(hint)

        # Well-known passwords
        common_passwords = [
            'password', '123456', 'qwerty', 'abc123',
            'password123', 'admin', 'letmein'
        ]

        if password.lower() in common_passwords:
            score = max(0, score - 2)
            feedback.append("密码过于常见")

        # Repetition: guard the ratio so an empty password cannot divide by 0.
        if password and len(set(password)) / len(password) < 0.7:
            score = max(0, score - 1)
            feedback.append("避免重复字符")

        strength_levels = {
            0: 'very_weak',
            1: 'very_weak',
            2: 'weak',
            3: 'fair',
            4: 'good',
            5: 'strong',
            6: 'very_strong'
        }

        return {
            'score': score,
            'strength': strength_levels.get(score, 'very_weak'),
            'feedback': feedback,
            'percentage': min(100, int((score / 6) * 100))
        }

    @staticmethod
    def hash_password_with_salt(password, salt=None):
        """Hash ``password`` with ``make_password``, passing ``salt`` through."""
        return make_password(password, salt=salt)

    @staticmethod
    def verify_password_hash(password, hashed_password):
        """Return the result of ``check_password`` for the given pair."""
        return check_password(password, hashed_password)

    @staticmethod
    def get_password_hash_algorithm(hashed_password):
        """Return the hasher's algorithm name, or None if unidentifiable."""
        try:
            hasher = identify_hasher(hashed_password)
        except ValueError:
            return None
        return hasher.algorithm

# 密码策略管理器
class PasswordPolicyManager:
    """Validate passwords against a policy read from Django settings."""

    def __init__(self):
        """Load the policy once at construction time."""
        self.policies = self.load_policies()

    def load_policies(self):
        """Return the policy dict, using defaults for unset settings."""
        return {
            'min_length': getattr(settings, 'PASSWORD_MIN_LENGTH', 8),
            'require_uppercase': getattr(settings, 'PASSWORD_REQUIRE_UPPERCASE', True),
            'require_lowercase': getattr(settings, 'PASSWORD_REQUIRE_LOWERCASE', True),
            'require_numbers': getattr(settings, 'PASSWORD_REQUIRE_NUMBERS', True),
            'require_special': getattr(settings, 'PASSWORD_REQUIRE_SPECIAL', False),
            'max_consecutive_chars': getattr(settings, 'PASSWORD_MAX_CONSECUTIVE_CHARS', 3),
            'history_check_count': getattr(settings, 'PASSWORD_HISTORY_CHECK_COUNT', 5),
            'expiry_days': getattr(settings, 'PASSWORD_EXPIRY_DAYS', 90),
        }

    def validate_password_compliance(self, password, user=None):
        """Check ``password`` against the policy.

        Checks length, required character classes, runs of identical
        characters longer than ``max_consecutive_chars``, (when ``user`` is
        given) whether the password contains the user's username, names or
        e-mail, and a few well-known weak patterns.  Each failed check adds
        one message.

        Returns True when every check passes.

        Raises:
            ValidationError: carrying the list of messages when any check
                fails.
        """
        errors = []

        policy = self.policies
        # Needed by the personal-info and weak-pattern checks, with or
        # without a user.
        password_lower = password.lower()

        # Length
        if len(password) < policy['min_length']:
            errors.append(f"密码长度至少需要{policy['min_length']}个字符")

        # Uppercase
        if policy['require_uppercase'] and not re.search(r'[A-Z]', password):
            errors.append("密码必须包含大写字母")

        # Lowercase
        if policy['require_lowercase'] and not re.search(r'[a-z]', password):
            errors.append("密码必须包含小写字母")

        # Digits
        if policy['require_numbers'] and not re.search(r'\d', password):
            errors.append("密码必须包含数字")

        # Special characters
        if policy['require_special']:
            special_pattern = r'[!@#$%^&*()_+\-=\[\]{}|;:,.<>?]'
            if not re.search(special_pattern, password):
                errors.append("密码必须包含特殊字符")

        # Runs of one character: only a run LONGER than the limit is an error.
        run_length = 1
        for previous, current in zip(password, password[1:]):
            run_length = run_length + 1 if current == previous else 1
            if run_length > policy['max_consecutive_chars']:
                errors.append(f"密码不能包含超过{policy['max_consecutive_chars']}个连续相同字符")
                break

        # Similarity to the user's own details
        if user:
            user_info = [
                user.username.lower() if user.username else '',
                user.first_name.lower() if user.first_name else '',
                user.last_name.lower() if user.last_name else '',
                user.email.lower() if user.email else ''
            ]
            if any(info and info in password_lower for info in user_info):
                errors.append("密码不能包含您的个人信息")

        # Well-known weak patterns
        common_patterns = [
            r'(.)\1{2,}',  # three or more identical characters in a row
            r'123456|abcdef',  # common sequences
            r'(?:012|123|234|345|456|567|678|789|890)',  # ascending digits
        ]
        if any(re.search(pattern, password_lower) for pattern in common_patterns):
            errors.append("密码包含常见的弱密码模式")

        if errors:
            raise ValidationError(errors)

        return True

# 使用示例
def password_security_example():
    """Demonstrate the password helpers and return the generated password.

    Generates a password, prints its strength report, checks it against the
    policy, hashes it and verifies the hash.
    """
    helper = PasswordSecurity()
    policy_manager = PasswordPolicyManager()

    # Generate
    secure_password = helper.generate_secure_password()
    print(f"生成的安全密码: {secure_password}")

    # Score
    strength_result = helper.check_password_strength(secure_password)
    print(f"密码强度: {strength_result}")

    # Policy compliance
    try:
        policy_manager.validate_password_compliance(secure_password)
    except ValidationError as e:
        print(f"密码不符合策略: {e}")
    else:
        print("密码符合策略要求")

    # Hash, then verify against the hash
    hashed = helper.hash_password_with_salt(secure_password)
    print(f"哈希后的密码: {hashed[:20]}...")

    is_valid = helper.verify_password_hash(secure_password, hashed)
    print(f"密码验证结果: {is_valid}")

    return secure_password

```

### 会话安全

# 会话安全实践
import secrets
import hashlib
import hmac
from django.contrib.sessions.models import Session
from django.utils import timezone
from datetime import timedelta

class SessionSecurity:
    """Stateless helpers for hardening sessions."""

    @staticmethod
    def generate_secure_session_key():
        """Return a URL-safe random token built from 32 random bytes."""
        return secrets.token_urlsafe(32)

    @staticmethod
    def create_session_fingerprint(request):
        """Return a SHA-256 hex digest identifying the client.

        The digest covers the User-Agent, remote address, Accept-Language
        and Accept-Encoding values from ``request.META`` (missing values
        count as empty strings), joined with ``|``.
        """
        meta_keys = (
            'HTTP_USER_AGENT',
            'REMOTE_ADDR',
            'HTTP_ACCEPT_LANGUAGE',
            'HTTP_ACCEPT_ENCODING',
        )
        fingerprint_data = "|".join(
            f"{request.META.get(key, '')}" for key in meta_keys
        )
        return hashlib.sha256(fingerprint_data.encode()).hexdigest()

    @staticmethod
    def validate_session_fingerprint(request, stored_fingerprint):
        """Compare the request's fingerprint with ``stored_fingerprint``.

        Uses ``hmac.compare_digest`` for the comparison.
        """
        expected = SessionSecurity.create_session_fingerprint(request)
        return hmac.compare_digest(expected, stored_fingerprint)

    @staticmethod
    def implement_session_timeout(session, timeout_minutes=30):
        """Set the session to expire after ``timeout_minutes`` minutes."""
        seconds = timeout_minutes * 60
        session.set_expiry(seconds)

    @staticmethod
    def rotate_session_key(request):
        """Issue a new session key (guards against session fixation)."""
        request.session.cycle_key()

    @staticmethod
    def validate_session_ip_binding(session, current_ip):
        """Return whether ``current_ip`` matches the session's bound IP.

        A session with no ``original_ip`` stored is treated as valid; this
        method does not store the binding itself.
        """
        stored_ip = session.get('original_ip')
        return stored_ip == current_ip if stored_ip else True

    @staticmethod
    def validate_session_ua_binding(session, current_ua):
        """Return whether ``current_ua`` matches the bound User-Agent.

        A session with no ``original_user_agent`` stored is treated as
        valid; this method does not store the binding itself.
        """
        stored_ua = session.get('original_user_agent')
        return stored_ua == current_ua if stored_ua else True

# 会话安全中间件
class SecureSessionMiddleware:
    """Reject requests whose session fails the IP / User-Agent / fingerprint checks.

    Validation happens BEFORE the rest of the stack is called, so a request
    carrying a session that fails the checks never reaches the view.  The
    middleware only acts when ``request.session`` exists and has a key.
    """

    def __init__(self, get_response):
        """Store the next callable in the middleware chain."""
        self.get_response = get_response

    def __call__(self, request):
        """Validate the session, then either block or pass the request on."""
        session = getattr(request, 'session', None)

        if session is not None and session.session_key:
            if not self.validate_session_security(request):
                # Unsafe session: wipe it and answer without running the view.
                session.flush()
                return self.handle_unsafe_session(request)

        return self.get_response(request)

    def validate_session_security(self, request):
        """Return True if the session passes every configured check.

        Checks the IP binding, the User-Agent binding and, when a
        ``session_fingerprint`` is stored in the session, the fingerprint.
        A session without a key is treated as valid.
        """
        if not request.session.session_key:
            return True

        # IP binding
        current_ip = request.META.get('REMOTE_ADDR')
        if not SessionSecurity.validate_session_ip_binding(request.session, current_ip):
            return False

        # User-Agent binding
        current_ua = request.META.get('HTTP_USER_AGENT', '')
        if not SessionSecurity.validate_session_ua_binding(request.session, current_ua):
            return False

        # Fingerprint, only when one was stored
        if 'session_fingerprint' in request.session:
            if not SessionSecurity.validate_session_fingerprint(
                request, request.session['session_fingerprint']
            ):
                return False

        return True

    def handle_unsafe_session(self, request):
        """Return the 403 response sent for a rejected session."""
        from django.http import HttpResponse
        return HttpResponse(
            "会话安全验证失败",
            status=403
        )

# 会话监控和限制
class SessionMonitor:
    """Limit concurrent sessions and flag logins from many addresses."""

    @staticmethod
    def limit_sessions_per_user(user, max_sessions=5):
        """Keep at most ``max_sessions`` live sessions for ``user``.

        Each unexpired ``Session`` row is decoded and matched on its
        ``_auth_user_id`` entry, because the stored session data is encoded
        and cannot be searched for a user id with a text filter.  The
        sessions that expire latest are kept; the remainder are deleted.

        NOTE(review): this scans every unexpired session row and assumes the
        database session backend is in use -- confirm before using on a
        large session table.
        """
        user_pk = str(user.pk)
        live_sessions = Session.objects.filter(
            expire_date__gt=timezone.now()
        ).order_by('-expire_date')

        owned_keys = [
            session.session_key
            for session in live_sessions.iterator()
            if str(session.get_decoded().get('_auth_user_id')) == user_pk
        ]

        # Everything after the first ``max_sessions`` expires soonest.
        surplus_keys = owned_keys[max_sessions:]
        if surplus_keys:
            # Delete by key: a sliced queryset cannot be deleted directly.
            Session.objects.filter(session_key__in=surplus_keys).delete()

    @staticmethod
    def detect_multiple_device_login(user, ip_address, max_devices=3):
        """Flag a login from a new address once ``max_devices`` were seen.

        Looks at the distinct IP addresses in the user's login log for the
        last 24 hours.  Returns ``(True, message)`` when ``ip_address`` is
        not among them and their number already reached ``max_devices``;
        otherwise ``(False, "")``.
        """
        from datetime import timedelta

        since = timezone.now() - timedelta(hours=24)
        unique_ips = set(
            UserLoginLog.objects.filter(
                user=user,
                login_time__gte=since
            ).values_list('ip_address', flat=True)
        )

        if ip_address not in unique_ips and len(unique_ips) >= max_devices:
            # Possibly an anomalous login
            return True, f"检测到在第{len(unique_ips)+1}个设备上登录"

        return False, ""

# CSRF保护增强
from django.middleware.csrf import CsrfViewMiddleware

class EnhancedCsrfMiddleware(CsrfViewMiddleware):
    """CSRF middleware that also requires same-origin Referer/Origin headers.

    For state-changing methods, a Referer or Origin header that is present
    must have the same scheme, host and port as the request itself; a
    matching hostname alone is not enough.  Views marked ``csrf_exempt``
    skip these extra checks.  The standard checks of the parent class
    always run afterwards.
    """

    # Ports implied by a URL that does not spell one out.
    _DEFAULT_PORTS = {'http': 80, 'https': 443}

    def process_view(self, request, callback, callback_args, callback_kwargs):
        """Run the extra header checks, then delegate to the parent class."""
        exempt = getattr(callback, 'csrf_exempt', False)

        if not exempt and self._is_post_request(request):
            # Referer header
            referer = request.META.get('HTTP_REFERER')
            if referer:
                if not self._is_valid_referer(referer, request):
                    return self._reject(request, "CSRF check failed: Invalid referer")

            # Origin header
            origin = request.META.get('HTTP_ORIGIN')
            if origin:
                if not self._is_valid_origin(origin, request):
                    return self._reject(request, "CSRF check failed: Invalid origin")

        return super().process_view(request, callback, callback_args, callback_kwargs)

    def _is_post_request(self, request):
        """Return True for methods that can change state."""
        return request.method in ('POST', 'PUT', 'PATCH', 'DELETE')

    @classmethod
    def _origin_of(cls, url):
        """Return ``(scheme, hostname, port)`` for ``url``, or None if invalid.

        A missing port is replaced by the scheme's default so that
        ``https://host`` and ``https://host:443`` compare equal.
        """
        from urllib.parse import urlparse
        parsed = urlparse(url)
        try:
            port = parsed.port
        except ValueError:
            # Non-numeric or out-of-range port
            return None
        scheme = parsed.scheme.lower()
        if port is None:
            port = cls._DEFAULT_PORTS.get(scheme)
        return (scheme, parsed.hostname, port)

    def _same_origin(self, url, request):
        """Return True if ``url`` has the request's scheme, host and port."""
        candidate = self._origin_of(url)
        if candidate is None or candidate[1] is None:
            return False
        return candidate == self._origin_of(request.build_absolute_uri())

    def _is_valid_referer(self, referer, request):
        """Return True if the Referer header is same-origin with the request."""
        return self._same_origin(referer, request)

    def _is_valid_origin(self, origin, request):
        """Return True if the Origin header is same-origin with the request."""
        return self._same_origin(origin, request)

# 使用示例
def session_security_example(request):
    """Apply the session hardening steps for a logged-in user.

    For an authenticated user: rotates the session key, stores the
    fingerprint and the IP / User-Agent bindings, sets a 30-minute expiry,
    caps the number of sessions at 3 and prints a warning when the login
    looks like it comes from an additional device.  Anonymous requests are
    left untouched, since the login-history lookup needs a real user.

    Returns ``request.session``.
    """
    if not request.user.is_authenticated:
        return request.session

    session_security = SessionSecurity()

    # Rotate the key (guards against session fixation)
    session_security.rotate_session_key(request)

    # Store the fingerprint
    fingerprint = session_security.create_session_fingerprint(request)
    request.session['session_fingerprint'] = fingerprint

    # Store the IP and User-Agent bindings
    request.session['original_ip'] = request.META.get('REMOTE_ADDR')
    request.session['original_user_agent'] = request.META.get('HTTP_USER_AGENT', '')

    # Session expiry
    session_security.implement_session_timeout(request.session, 30)

    # Cap concurrent sessions
    SessionMonitor.limit_sessions_per_user(request.user, max_sessions=3)

    # Multi-device detection
    is_suspicious, message = SessionMonitor.detect_multiple_device_login(
        request.user,
        request.META.get('REMOTE_ADDR'),
        max_devices=2
    )

    if is_suspicious:
        print(f"安全警告: {message}")

    return request.session

账户安全

# 账户安全实践
import time
import ipaddress
from django.core.cache import cache
from django.contrib.auth.models import User

class AccountSecurity:
    """Login entry point combining throttling, lockout and 2FA checks."""

    def __init__(self):
        """Create the three collaborating helpers."""
        self.brute_force_protection = BruteForceProtection()
        self.account_lockout = AccountLockoutSystem()
        self.two_factor_auth = TwoFactorAuthentication()

    def secure_login_attempt(self, username, password, request):
        """Try to authenticate and return a result dict.

        The dict always has ``success``.  On failure it carries ``error``
        (plus ``blocked`` when throttled) or ``requires_2fa`` with the
        ``user_id``; on success it carries ``user``.  Failed attempts are
        recorded and may trigger an account lock.
        """
        meta = request.META
        ip_address = meta.get('REMOTE_ADDR')
        user_agent = meta.get('HTTP_USER_AGENT', '')
        throttle = self.brute_force_protection

        # Throttled callers are refused before any credential check.
        if throttle.is_blocked(username, ip_address):
            return {
                'success': False,
                'error': '账户或IP已被暂时封禁',
                'blocked': True
            }

        user = authenticate(request, username=username, password=password)

        if not user:
            # Wrong credentials: count the attempt, lock if the limit is hit.
            throttle.record_failed_attempt(username, ip_address, user_agent)
            if throttle.should_lock_account(username, ip_address):
                self.account_lockout.lock_account(
                    username,
                    reason='多次登录失败'
                )
            return {
                'success': False,
                'error': '用户名或密码错误'
            }

        # Credentials were correct: reset the counters.
        throttle.record_successful_login(username, ip_address)

        if not user.is_active:
            return {
                'success': False,
                'error': '账户已被禁用'
            }

        if self.account_lockout.is_locked(user):
            return {
                'success': False,
                'error': '账户已被锁定'
            }

        if self.two_factor_auth.is_required(user):
            # The caller is expected to send the user to the 2FA step.
            request.session['pending_2fa_user_id'] = user.id
            return {
                'success': False,
                'requires_2fa': True,
                'user_id': user.id
            }

        return {
            'success': True,
            'user': user
        }

# 暴力破解防护
class BruteForceProtection:
    """Cache-backed counters that throttle repeated failed logins.

    Failures are counted per IP address and per username.  When a counter
    reaches its limit a block flag is stored for ``block_duration`` seconds.
    """

    def __init__(self):
        """Read the limits from settings, falling back to defaults."""
        self.max_attempts_per_ip = getattr(settings, 'MAX_LOGIN_ATTEMPTS_PER_IP', 10)
        self.max_attempts_per_user = getattr(settings, 'MAX_LOGIN_ATTEMPTS_PER_USER', 5)
        self.block_duration = getattr(settings, 'BLOCK_DURATION_SECONDS', 900)  # 15 minutes
        self.time_window = getattr(settings, 'ATTEMPT_TIME_WINDOW', 900)  # 15 minutes

    def get_cache_keys(self, username, ip_address):
        """Return the four cache keys used for this username / IP pair."""
        return {
            'ip_attempts': f"login_attempts_ip:{ip_address}",
            'user_attempts': f"login_attempts_user:{username}",
            'ip_block': f"login_block_ip:{ip_address}",
            'user_block': f"login_block_user:{username}"
        }

    def is_blocked(self, username, ip_address):
        """Return True if either the IP or the username is blocked."""
        keys = self.get_cache_keys(username, ip_address)
        return bool(cache.get(keys['ip_block']) or cache.get(keys['user_block']))

    def _bump_counter(self, key):
        """Increase the counter at ``key`` by one and return the new value.

        ``add`` followed by ``incr`` lets the cache backend perform the
        increment, so concurrent failures are not lost the way they are with
        a read-modify-write.  The expiry is refreshed on every failure.
        """
        cache.add(key, 0, timeout=self.time_window)
        try:
            count = cache.incr(key)
        except ValueError:
            # The key expired between add() and incr(); start a new window.
            cache.set(key, 1, timeout=self.time_window)
            return 1
        cache.touch(key, timeout=self.time_window)
        return count

    def record_failed_attempt(self, username, ip_address, user_agent):
        """Count one failed login for the IP and the username, and log it."""
        keys = self.get_cache_keys(username, ip_address)

        self._bump_counter(keys['ip_attempts'])
        self._bump_counter(keys['user_attempts'])

        # Persistent record of the failure
        UserFailedLoginAttempt.objects.create(
            username=username,
            ip_address=ip_address,
            user_agent=user_agent
        )

    def record_successful_login(self, username, ip_address):
        """Clear the counters and block flags for this username / IP pair.

        NOTE(review): clearing the IP counter on any success lets one valid
        account reset throttling for its address -- confirm this is intended.
        """
        keys = self.get_cache_keys(username, ip_address)

        for name in ('ip_attempts', 'user_attempts', 'ip_block', 'user_block'):
            cache.delete(keys[name])

    def should_lock_account(self, username, ip_address):
        """Set a block flag and return True once a counter hits its limit.

        The IP counter is checked first; if it has reached its limit only
        the IP block is set.
        """
        keys = self.get_cache_keys(username, ip_address)

        # IP counter
        ip_attempts = cache.get(keys['ip_attempts'], 0)
        if ip_attempts >= self.max_attempts_per_ip:
            cache.set(keys['ip_block'], True, timeout=self.block_duration)
            return True

        # Username counter
        user_attempts = cache.get(keys['user_attempts'], 0)
        if user_attempts >= self.max_attempts_per_user:
            cache.set(keys['user_block'], True, timeout=self.block_duration)
            return True

        return False

# 账户锁定系统
class AccountLockoutSystem:
    """Lock and unlock accounts by toggling ``User.is_active``.

    Every lock/unlock is recorded in ``AccountLockLog`` / ``AccountUnlockLog``.
    Locks expire after ``settings.ACCOUNT_LOCKOUT_DURATION`` seconds
    (default 1800, i.e. 30 minutes); expiry is applied lazily by ``is_locked``.
    """

    def __init__(self):
        self.lock_duration = getattr(settings, 'ACCOUNT_LOCKOUT_DURATION', 1800)  # 30 minutes

    def lock_account(self, username, reason='', locked_by=None):
        """Deactivate the account and write a lock-log row.

        Args:
            username: Username of the account to lock.
            reason: Free-text reason stored in the log.
            locked_by: User who triggered the lock, or None when the lock was
                applied by the system.

        Returns:
            True if the account was locked, False if no such user exists.
        """
        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            return False

        user.is_active = False
        user.save()

        AccountLockLog.objects.create(
            user=user,
            reason=reason,
            locked_by=locked_by,
            # A lock attributed to a person is manual; one without is automatic.
            lock_type='manual' if locked_by is not None else 'automatic',
        )
        return True

    def unlock_account(self, username, unlocked_by=None):
        """Reactivate the account and write an unlock-log row.

        Args:
            username: Username of the account to unlock.
            unlocked_by: User who performed the unlock, or None for an
                automatic unlock.

        Returns:
            True if the account was unlocked, False if no such user exists.
        """
        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            return False

        user.is_active = True
        user.save()

        AccountUnlockLog.objects.create(
            user=user,
            unlocked_by=unlocked_by
        )
        return True

    def is_locked(self, user):
        """Return whether *user* is currently locked, auto-unlocking if expired.

        An inactive user with no lock-log row is reported as locked. If the
        most recent lock is older than ``self.lock_duration`` seconds the
        account is unlocked and False is returned.

        NOTE(review): expiry applies to every lock, including manual ones and
        accounts deactivated for other reasons that also have an old lock row.
        """
        from datetime import timedelta

        if user.is_active:
            return False

        last_lock = AccountLockLog.objects.filter(
            user=user
        ).order_by('-timestamp').first()
        if last_lock is None:
            return True

        if timezone.now() - last_lock.timestamp > timedelta(seconds=self.lock_duration):
            # unlock_account() saves a freshly fetched row, so mirror the
            # change on the caller's instance to avoid a stale is_active.
            if self.unlock_account(user.username):
                user.is_active = True
            return False

        return True

# 账户锁定日志
class AccountLockLog(models.Model):
    """Audit record written each time an account is locked."""
    # Account that was locked; rows are deleted together with that user.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Free-text explanation of why the lock was applied.
    reason = models.TextField()
    # User who applied the lock; nullable, and set to NULL if that user is
    # deleted. Reverse accessor on User: ``locked_accounts``.
    locked_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        related_name='locked_accounts'
    )
    # Whether the lock was applied automatically or manually.
    lock_type = models.CharField(max_length=20, choices=[
        ('automatic', '自动'),
        ('manual', '手动')
    ])
    # Set once when the row is created.
    timestamp = models.DateTimeField(auto_now_add=True)

    class Meta:
        # Explicit table name instead of Django's default.
        db_table = 'account_lock_log'

class AccountUnlockLog(models.Model):
    """Audit record written each time an account is unlocked."""
    # Account that was unlocked; rows are deleted together with that user.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # User who performed the unlock; nullable, and set to NULL if that user is
    # deleted. Reverse accessor on User: ``unlocked_accounts``.
    unlocked_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        related_name='unlocked_accounts'
    )
    # Set once when the row is created.
    timestamp = models.DateTimeField(auto_now_add=True)

    class Meta:
        # Explicit table name instead of Django's default.
        db_table = 'account_unlock_log'

# 双因素认证
class TwoFactorAuthentication:
    """Two-factor authentication (TOTP) enrolment and verification.

    Per-user state lives on ``user.profile`` (``enable_2fa``, ``totp_secret``).
    A secret that is being enrolled is parked in the cache for five minutes
    until the user proves possession with a valid code.
    """

    def __init__(self):
        self.totp_service = TOTPService()
        self.sms_service = SMSService()  # assumes an SMS service class exists

    def is_required(self, user):
        """Return True if *user* has a profile with 2FA enabled."""
        return bool(hasattr(user, 'profile') and user.profile.enable_2fa)

    def generate_2fa_setup_info(self, user, method='totp'):
        """Start 2FA enrolment for *user*.

        Args:
            user: The user enrolling.
            method: ``'totp'`` is implemented; ``'sms'`` is a placeholder.

        Returns:
            For ``'totp'``: a dict with ``method``, ``secret``, ``qr_code`` and
            ``uri``. For any other method: None.
        """
        if method != 'totp':
            # The SMS flow is not implemented yet; callers receive None.
            return None

        secret = self.totp_service.generate_secret()
        # NOTE(review): the QR code is generated from ``user`` only; confirm
        # that TOTPService encodes the same secret that is cached below.
        qr_code, uri = self.totp_service.generate_qr_code(user)

        # Park the secret until the user confirms it with a valid code.
        cache_key = f"2fa_setup_{user.id}"
        cache.set(cache_key, secret, timeout=300)  # expires after 5 minutes

        return {
            'method': 'totp',
            'secret': secret,
            'qr_code': qr_code,
            'uri': uri
        }

    def verify_2fa_setup(self, user, token):
        """Complete enrolment by checking *token* against the pending secret.

        Returns:
            True only when the code is valid AND the secret was saved on the
            user's profile. False when there is no pending secret, the code is
            wrong, or the user has no profile to persist the secret on.
        """
        cache_key = f"2fa_setup_{user.id}"
        stored_secret = cache.get(cache_key)
        if not stored_secret:
            return False

        if not self.totp_service.verify_code(stored_secret, token):
            return False

        if not hasattr(user, 'profile'):
            # Nowhere to persist the secret: reporting success here would tell
            # the user 2FA is active while nothing was stored.
            return False

        user.profile.totp_secret = stored_secret
        user.profile.enable_2fa = True
        user.profile.save()

        cache.delete(cache_key)
        return True

    def verify_2fa_login(self, user_id, token):
        """Check a login-time TOTP code for the user with primary key *user_id*.

        Returns:
            The result of ``totp_service.verify_code`` when the user exists and
            has a stored secret; False otherwise.
        """
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            return False

        if hasattr(user, 'profile') and user.profile.totp_secret:
            return self.totp_service.verify_code(user.profile.totp_secret, token)
        return False

    def disable_2fa(self, user):
        """Turn 2FA off for *user* and erase the stored secret (no-op without a profile)."""
        if hasattr(user, 'profile'):
            user.profile.enable_2fa = False
            user.profile.totp_secret = ''
            user.profile.save()

# SMS服务类(示例)
class SMSService:
    """SMS verification-code service (example implementation).

    Codes are six-digit strings kept in the cache for five minutes under
    ``sms_code_<phone_number>``.
    """

    @staticmethod
    def _generate_code():
        """Return a six-digit code (100000-999999) from a CSPRNG."""
        import secrets
        # ``random`` is predictable; one-time codes must come from ``secrets``.
        return f"{secrets.randbelow(900000) + 100000}"

    def send_verification_code(self, phone_number):
        """Generate a code for *phone_number*, store it and return it.

        The actual SMS gateway call is not implemented in this example.

        Returns:
            The generated six-digit code as a string.
        """
        code = self._generate_code()

        # The real SMS sending logic belongs here,
        # e.g. a call to the SMS gateway API.

        # Store the code (a real application should store it encrypted).
        cache_key = f"sms_code_{phone_number}"
        cache.set(cache_key, code, timeout=300)  # expires after 5 minutes

        return code

    def verify_code(self, phone_number, code):
        """Check *code* against the stored one; a code is consumed on success.

        Returns:
            True if a stored code exists and matches, False otherwise.
        """
        import hmac

        cache_key = f"sms_code_{phone_number}"
        stored_code = cache.get(cache_key)
        if not stored_code or code is None:
            return False

        # Constant-time comparison; bytes are used so non-ASCII input cannot
        # make compare_digest raise.
        matches = hmac.compare_digest(
            str(stored_code).encode('utf-8'),
            str(code).encode('utf-8'),
        )
        if not matches:
            return False

        cache.delete(cache_key)  # one-time use: drop the code once verified
        return True

# 账户安全监控
class AccountSecurityMonitor:
    """Heuristics that flag suspicious login activity."""

    @staticmethod
    def detect_suspicious_activity(user, request):
        """Compare the current request with the user's login history.

        Args:
            user: The user logging in; ``user.profile`` is consulted when present.
            request: The current request (``REMOTE_ADDR`` and
                ``HTTP_USER_AGENT`` are read from ``request.META``).

        Returns:
            A list of dicts with ``type``, ``message`` and ``severity`` keys;
            empty when nothing looks unusual.
        """
        from datetime import timedelta

        events = []
        has_profile = hasattr(user, 'profile')

        # Location: compare the IP with the one stored at the last login.
        current_ip = request.META.get('REMOTE_ADDR')
        if has_profile and user.profile.last_login_ip:
            if not AccountSecurityMonitor.is_same_location(
                user.profile.last_login_ip,
                current_ip
            ):
                events.append({
                    'type': 'location_change',
                    'message': f'登录位置发生变化: {current_ip}',
                    'severity': 'high'
                })

        # Frequency: more than 10 logins within the last 7 days.
        recent_login_count = UserLoginLog.objects.filter(
            user=user,
            login_time__gte=timezone.now() - timedelta(days=7)
        ).count()
        if recent_login_count > 10:
            events.append({
                'type': 'high_frequency_login',
                'message': '短时间内登录频繁',
                'severity': 'medium'
            })

        # Device: compare the User-Agent with the one stored at the last login.
        current_ua = request.META.get('HTTP_USER_AGENT', '')
        if has_profile and user.profile.last_user_agent:
            if not AccountSecurityMonitor.is_similar_ua(
                user.profile.last_user_agent,
                current_ua
            ):
                events.append({
                    'type': 'device_change',
                    'message': '登录设备发生变化',
                    'severity': 'low'
                })

        return events

    @staticmethod
    def is_same_location(ip1, ip2):
        """Rough check whether two IPs are in the same place (simplified).

        A real application should use an IP geolocation service. Here two
        valid IPv4 addresses match when their first two octets are equal;
        anything else falls back to comparing the text before the second dot.
        A missing address never matches.
        """
        import ipaddress

        if not ip1 or not ip2:
            return False

        try:
            first = ipaddress.IPv4Address(ip1)
            second = ipaddress.IPv4Address(ip2)
        except ValueError:
            # Not IPv4 (e.g. IPv6 or malformed input): plain string comparison.
            return str(ip1).split('.')[:2] == str(ip2).split('.')[:2]

        # Compare the first two octets.
        return first.packed[:2] == second.packed[:2]

    @staticmethod
    def is_similar_ua(ua1, ua2):
        """Return True when two User-Agent strings look like the same browser.

        If both strings contain one of Chrome/Firefox/Safari/Edge, the first
        such name found in each is compared case-insensitively; otherwise the
        first 50 characters are compared.
        """
        import re

        browser_pattern = re.compile(r'(Chrome|Firefox|Safari|Edge)', re.I)
        browser1 = browser_pattern.search(ua1)
        browser2 = browser_pattern.search(ua2)

        if browser1 and browser2:
            return browser1.group().lower() == browser2.group().lower()

        return ua1[:50] == ua2[:50]  # compare the first 50 characters

# 使用示例
def account_security_example(request):
    """Example login view built on ``AccountSecurity``.

    On POST the credentials are passed to ``secure_login_attempt``. A
    successful result logs the user in, stores the request's IP and
    User-Agent on the profile (when there is one) and redirects to
    ``dashboard``. A result asking for 2FA redirects to ``verify_2fa``. Any
    other result flashes the error and re-renders the form. Non-POST requests
    just render ``login.html``.
    """
    security = AccountSecurity()

    if request.method == 'POST':
        outcome = security.secure_login_attempt(
            request.POST.get('username'),
            request.POST.get('password'),
            request,
        )

        if outcome['success']:
            account = outcome['user']
            login(request, account)

            # Remember where and with what client this login happened.
            if hasattr(account, 'profile'):
                account.profile.last_login_ip = request.META.get('REMOTE_ADDR')
                account.profile.last_user_agent = request.META.get('HTTP_USER_AGENT', '')
                account.profile.save()

            return redirect('dashboard')

        if outcome.get('requires_2fa'):
            # A second factor is required before the login can complete.
            return redirect('verify_2fa', user_id=outcome['user_id'])

        # Login failed: surface the reason on the form.
        messages.error(request, outcome['error'])

    return render(request, 'login.html')

# 账户安全配置
"""
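# settings.py 安全配置示例
# 注意:上文的 AccountLockoutSystem 通过 getattr(settings, 'ACCOUNT_LOCKOUT_DURATION', 1800)
# 读取的是顶层设置;若把配置集中写在下面的字典中,需要在代码里显式从 SECURE_LOGIN_CONFIG 取值。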
SECURE_LOGIN_CONFIG = {
    'MAX_LOGIN_ATTEMPTS_PER_IP': 10,
    'MAX_LOGIN_ATTEMPTS_PER_USER': 5,
    'BLOCK_DURATION_SECONDS': 900,  # 15分钟
    'ATTEMPT_TIME_WINDOW': 900,     # 15分钟
    'ACCOUNT_LOCKOUT_DURATION': 1800,  # 30分钟
    'PASSWORD_MIN_LENGTH': 12,
    'PASSWORD_REQUIRE_UPPERCASE': True,
    'PASSWORD_REQUIRE_LOWERCASE': True,
    'PASSWORD_REQUIRE_NUMBERS': True,
    'PASSWORD_REQUIRE_SPECIAL': True,
    'PASSWORD_HISTORY_CHECK_COUNT': 5,
    'PASSWORD_EXPIRY_DAYS': 90,
}
"""

第三方认证集成

OAuth2集成

# OAuth2认证集成
import requests
import json
from urllib.parse import urlencode, parse_qs
from django.contrib.auth import login
from django.shortcuts import redirect
from django.conf import settings

class OAuth2Provider:
    """Base class for OAuth2 authorization-code providers.

    Args:
        client_id: OAuth2 client identifier.
        client_secret: OAuth2 client secret.
        redirect_uri: Callback URL registered with the provider.
        **kwargs: ``authorization_url``, ``token_url``, ``user_info_url``,
            ``scope`` (list of scope strings, default empty) and ``timeout``
            (seconds for outbound HTTP calls, default 10).
    """

    def __init__(self, client_id, client_secret, redirect_uri, **kwargs):
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.authorization_url = kwargs.get('authorization_url')
        self.token_url = kwargs.get('token_url')
        self.user_info_url = kwargs.get('user_info_url')
        self.scope = kwargs.get('scope', [])
        # Without a timeout a stalled provider blocks the worker indefinitely.
        self.timeout = kwargs.get('timeout', 10)

    def get_authorization_url(self, state=None):
        """Build the URL the browser is sent to for user consent.

        Args:
            state: Optional opaque anti-CSRF value echoed back by the provider.
        """
        params = {
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'response_type': 'code',
            'scope': ' '.join(self.scope),
        }
        if state:
            params['state'] = state

        return f"{self.authorization_url}?{urlencode(params)}"

    def get_access_token(self, code):
        """Exchange an authorization *code* for the provider's token response.

        Returns:
            The decoded JSON token response.

        Raises:
            Exception: On a non-200 response or when the body carries an
                ``error`` field.
        """
        data = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'code': code,
            'grant_type': 'authorization_code',
            'redirect_uri': self.redirect_uri,
        }

        response = requests.post(
            self.token_url,
            data=data,
            # GitHub answers form-encoded unless JSON is requested explicitly,
            # which would make response.json() fail.
            headers={'Accept': 'application/json'},
            timeout=self.timeout,
        )
        if response.status_code != 200:
            raise Exception(f"获取访问令牌失败: {response.text}")

        token_data = response.json()
        # Some providers report failures with HTTP 200 and an "error" field.
        if isinstance(token_data, dict) and token_data.get('error'):
            raise Exception(f"获取访问令牌失败: {response.text}")
        return token_data

    def get_user_info(self, access_token):
        """Fetch the user's profile with a Bearer *access_token*.

        Returns:
            The decoded JSON response.

        Raises:
            Exception: On a non-200 response.
        """
        headers = {'Authorization': f'Bearer {access_token}'}
        response = requests.get(self.user_info_url, headers=headers, timeout=self.timeout)
        if response.status_code != 200:
            raise Exception(f"获取用户信息失败: {response.text}")
        return response.json()

# GitHub OAuth2集成
class GitHubOAuth2(OAuth2Provider):
    """GitHub OAuth2 integration, configured from ``settings.GITHUB_*``."""

    def __init__(self):
        super().__init__(
            client_id=settings.GITHUB_CLIENT_ID,
            client_secret=settings.GITHUB_CLIENT_SECRET,
            redirect_uri=settings.GITHUB_REDIRECT_URI,
            authorization_url='https://github.com/login/oauth/authorize',
            token_url='https://github.com/login/oauth/access_token',
            user_info_url='https://api.github.com/user',
            scope=['user:email']
        )

    def get_user_info(self, access_token):
        """Fetch the GitHub profile and return it in the normalized shape.

        The e-mail is the account's primary *verified* address from
        ``/user/emails`` when available, otherwise the profile's public
        e-mail, otherwise an empty string.

        Returns:
            Dict with ``id``, ``username``, ``email``, ``name``, ``avatar``,
            ``bio``, ``company`` and ``location``. Optional fields that GitHub
            returns as null are normalized to ``''``.
        """
        user_data = super().get_user_info(access_token)
        fallback_email = user_data.get('email') or ''

        # The profile only carries the public e-mail; ask for the full list.
        emails_response = requests.get(
            'https://api.github.com/user/emails',
            headers={'Authorization': f'Bearer {access_token}'},
            timeout=getattr(self, 'timeout', 10),
        )

        primary_email = fallback_email
        if emails_response.status_code == 200:
            primary_email = next(
                (
                    entry['email']
                    for entry in emails_response.json()
                    # Downstream code treats provider e-mails as verified,
                    # so unverified addresses must not be returned.
                    if entry.get('primary') and entry.get('verified') and entry.get('email')
                ),
                fallback_email,
            )

        return {
            'id': user_data['id'],
            'username': user_data['login'],
            'email': primary_email,
            'name': user_data.get('name') or '',
            'avatar': user_data.get('avatar_url') or '',
            'bio': user_data.get('bio') or '',
            'company': user_data.get('company') or '',
            'location': user_data.get('location') or '',
        }

# Google OAuth2集成
class GoogleOAuth2(OAuth2Provider):
    """Google OAuth2 integration, configured from ``settings.GOOGLE_*``."""

    def __init__(self):
        endpoints = {
            'authorization_url': 'https://accounts.google.com/o/oauth2/auth',
            'token_url': 'https://oauth2.googleapis.com/token',
            'user_info_url': 'https://www.googleapis.com/oauth2/v2/userinfo',
            'scope': [
                'https://www.googleapis.com/auth/userinfo.email',
                'https://www.googleapis.com/auth/userinfo.profile',
            ],
        }
        super().__init__(
            client_id=settings.GOOGLE_CLIENT_ID,
            client_secret=settings.GOOGLE_CLIENT_SECRET,
            redirect_uri=settings.GOOGLE_REDIRECT_URI,
            **endpoints,
        )

    def get_user_info(self, access_token):
        """Fetch the Google profile and return it in the normalized shape.

        ``id`` and ``email`` are required in the provider response; the other
        fields fall back to the defaults listed below when absent.
        """
        raw = super().get_user_info(access_token)

        profile = {'id': raw['id'], 'email': raw['email']}
        # (field, default when the provider omits it)
        optional_fields = (
            ('verified_email', False),
            ('name', ''),
            ('given_name', ''),
            ('family_name', ''),
            ('picture', ''),
            ('locale', 'en'),
        )
        for field, default in optional_fields:
            profile[field] = raw.get(field, default)
        return profile

# 微信OAuth2集成
class WeChatOAuth2(OAuth2Provider):
    """WeChat OAuth2 integration, configured from ``settings.WECHAT_*``.

    WeChat deviates from the generic flow: it uses ``appid``/``secret``
    parameter names, GET requests for the token exchange, and needs the
    ``openid`` in addition to the access token to fetch the profile.
    """

    def __init__(self):
        super().__init__(
            client_id=settings.WECHAT_APP_ID,
            client_secret=settings.WECHAT_APP_SECRET,
            redirect_uri=settings.WECHAT_REDIRECT_URI,
            authorization_url='https://open.weixin.qq.com/connect/qrconnect',
            token_url='https://api.weixin.qq.com/sns/oauth2/access_token',
            user_info_url='https://api.weixin.qq.com/sns/userinfo',
            scope=['snsapi_login', 'snsapi_userinfo']
        )

    def get_authorization_url(self, state=None):
        """Build the WeChat QR-code authorization URL (scopes comma-joined)."""
        params = {
            'appid': self.client_id,
            'redirect_uri': self.redirect_uri,
            'response_type': 'code',
            'scope': ','.join(self.scope),
        }
        if state:
            params['state'] = state

        return f"{self.authorization_url}?{urlencode(params)}#wechat_redirect"

    def _get_json(self, url, params, failure_message, error_label):
        """GET *url* and return the decoded JSON, raising on HTTP or API errors."""
        response = requests.get(url, params=params, timeout=getattr(self, 'timeout', 10))
        if response.status_code != 200:
            raise Exception(f"{failure_message}: {response.text}")
        # JSON is UTF-8; forcing it keeps non-ASCII nicknames intact even if
        # the response carries no charset (presumably the case for WeChat).
        response.encoding = 'utf-8'
        payload = response.json()
        if 'errcode' in payload:
            raise Exception(f"{error_label}: {payload}")
        return payload

    def get_access_token(self, code):
        """Exchange an authorization *code* for WeChat's token response.

        Raises:
            Exception: On a non-200 response or when the body has ``errcode``.
        """
        params = {
            'appid': self.client_id,
            'secret': self.client_secret,
            'code': code,
            'grant_type': 'authorization_code'
        }
        return self._get_json(
            self.token_url, params, "获取访问令牌失败", "WeChat OAuth error"
        )

    def get_user_info(self, access_token, openid):
        """Fetch the WeChat profile for *openid*.

        Returns:
            The provider's JSON response, plus ``id``, ``name`` and ``avatar``
            keys so it has the same shape the other providers return.
            ``OAuth2AuthManager`` in this file indexes ``user_info['id']`` and
            would otherwise fail with ``KeyError`` for WeChat.

        Raises:
            Exception: On a non-200 response or when the body has ``errcode``.
        """
        params = {
            'access_token': access_token,
            'openid': openid,
            'lang': 'zh_CN'
        }
        user_data = self._get_json(
            self.user_info_url, params, "获取用户信息失败", "WeChat user info error"
        )

        # Add the normalized keys without overwriting anything WeChat sent.
        user_data.setdefault('id', user_data.get('openid') or openid)
        user_data.setdefault('name', user_data.get('nickname') or '')
        user_data.setdefault('avatar', user_data.get('headimgurl') or '')
        return user_data

# OAuth2认证管理器
class OAuth2AuthManager:
    """Drives the OAuth2 login flow and maps provider accounts to local users."""

    def __init__(self):
        self.providers = {
            'github': GitHubOAuth2,
            'google': GoogleOAuth2,
            'wechat': WeChatOAuth2,
        }

    def get_provider(self, provider_name):
        """Instantiate the provider registered under *provider_name*.

        Raises:
            ValueError: If the provider is not supported.
        """
        if provider_name in self.providers:
            return self.providers[provider_name]()
        raise ValueError(f"不支持的OAuth2提供者: {provider_name}")

    def initiate_auth(self, request, provider_name, next_url=None):
        """Start the flow: remember state/next in the session and redirect.

        Args:
            request: Current request; its session stores the state and next URL.
            provider_name: Key of the provider to use.
            next_url: Where to send the user after a successful login.

        Returns:
            A redirect response to the provider's authorization page.
        """
        import secrets

        provider = self.get_provider(provider_name)

        # Random state value used for CSRF protection of the callback.
        state = secrets.token_urlsafe(32)

        request.session[f'oauth2_state_{provider_name}'] = state
        if next_url:
            request.session[f'oauth2_next_{provider_name}'] = next_url

        auth_url = provider.get_authorization_url(state=state)
        return redirect(auth_url)

    def handle_callback(self, request, provider_name):
        """Finish the flow: validate state, fetch the profile and log in.

        Returns:
            A redirect to the stored next URL, or ``'/'`` when none was stored
            or the stored one points to another host.

        Raises:
            ValueError: On a missing/mismatching state or a missing code.
            Exception: Propagated from the provider on token/profile errors.
        """
        import secrets
        from django.utils.http import url_has_allowed_host_and_scheme

        # pop() makes the state single-use, even when a later step fails.
        state = request.GET.get('state')
        stored_state = request.session.pop(f'oauth2_state_{provider_name}', None)
        if not state or not stored_state or not secrets.compare_digest(
            str(state).encode('utf-8'), str(stored_state).encode('utf-8')
        ):
            raise ValueError("无效的state参数")

        code = request.GET.get('code')
        if not code:
            raise ValueError("未收到授权码")

        provider = self.get_provider(provider_name)
        token_data = provider.get_access_token(code)
        access_token = token_data['access_token']

        # WeChat needs the openid from the token response to fetch the profile.
        if provider_name == 'wechat':
            openid = token_data['openid']
            user_info = provider.get_user_info(access_token, openid)
        else:
            user_info = provider.get_user_info(access_token)

        user = self.get_or_create_social_user(provider_name, user_info)
        login(request, user)

        # The next URL originates from a query parameter, so only follow it
        # when it stays on this host; otherwise it is an open redirect.
        next_url = request.session.pop(f'oauth2_next_{provider_name}', '/')
        if not url_has_allowed_host_and_scheme(
            next_url,
            allowed_hosts={request.get_host()},
            require_https=request.is_secure(),
        ):
            next_url = '/'

        return redirect(next_url)

    def get_or_create_social_user(self, provider_name, user_info):
        """Return the local user linked to this provider account, creating it if new.

        Args:
            provider_name: Provider key, also used as prefix of the external id.
            user_info: Normalized profile dict; must contain ``id``.

        Returns:
            The local user instance.
        """
        from django.db import transaction

        external_id = f"{provider_name}_{user_info['id']}"

        social_user = SocialUser.objects.filter(
            external_id=external_id,
            provider=provider_name,
        ).select_related('local_user').first()

        if social_user is not None:
            # Known account: refresh the local profile from the provider data.
            local_user = social_user.local_user
            self.update_user_from_social_data(local_user, user_info)
            return local_user

        # New account. SocialUser.local_user is NOT NULL, so the local user
        # has to exist before the link row is inserted; get_or_create() with
        # only the lookup fields would fail on insert. The transaction keeps
        # a failed link (e.g. a concurrent first login hitting the unique
        # constraint) from leaving an orphaned local user behind.
        with transaction.atomic():
            local_user = self.create_local_user(provider_name, user_info)
            SocialUser.objects.create(
                local_user=local_user,
                provider=provider_name,
                external_id=external_id,
                external_data=json.dumps(user_info),
            )
        return local_user

    @staticmethod
    def _split_display_name(name):
        """Split a display name at the first space into (first, last), truncated to 30/150."""
        parts = name.split(' ', 1)
        first = parts[0][:30]
        last = parts[1][:150] if len(parts) > 1 else ''
        return first, last

    def create_local_user(self, provider_name, user_info):
        """Create a local account (without usable password) for a provider profile.

        The username is derived from the profile and made unique; the e-mail
        is left empty when another account already uses it.
        """
        base_username = user_info.get('username') or user_info.get('name') or f"{provider_name}_user"
        username = self.generate_unique_username(base_username)

        email = user_info.get('email') or ''
        # Same model as the one the account is created on, so the uniqueness
        # check and the insert look at the same table.
        if email and CustomUser.objects.filter(email=email).exists():
            email = ''  # already taken: do not set it on the new account

        local_user = CustomUser.objects.create_user(
            username=username,
            email=email,
            password=None,  # OAuth users do not need a local password
        )

        # Providers may send explicit nulls, hence ``or ''`` before slicing.
        local_user.first_name = (user_info.get('given_name') or '')[:30]
        local_user.last_name = (user_info.get('family_name') or '')[:150]
        if not local_user.first_name and not local_user.last_name:
            name = user_info.get('name') or ''
            if name:
                first, last = self._split_display_name(name)
                local_user.first_name = first
                if last:
                    local_user.last_name = last

        # TODO: download user_info['avatar'] / user_info['picture'] into the
        # user's profile once avatar storage exists.

        local_user.is_verified = True  # social accounts are treated as verified
        local_user.save()

        return local_user

    def update_user_from_social_data(self, user, user_info):
        """Refresh name and e-mail on *user* from the provider profile.

        Saves the user only when something changed. The e-mail is filled in
        only if the user has none and the provider does not mark it unverified.
        """
        updated = False

        # Providers may send explicit nulls, hence ``or ''`` before slicing.
        first_name = (user_info.get('given_name') or '')[:30]
        last_name = (user_info.get('family_name') or '')[:150]

        if first_name and user.first_name != first_name:
            user.first_name = first_name
            updated = True

        if last_name and user.last_name != last_name:
            user.last_name = last_name
            updated = True

        # Still no name at all: derive one from the display name.
        if not user.first_name and not user.last_name:
            name = user_info.get('name') or ''
            if name:
                first, last = self._split_display_name(name)
                user.first_name = first
                if last:
                    user.last_name = last
                updated = True

        if not user.email and user_info.get('email') and user_info.get('verified_email', True):
            user.email = user_info['email']
            updated = True

        if updated:
            user.save()

    def generate_unique_username(self, base_username):
        """Return *base_username*, or it plus the first free numeric suffix (1, 2, ...)."""
        username = base_username
        counter = 1

        while CustomUser.objects.filter(username=username).exists():
            username = f"{base_username}{counter}"
            counter += 1

        return username

# 社交用户模型
class SocialUser(models.Model):
    """Link between a local account and an account at an external provider."""
    # Allowed values for ``provider`` as (stored value, display label).
    PROVIDER_CHOICES = [
        ('github', 'GitHub'),
        ('google', 'Google'),
        ('wechat', 'WeChat'),
        ('facebook', 'Facebook'),
        ('twitter', 'Twitter'),
    ]

    # Local account this social identity belongs to; required (no null=True),
    # and link rows are deleted with the user. Reverse accessor:
    # ``social_accounts``.
    local_user = models.ForeignKey(
        CustomUser,
        on_delete=models.CASCADE,
        related_name='social_accounts'
    )
    provider = models.CharField(max_length=20, choices=PROVIDER_CHOICES)
    # Identifier of the account at the provider; unique per provider (see Meta).
    external_id = models.CharField(max_length=100)
    external_data = models.TextField()  # external user data stored as JSON text
    # Provider tokens are kept as plain text fields here.
    # NOTE(review): consider encrypting these at rest.
    access_token = models.TextField(blank=True)
    refresh_token = models.TextField(blank=True)
    token_expires_at = models.DateTimeField(null=True, blank=True)
    # Set on insert / refreshed on every save, respectively.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'social_user'
        # One row per (provider, external account).
        unique_together = ['provider', 'external_id']

    def get_external_data(self):
        """Return ``external_data`` parsed from JSON, or ``{}`` if it is not valid JSON."""
        import json
        try:
            return json.loads(self.external_data)
        except json.JSONDecodeError:
            return {}

# OAuth2视图示例
def oauth_login(request, provider):
    """Start the OAuth2 flow for *provider*.

    The post-login destination is taken from the ``next`` query parameter
    and defaults to ``'/'``.
    """
    return OAuth2AuthManager().initiate_auth(
        request,
        provider,
        request.GET.get('next', '/'),
    )

def oauth_callback(request, provider):
    """Handle the provider's redirect back to the site.

    Any exception raised while completing the flow is shown to the user as
    an error message, followed by a redirect to the ``login`` page.
    """
    auth_manager = OAuth2AuthManager()
    try:
        response = auth_manager.handle_callback(request, provider)
    except Exception as exc:
        messages.error(request, f"认证失败: {str(exc)}")
        response = redirect('login')
    return response

# OAuth2 URLs配置示例
"""
# urls.py
from django.urls import path
from . import views

urlpatterns = [
    path('oauth/login/<str:provider>/', views.oauth_login, name='oauth_login'),
    path('oauth/callback/<str:provider>/', views.oauth_callback, name='oauth_callback'),
]
"""

JWT认证集成

# JWT认证集成
import jwt
from django.conf import settings
from django.contrib.auth import get_user_model
from django.utils import timezone
from datetime import datetime, timedelta

User = get_user_model()

class JWTAuthManager:
    """Issue and verify HS256-signed access/refresh JWTs.

    Tokens are signed with ``settings.SECRET_KEY``. Lifetimes come from
    ``settings.JWT_ACCESS_TOKEN_LIFETIME`` (minutes, default 15) and
    ``settings.JWT_REFRESH_TOKEN_LIFETIME`` (days, default 7).
    """

    def __init__(self):
        self.secret_key = settings.SECRET_KEY
        self.algorithm = 'HS256'
        self.access_token_lifetime = timedelta(
            minutes=getattr(settings, 'JWT_ACCESS_TOKEN_LIFETIME', 15)
        )
        self.refresh_token_lifetime = timedelta(
            days=getattr(settings, 'JWT_REFRESH_TOKEN_LIFETIME', 7)
        )

    def generate_tokens(self, user):
        """Return a dict with a fresh ``access``/``refresh`` pair and their lifetimes in seconds."""
        access_token = self._generate_access_token(user)
        refresh_token = self._generate_refresh_token(user)

        return {
            'access': access_token,
            'refresh': refresh_token,
            'access_expires_in': self.access_token_lifetime.total_seconds(),
            'refresh_expires_in': self.refresh_token_lifetime.total_seconds(),
        }

    def _generate_access_token(self, user):
        """Encode an access token carrying the user's id, username and e-mail."""
        issued_at = timezone.now()
        payload = {
            'user_id': user.id,
            'username': user.username,
            'email': user.email,
            'exp': issued_at + self.access_token_lifetime,
            'iat': issued_at,
            'token_type': 'access'
        }

        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def _generate_refresh_token(self, user):
        """Encode a refresh token carrying only the user's id."""
        issued_at = timezone.now()
        payload = {
            'user_id': user.id,
            'exp': issued_at + self.refresh_token_lifetime,
            'iat': issued_at,
            'token_type': 'refresh'
        }

        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def _verify_token(self, token, expected_type):
        """Decode *token* and resolve its user; shared by both verify methods.

        Returns:
            ``(user, True)`` for a valid token of *expected_type* that belongs
            to an existing, active user; ``(None, False)`` in every other case.
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except jwt.InvalidTokenError:
            # Also covers ExpiredSignatureError, which subclasses it.
            return None, False

        # .get() so a token without these claims is rejected instead of
        # raising KeyError.
        if payload.get('token_type') != expected_type:
            return None, False
        user_id = payload.get('user_id')
        if user_id is None:
            return None, False

        try:
            user = User.objects.get(id=user_id)
        except (User.DoesNotExist, ValueError, TypeError):
            return None, False

        # A deactivated or locked account must not keep working through
        # tokens issued earlier; the token-issuing view in this file checks
        # is_active as well.
        if not user.is_active:
            return None, False

        return user, True

    def verify_access_token(self, token):
        """Validate an access token; returns ``(user, True)`` or ``(None, False)``."""
        return self._verify_token(token, 'access')

    def verify_refresh_token(self, token):
        """Validate a refresh token; returns ``(user, True)`` or ``(None, False)``."""
        return self._verify_token(token, 'refresh')

    def refresh_access_token(self, refresh_token):
        """Return a new access token for a valid *refresh_token*, else None."""
        user, valid = self.verify_refresh_token(refresh_token)
        if valid and user:
            return self._generate_access_token(user)
        return None

# JWT认证后端
class JWTAuthenticationBackend:
    """Authenticates a request from the JWT it carries."""

    def __init__(self):
        self.jwt_manager = JWTAuthManager()

    def authenticate(self, request):
        """Return the user identified by the request's access token, or None."""
        raw_token = self.get_token_from_request(request)
        if not raw_token:
            return None

        account, is_valid = self.jwt_manager.verify_access_token(raw_token)
        return account if (is_valid and account) else None

    def get_token_from_request(self, request):
        """Extract the JWT from the request.

        Lookup order: ``Authorization: Bearer <token>`` header, then the
        ``token`` query parameter, then the ``jwt_token`` cookie.

        Returns:
            The token string, or None when no source provides one.
        """
        header_value = request.META.get('HTTP_AUTHORIZATION')
        if header_value and header_value.startswith('Bearer '):
            # Second space-separated field of the header value.
            return header_value.split(' ')[1]

        # Query parameter first, cookie second; empty values count as absent.
        return request.GET.get('token') or request.COOKIES.get('jwt_token') or None

    def get_user(self, user_id):
        """Return the user with primary key *user_id*, or None if it does not exist."""
        try:
            found = User.objects.get(pk=user_id)
        except User.DoesNotExist:
            found = None
        return found

# JWT中间件
class JWTMiddleware:
    """Middleware that sets ``request.user`` from a valid JWT, if one is present."""

    def __init__(self, get_response):
        self.get_response = get_response
        self.auth_backend = JWTAuthenticationBackend()

    def __call__(self, request):
        # Leave request.user untouched when no valid token was supplied.
        jwt_user = self.auth_backend.authenticate(request)
        if jwt_user:
            request.user = jwt_user

        return self.get_response(request)

# JWT视图示例
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.views import View
import json

@method_decorator(csrf_exempt, name='dispatch')
class TokenObtainPairView(View):
    """Exchange a username/password (JSON body) for an access/refresh token pair."""

    def post(self, request):
        """Authenticate the credentials and return the tokens.

        Responses:
            200: ``{"tokens": {...}}``; the access token is also set as the
                HttpOnly ``jwt_token`` cookie.
            400: body is not a JSON object, or a credential is missing.
            401: credentials rejected or account inactive.
            500: unexpected failure (details are logged, not returned).
        """
        import logging

        try:
            data = json.loads(request.body)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return JsonResponse({
                'error': '无效的JSON数据'
            }, status=400)
        # Valid JSON that is not an object (list, string, ...) has no .get().
        if not isinstance(data, dict):
            return JsonResponse({
                'error': '无效的JSON数据'
            }, status=400)

        username = data.get('username')
        password = data.get('password')
        if not username or not password:
            return JsonResponse({
                'error': '用户名和密码都是必需的'
            }, status=400)

        try:
            user = authenticate(request, username=username, password=password)
            if not (user and user.is_active):
                return JsonResponse({
                    'error': '用户名或密码错误'
                }, status=401)

            tokens = JWTAuthManager().generate_tokens(user)
        except Exception:
            # Log the details server-side; returning str(e) would expose
            # internals to the client.
            logging.getLogger(__name__).exception("Token issuance failed")
            return JsonResponse({
                'error': '服务器内部错误'
            }, status=500)

        response = JsonResponse({
            'tokens': tokens
        })

        # Optional: also keep the JWT in an HttpOnly cookie.
        response.set_cookie(
            'jwt_token',
            tokens['access'],
            httponly=True,
            secure=not settings.DEBUG,  # HTTPS-only outside of DEBUG
            samesite='Lax'
        )

        return response

@method_decorator(csrf_exempt, name='dispatch')
class TokenRefreshView(View):
    """Exchange a refresh token (JSON body) for a new access token."""

    def post(self, request):
        """Validate the refresh token and return a new access token.

        Responses:
            200: ``{"access": "..."}``; the ``jwt_token`` cookie is updated too.
            400: body is not a JSON object, or ``refresh`` is missing.
            401: refresh token invalid or expired.
            500: unexpected failure (details are logged, not returned).
        """
        import logging

        try:
            data = json.loads(request.body)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return JsonResponse({
                'error': '无效的JSON数据'
            }, status=400)
        # Valid JSON that is not an object (list, string, ...) has no .get().
        if not isinstance(data, dict):
            return JsonResponse({
                'error': '无效的JSON数据'
            }, status=400)

        refresh_token = data.get('refresh')
        if not refresh_token:
            return JsonResponse({
                'error': '刷新令牌是必需的'
            }, status=400)

        try:
            new_access_token = JWTAuthManager().refresh_access_token(refresh_token)
        except Exception:
            # Log the details server-side; returning str(e) would expose
            # internals to the client.
            logging.getLogger(__name__).exception("Token refresh failed")
            return JsonResponse({
                'error': '服务器内部错误'
            }, status=500)

        if not new_access_token:
            return JsonResponse({
                'error': '无效或过期的刷新令牌'
            }, status=401)

        response = JsonResponse({
            'access': new_access_token
        })

        # Keep the cookie copy of the access token in sync.
        response.set_cookie(
            'jwt_token',
            new_access_token,
            httponly=True,
            secure=not settings.DEBUG,  # HTTPS-only outside of DEBUG
            samesite='Lax'
        )

        return response

# JWT保护的视图装饰器
def jwt_required(view_func):
    """Wrap a view so it only runs for requests that pass JWT authentication.

    The wrapped view receives the authenticated user as ``request.user``.
    Requests for which the backend returns no user get a 401 JSON response
    and the view is never called.
    """
    @wraps(view_func)
    def _guarded(request, *args, **kwargs):
        authenticated_user = JWTAuthenticationBackend().authenticate(request)

        if authenticated_user:
            request.user = authenticated_user
            return view_func(request, *args, **kwargs)

        return JsonResponse({'error': '认证失败'}, status=401)

    return _guarded

# 使用示例
"""
# URLs配置
urlpatterns = [
    path('api/token/', TokenObtainPairView.as_view(), name='token_obtain_pair'),
    path('api/token/refresh/', TokenRefreshView.as_view(), name='token_refresh'),
    path('api/protected/', jwt_required(ProtectedView.as_view()), name='protected_view'),
]

# 在settings.py中添加中间件
MIDDLEWARE = [
    # ... 其他中间件
    'your_app.middleware.JWTMiddleware',
]
"""

常见问题与解决方案

问题1:密码重置功能

# 密码重置功能实现
from django.core.mail import send_mail
from django.template.loader import render_to_string
from django.utils.html import strip_tags
from django.contrib.sites.shortcuts import get_current_site
from django.urls import reverse
import secrets

class PasswordResetHandler:
    """Stateless helpers for the password-reset flow.

    Covers issuing a reset token, mailing the reset link, validating a
    token presented by the user, and finally changing the password.
    """

    @staticmethod
    def generate_reset_token(user):
        """Create and return a new reset token record for ``user``.

        Any still-unused tokens of the same user are deleted first, so at
        most one unused token exists per user after this call.
        """
        PasswordResetToken.objects.filter(user=user, is_used=False).delete()

        return PasswordResetToken.objects.create(
            user=user,
            token=secrets.token_urlsafe(32),
        )

    @staticmethod
    def send_reset_email(request, user):
        """Issue a reset token for ``user`` and e-mail the reset link."""
        issued = PasswordResetHandler.generate_reset_token(user)

        # Absolute link: current site's domain + the confirm view's path.
        site = get_current_site(request)
        confirm_path = reverse(
            'password_reset_confirm', kwargs={'token': issued.token}
        )
        reset_link = f"https://{site.domain}{confirm_path}"

        # The HTML body comes from the template; the plain-text alternative
        # is the same content with the tags stripped.
        template_context = {
            'user': user,
            'reset_link': reset_link,
            'site_name': site.name,
        }
        html_body = render_to_string(
            'registration/password_reset_email.html', template_context
        )
        text_body = strip_tags(html_body)

        send_mail(
            '重置您的密码',
            text_body,
            settings.DEFAULT_FROM_EMAIL,
            [user.email],
            html_message=html_body,
        )

    @staticmethod
    def validate_reset_token(token):
        """Check a reset token and resolve it to its user.

        Returns:
            tuple: ``(user, None)`` when the token is usable, otherwise
            ``(None, message)`` where ``message`` explains the rejection
            (unknown token, already used, or older than one hour).
        """
        from django.utils import timezone
        from datetime import timedelta

        try:
            record = PasswordResetToken.objects.get(token=token)
        except PasswordResetToken.DoesNotExist:
            return None, "无效的重置令牌"

        if record.is_used:
            return None, "令牌已被使用"

        # Tokens are valid for one hour after creation.
        token_age = timezone.now() - record.created_at
        if token_age > timedelta(hours=1):
            return None, "令牌已过期"

        return record.user, None

    @staticmethod
    def reset_user_password(user, new_password):
        """Validate and set ``new_password`` for ``user``, then retire tokens.

        The compliance check runs before the password is set, and the
        user's unused reset tokens are flagged as used afterwards.
        """
        # Policy check first.
        PasswordPolicyManager().validate_password_compliance(new_password, user)

        # Store the new password, with the history check enabled.
        PasswordService().set_password(user, new_password, check_history=True)

        # Flag every outstanding reset token of this user as used.
        outstanding = PasswordResetToken.objects.filter(user=user, is_used=False)
        outstanding.update(is_used=True)

# 密码重置视图
def password_reset_request(request):
    """Handle the "forgot password" form.

    GET renders the request form. POST sends a reset e-mail to every
    account registered with the submitted address and then redirects to
    the login page with the same neutral message whether or not an
    account matched, so the form cannot be used to probe which addresses
    are registered.
    """
    if request.method == 'POST':
        email = (request.POST.get('email') or '').strip()

        if not email:
            messages.error(request, '请输入邮箱地址')
        else:
            # filter() rather than get(): the address may match zero or
            # several accounts, and neither case should raise.
            for user in User.objects.filter(email=email):
                PasswordResetHandler.send_reset_email(request, user)

            # Identical response for known and unknown addresses.
            messages.success(request, '如果该邮箱已注册,密码重置链接将发送到您的邮箱')
            return redirect('login')

    return render(request, 'registration/password_reset_request.html')

def password_reset_confirm(request, token):
    """Let the holder of a valid reset token choose a new password.

    An unusable token redirects back to the reset-request page with the
    reason as an error message. Otherwise GET shows the form, and POST
    applies the new password and redirects to the login page on success.
    """
    def _show_form():
        # The token is passed to the template context.
        return render(
            request,
            'registration/password_reset_confirm.html',
            {'token': token},
        )

    account, failure = PasswordResetHandler.validate_reset_token(token)
    if failure:
        messages.error(request, failure)
        return redirect('password_reset_request')

    if request.method != 'POST':
        return _show_form()

    chosen = request.POST.get('new_password')
    repeated = request.POST.get('confirm_password')

    if chosen != repeated:
        messages.error(request, '两次输入的密码不一致')
        return _show_form()

    try:
        PasswordResetHandler.reset_user_password(account, chosen)
    except ValidationError as exc:
        # Show each validation message separately.
        for text in exc.messages:
            messages.error(request, text)
        return _show_form()

    messages.success(request, '密码重置成功,请使用新密码登录')
    return redirect('login')

问题2:用户注册激活

# 用户注册激活功能
import secrets
from django.core.signing import TimestampSigner
from django.core.mail import send_mail
from django.template.loader import render_to_string
from django.utils.html import strip_tags
from django.contrib.sites.shortcuts import get_current_site
from django.urls import reverse

class UserActivationHandler:
    """Create, verify and e-mail signed account-activation tokens."""

    def __init__(self):
        # TimestampSigner embeds the signing time, which lets unsign()
        # enforce a maximum age.
        self.signer = TimestampSigner()

    def generate_activation_token(self, user):
        """Return a signed, timestamped token encoding ``"<id>:<email>"``."""
        return self.signer.sign(f"{user.id}:{user.email}")

    def verify_activation_token(self, token, max_age=60*60*24*7):  # 7 days
        """Verify an activation token and resolve it to its user.

        Args:
            token: The value produced by ``generate_activation_token``.
            max_age: Maximum accepted token age in seconds.

        Returns:
            tuple: ``(user, None)`` when the token is valid and the account
            is still inactive, otherwise ``(None, message)`` with a
            user-safe explanation. Only expected failures (expired or
            tampered token, unknown user) are turned into messages;
            unexpected errors propagate instead of being shown to the user.
        """
        from django.core.signing import BadSignature, SignatureExpired

        try:
            data = self.signer.unsign(token, max_age=max_age)
        except SignatureExpired:
            # Must precede BadSignature, which is its base class.
            return None, "激活链接已过期"
        except BadSignature:
            return None, "无效的激活链接"

        try:
            user_id, email = data.split(':', 1)
            user = User.objects.get(id=user_id, email=email)
        except (ValueError, User.DoesNotExist):
            # ValueError: payload without ':' or a non-numeric id.
            return None, "无效的激活链接"

        if user.is_active:
            return None, "账户已经激活"

        return user, None

    def send_activation_email(self, request, user):
        """Generate an activation token for ``user`` and e-mail the link."""
        activation_token = self.generate_activation_token(user)

        # Absolute link: current site's domain + the activation view's path.
        current_site = get_current_site(request)
        activation_link = f"https://{current_site.domain}{reverse('activate_account', kwargs={'token': activation_token})}"

        # The HTML body comes from the template; the plain-text alternative
        # is the same content with the tags stripped.
        subject = '激活您的账户'
        html_message = render_to_string('registration/activation_email.html', {
            'user': user,
            'activation_link': activation_link,
            'site_name': current_site.name,
        })
        plain_message = strip_tags(html_message)

        send_mail(
            subject,
            plain_message,
            settings.DEFAULT_FROM_EMAIL,
            [user.email],
            html_message=html_message,
        )

# 用户注册视图(带激活)
def register_user(request):
    """Register a new, inactive account and e-mail its activation link.

    GET renders the registration form. POST validates the username and
    e-mail, rejects duplicates, creates the user with ``is_active=False``,
    sends the activation e-mail and redirects to the login page. Validation
    failures are reported as error messages and the form is shown again.
    """
    form_template = 'registration/register.html'

    if request.method != 'POST':
        return render(request, form_template)

    submitted = request.POST
    username = submitted.get('username')
    email = submitted.get('email')
    password = submitted.get('password')

    try:
        # Format checks.
        checker = UserValidator()
        checker.validate_username(username)
        checker.validate_email(email)

        # Uniqueness checks, username first, then e-mail.
        uniqueness_rules = (
            ({'username': username}, '用户名已存在'),
            ({'email': email}, '邮箱已被注册'),
        )
        for lookup, complaint in uniqueness_rules:
            if User.objects.filter(**lookup).exists():
                raise ValidationError(complaint)

        # The account stays inactive until the e-mailed link is followed.
        new_user = CustomUser.objects.create_user(
            username=username,
            email=email,
            password=password,
            is_active=False
        )

        UserActivationHandler().send_activation_email(request, new_user)

        messages.success(request, '注册成功!请检查邮箱并点击激活链接完成注册')
        return redirect('login')

    except ValidationError as exc:
        for text in exc.messages:
            messages.error(request, text)

    return render(request, form_template)

def activate_account(request, token):
    """Activate the account referenced by an e-mailed activation token.

    On a verification failure the reason is shown as an error message and
    the user is redirected to the registration page. Otherwise the account
    is marked active, saved, and the user is redirected to the login page.
    """
    account, failure = UserActivationHandler().verify_activation_token(token)

    if not failure:
        # Flip the flag and persist it.
        account.is_active = True
        account.save()

        messages.success(request, '账户激活成功!现在您可以登录了')
        return redirect('login')

    messages.error(request, f'激活失败: {failure}')
    return redirect('register')

问题3:记住我功能

# 记住我功能实现
from django.contrib.auth import login
from django.contrib.sessions.models import Session

class RememberMeHandler:
    """Helpers implementing the "remember me" login option."""

    # Lifetime of a remembered session: 2 weeks = 14 * 24 * 60 * 60 seconds.
    # Single source of truth for both the session expiry and the
    # PersistentSession.expires_at timestamp.
    REMEMBER_ME_SECONDS = 1209600

    @staticmethod
    def set_remember_me(request, user, remember_me):
        """Set the session expiry according to the remember-me choice.

        Args:
            request: Current request; its session is modified.
            user: Unused; kept for interface compatibility.
            remember_me: When truthy the session lasts two weeks, otherwise
                it expires when the browser is closed.
        """
        if remember_me:
            request.session.set_expiry(RememberMeHandler.REMEMBER_ME_SECONDS)
        else:
            # An expiry of 0 makes the session end when the browser closes.
            request.session.set_expiry(0)

    @staticmethod
    def create_persistent_session(request, user, device_fingerprint=None):
        """Log ``user`` in with a two-week session and record it.

        Args:
            request: Current request.
            user: The authenticated user to log in.
            device_fingerprint: Optional device identifier; stored in the
                session and on the PersistentSession row when given.
        """
        login(request, user)

        RememberMeHandler.set_remember_me(request, user, True)

        # Store the device fingerprint in the session (optional).
        if device_fingerprint:
            request.session['device_fingerprint'] = device_fingerprint

        PersistentSession.objects.create(
            user=user,
            session_key=request.session.session_key,
            # The column is a non-nullable CharField, so a missing
            # fingerprint must be stored as '' rather than None.
            device_fingerprint=device_fingerprint or '',
            ip_address=request.META.get('REMOTE_ADDR'),
            user_agent=request.META.get('HTTP_USER_AGENT', ''),
            expires_at=timezone.now() + timedelta(
                seconds=RememberMeHandler.REMEMBER_ME_SECONDS
            )
        )

# 持久会话模型
class PersistentSession(models.Model):
    """Database record of one long-lived ("remember me") login session.

    Rows are created by RememberMeHandler.create_persistent_session, which
    fills in the session key, client details and an expiry two weeks ahead.
    """
    # Owner of the session; rows are deleted together with the user.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Key of the associated session; must be unique across rows.
    session_key = models.CharField(max_length=40, unique=True)
    # Optional device identifier; may be blank but the column is not nullable.
    device_fingerprint = models.CharField(max_length=255, blank=True)
    # Client IP address (IPv4 or IPv6).
    ip_address = models.GenericIPAddressField()
    # Raw User-Agent header of the client.
    user_agent = models.TextField()
    # Set automatically when the row is first saved.
    created_at = models.DateTimeField(auto_now_add=True)
    # Moment after which the session no longer counts as valid.
    expires_at = models.DateTimeField()
    # Manual on/off switch checked by is_valid(), independent of expiry.
    is_active = models.BooleanField(default=True)
    
    class Meta:
        db_table = 'persistent_session'
        indexes = [
            models.Index(fields=['user', 'expires_at']),
            # NOTE(review): session_key is already unique=True, which
            # normally creates an index; this one may be redundant.
            models.Index(fields=['session_key']),
        ]
    
    def is_valid(self):
        """Return True while the row is active and not yet expired."""
        return self.is_active and self.expires_at > timezone.now()

# 记住我中间件
class RememberMeMiddleware:
    """Middleware that renews the two-week expiry of remembered sessions.

    After the response has been produced, a request that has a session
    flagged with ``remember_me`` and an authenticated user gets its session
    expiry reset to two weeks.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        response = self.get_response(request)

        # Each guard below bails out as soon as a precondition fails.
        if not hasattr(request, 'session'):
            return response
        if not request.session.get('remember_me'):
            return response
        if not hasattr(request, 'user'):
            return response

        if request.user.is_authenticated:
            # Push the expiry out to two weeks from now.
            request.session.set_expiry(1209600)

        return response

# 更新登录视图以支持记住我
def enhanced_login_view(request):
    """Login view with "remember me" support.

    GET renders the login form. POST authenticates the credentials, runs
    the account-security check, logs the user in, applies the remember-me
    choice, records the login and redirects to the ``next`` target. The
    ``next`` value is user-controlled, so it is only followed when it
    points at this host; otherwise the redirect goes to ``'/'``.
    """
    if request.method == 'POST':
        username = request.POST.get('username')
        password = request.POST.get('password')
        remember_me = request.POST.get('remember_me') == 'on'
        
        user = authenticate(request, username=username, password=password)
        if user and user.is_active:
            # Account-security check.
            security = AccountSecurity()
            security_result = security.secure_login_attempt(username, password, request)
            
            if security_result['success']:
                login(request, user)
                
                if remember_me:
                    request.session['remember_me'] = True
                    RememberMeHandler.set_remember_me(request, user, True)
                    
                    # Record the long-lived session with the device
                    # fingerprint.
                    device_fingerprint = SessionSecurity.create_session_fingerprint(request)
                    RememberMeHandler.create_persistent_session(
                        request, user, device_fingerprint
                    )
                else:
                    request.session['remember_me'] = False
                    RememberMeHandler.set_remember_me(request, user, False)
                
                UserSessionManager.record_login_info(user, request)
                
                # Only follow "next" when it stays on this host; an
                # unchecked value would be an open redirect.
                from django.utils.http import url_has_allowed_host_and_scheme

                next_url = request.POST.get('next') or request.GET.get('next', '/')
                if not url_has_allowed_host_and_scheme(
                    next_url,
                    allowed_hosts={request.get_host()},
                    require_https=request.is_secure(),
                ):
                    next_url = '/'
                return redirect(next_url)
            else:
                messages.error(request, security_result['error'])
        else:
            messages.error(request, '用户名或密码错误')
    
    return render(request, 'registration/login.html')

本章小结

在本章中,我们深入学习了Django用户认证系统:

  1. **