# Django用户认证系统 - 安全登录注册与权限管理
📂 所属阶段:第三部分 — 高级主题
🎯 难度等级:高级
⏰ 预计学习时间:6-8小时
🎒 前置知识:会话管理
#目录
#认证系统基础概念
用户认证系统是Web应用的核心组件,负责验证用户身份并管理用户权限。
#认证与授权
"""
认证(Authentication) vs 授权(Authorization):
认证(Authentication):
- 验证用户身份
- 证明"你是谁"
- 如:用户名密码验证
- Django内置认证系统
授权(Authorization):
- 验证用户权限
- 证明"你能做什么"
- 如:角色权限检查
- Django权限系统
认证流程:
1. 用户提供凭据(用户名/密码)
2. 系统验证凭据有效性
3. 创建用户会话
4. 标记用户为已认证
授权流程:
1. 检查用户认证状态
2. 验证用户权限
3. 允许或拒绝访问
4. 记录访问日志
"""
# Django认证系统组成
"""
Django认证系统组成部分:
1. User模型 (django.contrib.auth.models.User)
- 用户基本信息存储
- 密码哈希存储
- 权限字段
2. Authentication Backend (django.contrib.auth.backends)
- 用户验证逻辑
- 支持多种认证方式
3. Permission System
- 用户权限管理
- 内容级权限控制
4. Group System
- 用户分组管理
- 批量权限分配
5. Login/Logout Views
- 内置登录/登出视图
- 会话管理
6. Decorators & Mixins
- 视图保护装饰器
- 类视图权限检查
"""
# 认证核心组件
# Django认证核心组件
from django.conf import settings
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.backends import ModelBackend
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin
from django.contrib.auth.models import User, Group, Permission
from django.views import View
"""
主要功能模块:
1. 用户管理
- 创建、修改、删除用户
- 密码管理
- 激活/禁用用户
2. 权限管理
- 模型级权限
- 对象级权限
- 自定义权限
3. 组管理
- 用户分组
- 权限批量分配
- 角色管理
4. 会话管理
- 登录/登出
- 会话安全
- 多设备登录
"""
# Django认证架构
# 认证中间件
# 认证中间件配置
from django.contrib.auth.middleware import AuthenticationMiddleware
from django.contrib.sessions.middleware import SessionMiddleware
class CustomAuthenticationMiddleware(AuthenticationMiddleware):
    """Authentication middleware that also tracks activity and validates accounts."""

    def process_request(self, request):
        """Run the stock authentication step, then apply the extra account checks."""
        super().process_request(request)

        # Nothing more to do for requests without an authenticated user.
        if not hasattr(request, 'user') or not request.user.is_authenticated:
            return

        # Refresh the activity timestamp first, as the original flow does.
        self.update_last_activity(request.user)

        # An invalid account is forced out of its session.
        if not self.is_account_valid(request.user):
            logout(request)

    def update_last_activity(self, user):
        """Stamp ``user.last_login`` with the current time and persist that field."""
        from django.utils import timezone

        user.last_login = timezone.now()
        user.save(update_fields=['last_login'])

    def is_account_valid(self, user):
        """Return False when the account is disabled or past its ``expiry_date``."""
        if not user.is_active:
            return False

        # ``expiry_date`` is optional: only compare it when present and set.
        if hasattr(user, 'expiry_date') and user.expiry_date:
            from django.utils import timezone

            already_expired = user.expiry_date < timezone.now()
            if already_expired:
                return False

        return True
# 认证后端接口
from django.contrib.auth.backends import BaseBackend
class CustomAuthBackend(BaseBackend):
    """Username/password authentication backend with failure logging.

    The user model is resolved through ``get_user_model()`` so the backend
    keeps working when ``AUTH_USER_MODEL`` points at a custom model.
    """

    def authenticate(self, request, username=None, password=None, **kwargs):
        """Return the matching user, or ``None`` when authentication fails.

        Every failed attempt is logged to the ``auth.failed`` logger. An
        unexpected error is logged with its traceback and still results in
        ``None`` so other configured backends can be tried.
        """
        if username is None or password is None:
            return None

        import logging

        logger = logging.getLogger('auth.failed')
        try:
            user = self.get_user_by_username(username)
            if user and self.check_password(password, user.password):
                return user
        except Exception:
            # Best-effort backend: keep returning None, but keep the traceback
            # so infrastructure errors are not mistaken for bad credentials.
            logger.exception("Authentication failed for username: %s", username)
            return None

        # Unknown user or wrong password.
        logger.warning("Authentication failed for username: %s", username)
        return None

    def get_user_by_username(self, username):
        """Look the user up by natural key; return ``None`` if there is none."""
        from django.contrib.auth import get_user_model

        user_model = get_user_model()
        try:
            return user_model._default_manager.get_by_natural_key(username)
        except user_model.DoesNotExist:
            return None

    def check_password(self, raw_password, hashed_password):
        """Return True when ``raw_password`` matches the stored hash."""
        from django.contrib.auth.hashers import check_password

        return check_password(raw_password, hashed_password)

    def get_user(self, user_id):
        """Return the user with primary key ``user_id``, or ``None``."""
        from django.contrib.auth import get_user_model

        user_model = get_user_model()
        try:
            return user_model._default_manager.get(pk=user_id)
        except user_model.DoesNotExist:
            return None
# 认证处理器
class AuthHandler:
    """Tries each backend listed in ``settings.AUTHENTICATION_BACKENDS`` in order."""

    def __init__(self):
        self.backends = self.get_backends()

    def get_backends(self):
        """Instantiate and return every configured authentication backend."""
        from django.conf import settings
        from django.utils.module_loading import import_string

        return [
            import_string(backend_path)()
            for backend_path in settings.AUTHENTICATION_BACKENDS
        ]

    def authenticate_user(self, **credentials):
        """Return the first user a backend authenticates, or ``None``.

        Backends whose ``authenticate`` signature cannot accept the supplied
        credentials are skipped instead of raising ``TypeError``. A backend
        raising ``PermissionDenied`` stops the search immediately.
        """
        import inspect

        from django.core.exceptions import PermissionDenied

        for backend in self.backends:
            try:
                inspect.signature(backend.authenticate).bind(None, **credentials)
            except TypeError:
                # This backend does not take these credentials; try the next.
                continue

            try:
                user = backend.authenticate(None, **credentials)
            except PermissionDenied:
                # The backend explicitly vetoed the login.
                return None

            if user:
                # Record which backend authenticated the user.
                backend_cls = backend.__class__
                user.backend = f"{backend_cls.__module__}.{backend_cls.__name__}"
                return user
        return None
# 使用示例
def custom_authenticate(username, password):
    """Authenticate a username/password pair through a fresh ``AuthHandler``."""
    credentials = {'username': username, 'password': password}
    return AuthHandler().authenticate_user(**credentials)
# 用户会话管理
from django.contrib.auth import login, logout
from django.contrib.sessions.models import Session
from django.contrib.auth.models import User
from django.utils import timezone
class UserSessionManager:
    """Helpers for opening and closing user sessions with audit logging."""

    @staticmethod
    def create_user_session(request, user, remember_me=False):
        """Log ``user`` in, set the session lifetime and record the login.

        Without ``remember_me`` the session ends when the browser closes;
        with it the session lasts 1209600 seconds (two weeks).
        """
        login(request, user)
        request.session.set_expiry(1209600 if remember_me else 0)
        UserSessionManager.record_login_info(user, request)

    @staticmethod
    def record_login_info(user, request):
        """Update ``last_login``, write a login log row and prune old failures."""
        from datetime import timedelta

        now = timezone.now()

        user.last_login = now
        user.save(update_fields=['last_login'])

        UserLoginLog.objects.create(
            user=user,
            ip_address=request.META.get('REMOTE_ADDR'),
            user_agent=request.META.get('HTTP_USER_AGENT', ''),
            login_time=now,
        )

        # UserFailedLoginAttempt stores the submitted username, not a user
        # foreign key, so the lookup must go through ``username``.
        UserFailedLoginAttempt.objects.filter(
            username=user.get_username(),
            attempt_time__lt=now - timedelta(hours=1),
        ).delete()

    @staticmethod
    def logout_user(request):
        """Record the logout for an authenticated user, then end the session."""
        if request.user.is_authenticated:
            UserSessionManager.record_logout_info(request.user, request)
        logout(request)

    @staticmethod
    def record_logout_info(user, request):
        """Write a logout log row for ``user``."""
        UserLogoutLog.objects.create(
            user=user,
            ip_address=request.META.get('REMOTE_ADDR'),
            logout_time=timezone.now(),
        )
# 登录日志模型
from django.db import models
class UserLoginLog(models.Model):
    """Audit record written for each login."""

    # Reference the configured user model so the log keeps working when
    # AUTH_USER_MODEL is swapped for a custom model.
    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    ip_address = models.GenericIPAddressField()
    user_agent = models.TextField(blank=True)
    login_time = models.DateTimeField(auto_now_add=True)
    success = models.BooleanField(default=True)

    class Meta:
        db_table = 'user_login_log'
        ordering = ['-login_time']
class UserLogoutLog(models.Model):
    """Audit record written for each logout."""

    # Reference the configured user model so the log keeps working when
    # AUTH_USER_MODEL is swapped for a custom model.
    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    ip_address = models.GenericIPAddressField()
    logout_time = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'user_logout_log'
        ordering = ['-logout_time']
class UserFailedLoginAttempt(models.Model):
    """One unsuccessful login attempt, keyed by the submitted username."""

    username = models.CharField(max_length=150)
    ip_address = models.GenericIPAddressField()
    attempt_time = models.DateTimeField(auto_now_add=True)
    user_agent = models.TextField(blank=True)

    class Meta:
        # Newest attempts come first.
        ordering = ['-attempt_time']
        db_table = 'user_failed_login_attempts'
# 会话安全检查
class SessionSecurityChecker:
    """Brute-force detection based on recorded failed login attempts."""

    @staticmethod
    def check_brute_force_attack(username, ip_address, max_attempts=5):
        """Return ``(blocked, message)`` for the given username and IP.

        Failures inside ``settings.BRUTE_FORCE_TIME_WINDOW`` seconds (default
        900) are counted. The IP is blocked at
        ``settings.MAX_LOGIN_ATTEMPTS_PER_IP`` failures (default 10); the
        account is locked at ``max_attempts`` failures.
        """
        from datetime import timedelta

        from django.conf import settings
        from django.utils import timezone

        # Compute the cutoff once so both counts use the same window.
        window_seconds = getattr(settings, 'BRUTE_FORCE_TIME_WINDOW', 900)
        cutoff = timezone.now() - timedelta(seconds=window_seconds)
        recent_failures = UserFailedLoginAttempt.objects.filter(
            attempt_time__gte=cutoff
        )

        ip_limit = getattr(settings, 'MAX_LOGIN_ATTEMPTS_PER_IP', 10)
        if recent_failures.filter(ip_address=ip_address).count() >= ip_limit:
            return True, "IP地址被临时封禁"

        if recent_failures.filter(username=username).count() >= max_attempts:
            return True, "账户被临时锁定"

        return False, ""

    @staticmethod
    def log_failed_attempt(username, ip_address, user_agent):
        """Persist one failed login attempt."""
        UserFailedLoginAttempt.objects.create(
            username=username,
            ip_address=ip_address,
            user_agent=user_agent,
        )
# 使用示例
def secure_login_view(request):
    """Handle a login POST with brute-force protection.

    Returns a dict describing the outcome: ``success``/``redirect`` on
    success, ``error`` (plus ``blocked`` when throttled) on failure, and
    ``{'success': False}`` for non-POST requests.
    """
    if request.method != 'POST':
        return {'success': False}

    username = request.POST.get('username')
    password = request.POST.get('password')

    # Reject incomplete submissions up front; recording an attempt with a
    # missing username would try to store NULL in a required column.
    if not username or not password:
        return {'error': '用户名或密码错误'}

    # Form values arrive as strings, so "false"/"0" must not count as set.
    raw_remember = request.POST.get('remember_me', False)
    remember_me = str(raw_remember).strip().lower() not in (
        '', '0', 'false', 'off', 'no', 'none',
    )

    ip_address = request.META.get('REMOTE_ADDR')
    user_agent = request.META.get('HTTP_USER_AGENT', '')

    is_attack, attack_msg = SessionSecurityChecker.check_brute_force_attack(
        username, ip_address
    )
    if is_attack:
        return {'error': attack_msg, 'blocked': True}

    user = authenticate(request, username=username, password=password)
    if not user:
        SessionSecurityChecker.log_failed_attempt(username, ip_address, user_agent)
        return {'error': '用户名或密码错误'}

    if not user.is_active:
        return {'error': '账户已被禁用'}

    UserSessionManager.create_user_session(request, user, remember_me)
    return {'success': True, 'redirect': '/dashboard/'}
# Section: authentication decorators and mixins
# 认证装饰器
from functools import wraps
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin
from django.http import HttpResponseForbidden, HttpResponseRedirect
from django.urls import reverse
from django.contrib import messages
def custom_login_required(function=None, redirect_field_name='next', login_url=None):
    """Require an authenticated, active user before running the view.

    Usable bare (``@custom_login_required``) or with arguments. Anonymous
    AJAX requests get a 403; other anonymous requests are redirected to
    ``login_url`` (default: the URL named ``login``) with the current path
    in ``redirect_field_name``. A user whose ``is_account_active()`` returns
    False is logged out and sent to the login page.
    """
    def decorator(view_func):
        @wraps(view_func)
        def _wrapped_view(request, *args, **kwargs):
            if not request.user.is_authenticated:
                if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                    return HttpResponseForbidden("Authentication required")

                from urllib.parse import urlencode

                # Use a new local name: assigning to ``login_url`` here would
                # shadow the closure variable and raise UnboundLocalError.
                target_url = login_url or reverse('login')
                # Encode the return path so its own query string survives.
                query = urlencode(
                    {redirect_field_name: request.get_full_path()}, safe='/'
                )
                separator = '&' if '?' in target_url else '?'
                return HttpResponseRedirect(f"{target_url}{separator}{query}")

            # ``is_account_active`` is optional on the user object.
            if hasattr(request.user, 'is_account_active') and not request.user.is_account_active():
                messages.error(request, '您的账户已被禁用')
                logout(request)
                return HttpResponseRedirect(reverse('login'))

            return view_func(request, *args, **kwargs)
        return _wrapped_view

    if function:
        return decorator(function)
    return decorator
def role_required(roles, login_url=None, redirect_field_name='next'):
    """Require the user to belong to at least one of the named groups.

    ``roles`` may be a single group name or a list/tuple/set of names.
    Anonymous users are redirected to ``login_url`` (default: the URL named
    ``login``); authenticated users without a matching group get a 403.
    """
    def decorator(view_func):
        @wraps(view_func)
        def _wrapped_view(request, *args, **kwargs):
            if not request.user.is_authenticated:
                from urllib.parse import urlencode

                # Use a new local name: assigning to ``login_url`` here would
                # shadow the closure variable and raise UnboundLocalError.
                target_url = login_url or reverse('login')
                query = urlencode(
                    {redirect_field_name: request.get_full_path()}, safe='/'
                )
                separator = '&' if '?' in target_url else '?'
                return HttpResponseRedirect(f"{target_url}{separator}{query}")

            user_roles = set(request.user.groups.values_list('name', flat=True))
            if isinstance(roles, (list, tuple, set, frozenset)):
                required_roles = set(roles)
            else:
                required_roles = {roles}

            if not (user_roles & required_roles):
                messages.error(request, '您没有权限访问此页面')
                return HttpResponseForbidden("Access denied")

            return view_func(request, *args, **kwargs)
        return _wrapped_view
    return decorator
def multi_permission_required(permissions, logical='AND', login_url=None, redirect_field_name='next'):
    """Require several permissions, combined with AND or OR.

    ``logical`` is case-insensitive; any value other than ``'OR'`` is treated
    as ``'AND'``. Anonymous users are redirected to ``login_url`` (default:
    the URL named ``login``); users failing the check get a 403.
    """
    def decorator(view_func):
        @wraps(view_func)
        def _wrapped_view(request, *args, **kwargs):
            if not request.user.is_authenticated:
                from urllib.parse import urlencode

                # Use a new local name: assigning to ``login_url`` here would
                # shadow the closure variable and raise UnboundLocalError.
                target_url = login_url or reverse('login')
                query = urlencode(
                    {redirect_field_name: request.get_full_path()}, safe='/'
                )
                separator = '&' if '?' in target_url else '?'
                return HttpResponseRedirect(f"{target_url}{separator}{query}")

            # Every permission string goes straight to has_perm(); the result
            # list is built eagerly so each permission is always checked.
            has_permissions = [request.user.has_perm(perm) for perm in permissions]

            combine = any if logical.upper() == 'OR' else all
            if not combine(has_permissions):
                messages.error(request, '您没有足够的权限访问此页面')
                return HttpResponseForbidden("Access denied")

            return view_func(request, *args, **kwargs)
        return _wrapped_view
    return decorator
# 自定义混入类
from django.contrib.auth.mixins import AccessMixin
class RoleRequiredMixin(AccessMixin):
    """Class-based-view mixin that restricts access by group name."""

    role_required = None
    raise_exception = False
    permission_denied_message = '您没有权限访问此页面'

    def dispatch(self, request, *args, **kwargs):
        """Deny anonymous users and users lacking a required role."""
        if not request.user.is_authenticated or not self.has_role():
            return self.handle_no_permission()
        return super().dispatch(request, *args, **kwargs)

    def has_role(self):
        """Return True when no role is required or the user holds one of them."""
        wanted = self.role_required
        if wanted is None:
            return True

        held = set(self.request.user.groups.values_list('name', flat=True))
        if isinstance(wanted, (list, tuple)):
            wanted_set = set(wanted)
        else:
            wanted_set = {wanted}
        return bool(held & wanted_set)

    def handle_no_permission(self):
        """Raise ``PermissionDenied`` or answer 403, depending on ``raise_exception``."""
        if self.raise_exception:
            from django.core.exceptions import PermissionDenied

            raise PermissionDenied(self.permission_denied_message)

        denial_text = self.permission_denied_message
        messages.error(self.request, denial_text)
        return HttpResponseForbidden(denial_text)
class MultiPermissionRequiredMixin(AccessMixin):
    """Class-based-view mixin requiring several permissions (AND / OR)."""

    permissions_required = None
    permissions_logical = 'AND'  # 'AND' or 'OR'
    raise_exception = False
    permission_denied_message = '您没有足够的权限访问此页面'

    def dispatch(self, request, *args, **kwargs):
        """Deny anonymous users and users failing the permission check."""
        if not request.user.is_authenticated or not self.has_permissions():
            return self.handle_no_permission()
        return super().dispatch(request, *args, **kwargs)

    def has_permissions(self):
        """Evaluate every required permission and combine the results."""
        required = self.permissions_required
        if required is None:
            return True

        # Built as a list so has_perm() runs for every permission.
        outcomes = [self.request.user.has_perm(perm) for perm in required]

        # Anything other than 'OR' is treated as 'AND'.
        if self.permissions_logical.upper() == 'OR':
            return any(outcomes)
        return all(outcomes)

    def handle_no_permission(self):
        """Raise ``PermissionDenied`` or answer 403, depending on ``raise_exception``."""
        if self.raise_exception:
            from django.core.exceptions import PermissionDenied

            raise PermissionDenied(self.permission_denied_message)

        denial_text = self.permission_denied_message
        messages.error(self.request, denial_text)
        return HttpResponseForbidden(denial_text)
# 使用示例
@custom_login_required
@role_required(['admin', 'manager'])
@multi_permission_required(['app.view_report', 'app.edit_data'], logical='OR')
def admin_dashboard_view(request):
    """Example view protected by login, role and permission checks."""
    current_user = request.user
    granted = current_user.get_all_permissions()
    return {'user': current_user, 'permissions': granted}
class AdminDashboardView(LoginRequiredMixin, RoleRequiredMixin, MultiPermissionRequiredMixin, View):
    """Example class-based view protected by login, role and permission mixins.

    ``View`` must come last in the bases: the mixins call
    ``super().dispatch()``, which needs a real view class to dispatch to.
    """

    role_required = ['admin', 'manager']
    permissions_required = ['app.view_report', 'app.edit_data']
    permissions_logical = 'OR'

    def get(self, request):
        """Return the current user together with all of their permissions."""
        return {'user': request.user, 'permissions': request.user.get_all_permissions()}
## 自定义用户模型 \{#自定义用户模型}
### 扩展内置用户模型
```python
# 扩展内置用户模型
from django.contrib.auth.models import AbstractUser
from django.db import models
from django.utils import timezone
class CustomUser(AbstractUser):
    """User model extending ``AbstractUser`` with profile, status and audit fields."""

    # Basic extension fields
    phone_number = models.CharField(max_length=20, blank=True, null=True, unique=True)
    birth_date = models.DateField(blank=True, null=True)
    avatar = models.ImageField(upload_to='avatars/', blank=True, null=True)
    bio = models.TextField(max_length=500, blank=True)
    location = models.CharField(max_length=30, blank=True)
    website = models.URLField(blank=True)

    # Account status fields
    is_verified = models.BooleanField(default=False)
    is_premium = models.BooleanField(default=False)
    expiry_date = models.DateTimeField(blank=True, null=True)

    # Audit fields
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    # Preference fields
    preferred_language = models.CharField(
        max_length=10,
        choices=[
            ('en', 'English'),
            ('zh', '中文'),
            ('ja', '日本語'),
            ('ko', '한국어'),
        ],
        default='en',
    )
    timezone = models.CharField(max_length=50, default='UTC')

    class Meta:
        db_table = 'custom_user'
        indexes = [
            models.Index(fields=['phone_number']),
            models.Index(fields=['is_verified']),
            models.Index(fields=['is_premium']),
            models.Index(fields=['created_at']),
        ]

    def __str__(self):
        return f"{self.username} ({self.email})"

    def get_full_name(self):
        """Return "first last", falling back to the username when both are blank."""
        joined = f"{self.first_name} {self.last_name}".strip()
        return joined if joined else self.username

    def get_short_name(self):
        """Return the first name, or the username when it is blank."""
        return self.first_name or self.username

    def is_account_active(self):
        """Return True when the account is enabled and not past ``expiry_date``."""
        if not self.is_active:
            return False
        # Inside a method ``timezone`` resolves to the module-level import,
        # not to the ``timezone`` model field declared above.
        if self.expiry_date and self.expiry_date < timezone.now():
            return False
        return True

    def get_display_name(self):
        """Return whichever name parts are set, or the username when none are."""
        name_parts = [part for part in (self.first_name, self.last_name) if part]
        if name_parts:
            return " ".join(name_parts)
        return self.username
# 用户配置文件模型
class UserProfile(models.Model):
    """One-to-one profile attached to ``CustomUser`` (reverse name ``profile``)."""

    user = models.OneToOneField(
        CustomUser,
        on_delete=models.CASCADE,
        related_name='profile',
    )

    # Personal details
    gender = models.CharField(
        max_length=10,
        choices=[
            ('male', '男'),
            ('female', '女'),
            ('other', '其他'),
            ('prefer_not_to_say', '不愿透露'),
        ],
        blank=True,
    )
    occupation = models.CharField(max_length=100, blank=True)
    company = models.CharField(max_length=100, blank=True)
    education = models.CharField(max_length=100, blank=True)

    # Social accounts
    wechat = models.CharField(max_length=50, blank=True)
    qq = models.CharField(max_length=20, blank=True)
    weibo = models.URLField(blank=True)

    # Notification settings
    email_notifications = models.BooleanField(default=True)
    sms_notifications = models.BooleanField(default=False)
    push_notifications = models.BooleanField(default=True)

    # Privacy settings
    profile_public = models.BooleanField(default=True)
    show_email = models.BooleanField(default=True)
    show_phone = models.BooleanField(default=False)

    # Counters
    login_count = models.PositiveIntegerField(default=0)
    post_count = models.PositiveIntegerField(default=0)
    follower_count = models.PositiveIntegerField(default=0)
    following_count = models.PositiveIntegerField(default=0)

    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'user_profile'
        verbose_name = '用户配置文件'
        verbose_name_plural = '用户配置文件'
# 信号处理器 - 自动创建用户配置文件
from django.db.models.signals import post_save
from django.dispatch import receiver
@receiver(post_save, sender=CustomUser)
def create_user_profile(sender, instance, created, **kwargs):
    """Create the matching ``UserProfile`` the first time a user is saved."""
    if not created:
        return
    UserProfile.objects.create(user=instance)
@receiver(post_save, sender=CustomUser)
def save_user_profile(sender, instance, **kwargs):
    """Save the user's profile, when one exists, each time the user is saved."""
    if not hasattr(instance, 'profile'):
        return
    instance.profile.save()
# 用户验证器
from django.core.exceptions import ValidationError
from django.contrib.auth.validators import UnicodeUsernameValidator
import re
class UserValidator:
    """Validators for user-supplied identity fields.

    Each validator returns ``None`` for acceptable (or empty) input and
    raises ``ValidationError`` otherwise.
    """

    @staticmethod
    def validate_phone_number(phone_number):
        """Validate a mainland-China mobile number (11 digits, starting 13-19)."""
        if phone_number:
            # fullmatch, not match with "$": "$" also matches just before a
            # trailing newline, which would let "138...\n" through.
            if not re.fullmatch(r'1[3-9]\d{9}', phone_number):
                raise ValidationError('请输入有效的手机号码')

    @staticmethod
    def validate_username(username):
        """Validate length (3-30) and character set (letters, digits, underscore)."""
        if username:
            if len(username) < 3 or len(username) > 30:
                raise ValidationError('用户名长度应在3-30个字符之间')
            # fullmatch for the same trailing-newline reason as above.
            if not re.fullmatch(r'[a-zA-Z0-9_]+', username):
                raise ValidationError('用户名只能包含字母、数字和下划线')

    @staticmethod
    def validate_email(email):
        """Validate an e-mail address with Django's ``validate_email``."""
        if email:
            from django.core.validators import validate_email

            try:
                validate_email(email)
            except ValidationError as exc:
                # Replace Django's message with the localized one, keeping
                # the original error as the cause.
                raise ValidationError('请输入有效的邮箱地址') from exc
# 自定义用户管理器
from django.contrib.auth.models import BaseUserManager
class CustomUserManager(BaseUserManager):
    """Manager for ``CustomUser`` with creation helpers and common queries."""

    def create_user(self, username, email=None, password=None, **extra_fields):
        """Create and save a regular user.

        Raises ``ValueError`` when ``username`` is empty.
        """
        if not username:
            raise ValueError('用户名不能为空')

        # normalize_email() maps None to '', which is what the non-nullable
        # email column expects; storing None would fail on save.
        email = self.normalize_email(email)
        user = self.model(username=username, email=email, **extra_fields)
        user.set_password(password)
        user.save(using=self._db)
        return user

    def create_superuser(self, username, email=None, password=None, **extra_fields):
        """Create and save a superuser (staff, superuser and verified by default).

        Raises ``ValueError`` when ``is_staff`` or ``is_superuser`` is
        overridden to anything other than ``True``.
        """
        extra_fields.setdefault('is_staff', True)
        extra_fields.setdefault('is_superuser', True)
        extra_fields.setdefault('is_verified', True)

        if extra_fields.get('is_staff') is not True:
            raise ValueError('超级用户必须设置is_staff=True')
        if extra_fields.get('is_superuser') is not True:
            raise ValueError('超级用户必须设置is_superuser=True')

        return self.create_user(username, email, password, **extra_fields)

    def get_by_phone(self, phone_number):
        """Return the user with the given phone number."""
        return self.get(phone_number=phone_number)

    def get_active_users(self):
        """Return enabled users whose account has not expired.

        Mirrors ``CustomUser.is_account_active``: a user with no expiry date
        or with an expiry date still in the future counts as active.
        """
        from django.db.models import Q
        from django.utils import timezone

        not_expired = Q(expiry_date__isnull=True) | Q(expiry_date__gte=timezone.now())
        return self.filter(not_expired, is_active=True).select_related('profile')

    def get_premium_users(self):
        """Return users flagged as premium."""
        return self.filter(is_premium=True).select_related('profile')
# Make the model use the custom manager: attach a CustomUserManager
# instance to CustomUser under the name ``objects``.
CustomUser.add_to_class('objects', CustomUserManager())
# 用户服务类
class UserService:
    """Service helpers combining ``CustomUser`` and ``UserProfile`` operations."""

    @staticmethod
    def create_user_with_profile(username, email, password, **extra_fields):
        """Create a user and optionally fill in its profile.

        ``profile_data`` (a dict) may be passed among ``extra_fields``; it is
        applied to the user's profile and is not forwarded to the user model.
        """
        # Pull profile_data out first: passing it on to create_user() would
        # hand the model constructor an unknown keyword argument.
        profile_data = extra_fields.pop('profile_data', None) or {}

        user = CustomUser.objects.create_user(
            username=username,
            email=email,
            password=password,
            **extra_fields
        )

        if profile_data:
            UserService.update_user_profile(user, **profile_data)
        return user

    @staticmethod
    def update_user_profile(user, **profile_data):
        """Set the given attributes on the user's profile and save it.

        Keys that are not attributes of the profile are ignored.
        """
        profile = user.profile
        for field, value in profile_data.items():
            if hasattr(profile, field):
                setattr(profile, field, value)
        profile.save()
        return profile

    @staticmethod
    def get_user_recommendations(user, limit=10):
        """Return up to ``limit`` other users sharing ``user``'s location."""
        # ``location`` is a CustomUser field, not a UserProfile field.
        candidates = CustomUser.objects.filter(location=user.location).exclude(pk=user.pk)

        # UserProfile as declared here has no ``following`` relation; apply
        # the exclusion only when a project adds one.
        # NOTE(review): assumes ``following`` relates to CustomUser rows — confirm.
        following = getattr(user.profile, 'following', None)
        if following is not None:
            candidates = candidates.exclude(pk__in=following.all())

        return candidates.select_related('profile')[:limit]
# 使用示例
def user_creation_example():
    """Show how to create a user with profile data and then adjust the profile."""
    initial_profile = {
        'gender': 'male',
        'occupation': 'Software Engineer',
        'company': 'Tech Corp',
    }
    user = UserService.create_user_with_profile(
        username='john_doe',
        email='john@example.com',
        password='secure_password',
        first_name='John',
        last_name='Doe',
        phone_number='13812345678',
        profile_data=initial_profile,
    )

    # Turn off e-mail notifications and hide the address.
    privacy_changes = {'email_notifications': False, 'show_email': False}
    UserService.update_user_profile(user, **privacy_changes)

    return user
# 完全自定义用户模型(替代AbstractUser)
from django.contrib.auth.models import AbstractBaseUser, PermissionsMixin
from django.contrib.auth.base_user import BaseUserManager
from django.db import models
class EmailUserManager(BaseUserManager):
    """Manager for user models that log in with an e-mail address."""

    def create_user(self, email, password=None, **extra_fields):
        """Create and save a user; ``email`` is mandatory."""
        if not email:
            raise ValueError('邮箱地址不能为空')

        normalized = self.normalize_email(email)
        account = self.model(email=normalized, **extra_fields)
        account.set_password(password)
        account.save(using=self._db)
        return account

    def create_superuser(self, email, password=None, **extra_fields):
        """Create and save a superuser, enforcing the staff/superuser flags."""
        for flag in ('is_staff', 'is_superuser', 'is_verified'):
            extra_fields.setdefault(flag, True)

        if extra_fields.get('is_staff') is not True:
            raise ValueError('超级用户必须设置is_staff=True')
        if extra_fields.get('is_superuser') is not True:
            raise ValueError('超级用户必须设置is_superuser=True')

        return self.create_user(email, password, **extra_fields)
class EmailUser(AbstractBaseUser, PermissionsMixin):
    """Fully custom user model that uses the e-mail address as login field."""

    email = models.EmailField(unique=True)
    username = models.CharField(max_length=150, blank=True)
    first_name = models.CharField(max_length=30, blank=True)
    last_name = models.CharField(max_length=150, blank=True)
    is_staff = models.BooleanField(default=False)
    is_active = models.BooleanField(default=True)
    is_verified = models.BooleanField(default=False)
    date_joined = models.DateTimeField(auto_now_add=True)

    # Custom fields
    phone_number = models.CharField(max_length=20, blank=True)
    avatar = models.URLField(blank=True)
    bio = models.TextField(max_length=500, blank=True)

    USERNAME_FIELD = 'email'  # log in with the e-mail address
    REQUIRED_FIELDS = ['username']  # extra fields required for superuser creation

    objects = EmailUserManager()

    class Meta:
        db_table = 'email_user'
        verbose_name = '邮箱用户'
        verbose_name_plural = '邮箱用户'

    def __str__(self):
        return self.email

    def get_full_name(self):
        """Return "first last", else the username, else the e-mail address."""
        combined = f"{self.first_name} {self.last_name}".strip()
        if combined:
            return combined
        return self.username or self.email

    def get_short_name(self):
        """Return the first name, or the local part of the e-mail address."""
        if self.first_name:
            return self.first_name
        return self.email.split('@')[0]
# 使用完全自定义用户模型的配置
"""
# settings.py 配置
AUTH_USER_MODEL = 'your_app.EmailUser'
# 注意:使用自定义用户模型需要在项目初期就设置,
# 一旦有数据后很难更改
"""
# 认证后端配置
# 多因素认证后端
# 多因素认证后端
import pyotp
import qrcode
from io import BytesIO
import base64
from django.contrib.auth.backends import BaseBackend
from django.contrib.auth import get_user_model
# Bind ``User`` to whatever get_user_model() returns; this rebinds the
# ``User`` name imported from django.contrib.auth.models earlier in the file.
User = get_user_model()
class TwoFactorAuthBackend(BaseBackend):
    """Authentication backend adding an optional TOTP second factor."""

    def authenticate(self, request, username=None, password=None, otp_code=None, **kwargs):
        """Check the password and, when 2FA is enabled, the one-time code.

        Raises ``ValueError`` when 2FA is enabled for the user but no
        ``otp_code`` was supplied.
        """
        if username is None or password is None:
            return None

        # Step 1: password check.
        user = self.authenticate_basic(username, password)
        if not user:
            return None

        # Step 2: one-time code, only for users who switched 2FA on.
        two_factor_enabled = hasattr(user, 'profile') and user.profile.enable_2fa
        if two_factor_enabled:
            if not otp_code:
                raise ValueError('需要提供两因素认证码')
            if not self.verify_otp(user, otp_code):
                return None

        return user

    def authenticate_basic(self, username, password):
        """Return the user when username and password match, else ``None``."""
        try:
            candidate = User.objects.get(username=username)
        except User.DoesNotExist:
            return None
        if candidate.check_password(password):
            return candidate
        return None

    def verify_otp(self, user, otp_code):
        """Verify ``otp_code`` against the TOTP secret stored on the profile."""
        if not hasattr(user, 'profile') or not user.profile.totp_secret:
            return False
        verifier = pyotp.TOTP(user.profile.totp_secret)
        # valid_window=1 also accepts the adjacent time steps.
        return verifier.verify(otp_code, valid_window=1)

    def get_user(self, user_id):
        """Return the user with primary key ``user_id``, or ``None``."""
        try:
            return User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None
# TOTP服务类
class TOTPService:
    """Helpers for generating and verifying TOTP secrets and codes."""

    @staticmethod
    def generate_secret():
        """Return a new random base32 TOTP secret."""
        return pyotp.random_base32()

    @staticmethod
    def generate_qr_code(user, issuer_name="YourApp"):
        """Return ``(base64_png, provisioning_uri)`` for the user's TOTP secret."""
        provisioning_uri = pyotp.totp.TOTP(user.profile.totp_secret).provisioning_uri(
            name=user.email,
            issuer_name=issuer_name,
        )

        code = qrcode.QRCode(version=1, box_size=10, border=5)
        code.add_data(provisioning_uri)
        code.make(fit=True)
        image = code.make_image(fill_color="black", back_color="white")

        # Render the PNG in memory and hand it back base64-encoded.
        png_buffer = BytesIO()
        image.save(png_buffer, format='PNG')
        encoded_png = base64.b64encode(png_buffer.getvalue()).decode()

        return encoded_png, provisioning_uri

    @staticmethod
    def verify_code(secret, code):
        """Return True when ``code`` is valid for ``secret`` (adjacent steps allowed)."""
        return pyotp.TOTP(secret).verify(code, valid_window=1)
# Extend the UserProfile model with the fields needed for 2FA support.
# Flag read by TwoFactorAuthBackend to decide whether a code is required.
UserProfile.add_to_class(
'enable_2fa',
models.BooleanField(default=False)
)
# TOTP secret, up to 32 characters; blank when not configured.
UserProfile.add_to_class(
'totp_secret',
models.CharField(max_length=32, blank=True)
)
UserProfile.add_to_class(
'backup_codes',
models.TextField(blank=True) # backup codes, stored in JSON format
)
# 社交认证后端
import requests
from social_core.backends.oauth import BaseOAuth2
class WeChatOAuthBackend(BaseBackend):
    """Authentication backend for the WeChat OAuth2 authorization-code flow."""

    # Seconds to wait for each WeChat API call before giving up.
    request_timeout = 10

    def authenticate(self, request, code=None, **kwargs):
        """Exchange ``code`` for a token, fetch the profile and return the user.

        ``appid`` and ``secret`` are read from ``kwargs``. Returns ``None``
        when ``code`` is missing, when a WeChat call fails or reports an
        error, or when the token response lacks the expected fields.
        """
        if not code:
            return None

        token_data = self._fetch_json(
            'https://api.weixin.qq.com/sns/oauth2/access_token',
            {
                'appid': kwargs.get('appid'),
                'secret': kwargs.get('secret'),
                'code': code,
                'grant_type': 'authorization_code',
            },
        )
        if token_data is None:
            return None

        access_token = token_data.get('access_token')
        openid = token_data.get('openid')
        if not access_token or not openid:
            return None

        user_data = self._fetch_json(
            'https://api.weixin.qq.com/sns/userinfo',
            {
                'access_token': access_token,
                'openid': openid,
                'lang': 'zh_CN',
            },
        )
        if user_data is None:
            return None

        # Prefer the openid echoed in the profile, as the original code did,
        # falling back to the one from the token response.
        user, created = User.objects.get_or_create(
            username=f"wechat_{user_data.get('openid', openid)}",
            defaults={
                'first_name': user_data.get('nickname', ''),
                'avatar': user_data.get('headimgurl', ''),
                'is_verified': True,
            },
        )
        return user

    def _fetch_json(self, url, params):
        """GET ``url`` and return the decoded JSON dict, or ``None`` on failure."""
        try:
            response = requests.get(url, params=params, timeout=self.request_timeout)
            payload = response.json()
        except (requests.RequestException, ValueError):
            # Network failure, timeout, or a body that is not valid JSON.
            return None
        if not isinstance(payload, dict) or 'errcode' in payload:
            return None
        return payload

    def get_user(self, user_id):
        """Return the user with primary key ``user_id``, or ``None``."""
        try:
            return User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None
# LDAP认证后端(需要python-ldap)
"""
class LDAPAuthBackend(BaseBackend):
# LDAP认证后端实现
pass
"""
# 自定义认证后端配置
"""
# settings.py 配置
AUTHENTICATION_BACKENDS = [
'django.contrib.auth.backends.ModelBackend', # 默认后端
'your_app.backends.TwoFactorAuthBackend', # 两因素认证
'your_app.backends.WeChatOAuthBackend', # 微信OAuth
# 'your_app.backends.LDAPAuthBackend', # LDAP认证
]
"""
# 密码策略和安全
# 密码策略和安全
from django.contrib.auth.hashers import make_password, check_password
from django.core.exceptions import ValidationError
import re
from django.conf import settings
class PasswordPolicy:
    """Configurable password-strength policy read from Django settings."""

    def __init__(self):
        self.min_length = getattr(settings, 'PASSWORD_MIN_LENGTH', 8)
        self.require_uppercase = getattr(settings, 'PASSWORD_REQUIRE_UPPERCASE', True)
        self.require_lowercase = getattr(settings, 'PASSWORD_REQUIRE_LOWERCASE', True)
        self.require_numbers = getattr(settings, 'PASSWORD_REQUIRE_NUMBERS', True)
        self.require_special = getattr(settings, 'PASSWORD_REQUIRE_SPECIAL', False)
        self.special_chars = getattr(settings, 'PASSWORD_SPECIAL_CHARS', '!@#$%^&*()_+-=[]{}|;:,.<>?')

    def validate_password(self, password, user=None):
        """Return ``password`` when it satisfies the policy.

        Raises ``ValidationError`` carrying every violated rule. When
        ``user`` is given, the password must not contain the user's
        username, first name, last name or e-mail address (case-insensitive).
        """
        errors = []

        if len(password) < self.min_length:
            errors.append(f'密码长度至少需要{self.min_length}个字符')

        if self.require_uppercase and not re.search(r'[A-Z]', password):
            errors.append('密码必须包含大写字母')

        if self.require_lowercase and not re.search(r'[a-z]', password):
            errors.append('密码必须包含小写字母')

        if self.require_numbers and not re.search(r'\d', password):
            errors.append('密码必须包含数字')

        if self.require_special and not re.search(f'[{re.escape(self.special_chars)}]', password):
            errors.append(f'密码必须包含特殊字符: {self.special_chars}')

        if user:
            lowered_password = password.lower()
            personal_values = (
                getattr(user, attr, None)
                for attr in ('username', 'first_name', 'last_name', 'email')
            )
            # Skip blank values: an empty string is a substring of every
            # password and would reject everything for users with no name.
            if any(value and value.lower() in lowered_password for value in personal_values):
                errors.append('密码不能与您的个人信息相似')

        if errors:
            raise ValidationError(errors)
        return password
# 密码历史记录
class PasswordHistory(models.Model):
    """Hashes of a user's previous passwords, used to prevent reuse."""

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    password_hash = models.CharField(max_length=128)  # hashed, never the raw password
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'password_history'
        ordering = ['-created_at']

    @classmethod
    def add_password(cls, user, raw_password):
        """Store the hash of ``raw_password`` and trim the user's history."""
        cls.objects.create(user=user, password_hash=make_password(raw_password))
        cls.trim_history(user)

    @classmethod
    def trim_history(cls, user, max_history=5):
        """Keep only the ``max_history`` most recent records for ``user``."""
        # Django refuses delete() on a sliced queryset, so collect the primary
        # keys beyond the newest ``max_history`` first and delete by key.
        stale_ids = list(
            cls.objects.filter(user=user)
            .order_by('-created_at', '-pk')
            .values_list('pk', flat=True)[max_history:]
        )
        if stale_ids:
            cls.objects.filter(pk__in=stale_ids).delete()

    @classmethod
    def is_password_used_before(cls, user, raw_password, check_count=5):
        """Return True when ``raw_password`` matches one of the latest hashes."""
        recent_history = cls.objects.filter(user=user).order_by('-created_at')[:check_count]
        return any(
            check_password(raw_password, record.password_hash)
            for record in recent_history
        )
# 密码重置令牌
from django.utils.crypto import get_random_string
from django.utils import timezone
class PasswordResetToken(models.Model):
    """Single-use, time-limited token authorizing a password reset."""

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    token = models.CharField(max_length=32, unique=True)
    created_at = models.DateTimeField(auto_now_add=True)
    is_used = models.BooleanField(default=False)

    class Meta:
        db_table = 'password_reset_token'

    @classmethod
    def generate_token(cls, user):
        """Discard the user's unused tokens and create a new one."""
        cls.objects.filter(user=user, is_used=False).delete()
        return cls.objects.create(user=user, token=get_random_string(32))

    def is_valid(self):
        """Return True when the token is unused and not yet expired.

        The lifetime is ``settings.PASSWORD_RESET_TOKEN_VALIDITY`` seconds
        (default 3600).
        """
        from django.conf import settings

        validity_period = getattr(settings, 'PASSWORD_RESET_TOKEN_VALIDITY', 3600)
        # total_seconds(), not .seconds: .seconds drops whole days, so a
        # token older than 24 hours would otherwise look fresh again.
        age_seconds = (timezone.now() - self.created_at).total_seconds()
        return not self.is_used and age_seconds < validity_period
# 密码服务
class PasswordService:
    """Password operations: set, change, reset via token, reset-link creation."""

    def __init__(self):
        self.policy = PasswordPolicy()

    def set_password(self, user, raw_password, check_history=True):
        """Validate ``raw_password``, store it on ``user`` and record it in history.

        Raises ``ValidationError`` when the policy rejects the password or,
        with ``check_history``, when it was used recently.
        """
        self.policy.validate_password(raw_password, user)

        if check_history:
            reused = PasswordHistory.is_password_used_before(user, raw_password)
            if reused:
                raise ValidationError('新密码不能与最近使用的密码相同')

        user.set_password(raw_password)
        user.save()

        PasswordHistory.add_password(user, raw_password)

    def change_password(self, user, old_password, new_password):
        """Change the password after verifying the current one."""
        old_password_ok = user.check_password(old_password)
        if not old_password_ok:
            raise ValidationError('旧密码不正确')
        self.set_password(user, new_password)

    def reset_password_with_token(self, token_str, new_password):
        """Reset the password of the token's user, then mark the token used."""
        try:
            reset_token = PasswordResetToken.objects.get(token=token_str)
        except PasswordResetToken.DoesNotExist:
            raise ValidationError('无效的重置令牌')

        if not reset_token.is_valid():
            raise ValidationError('重置令牌已过期或已被使用')

        self.set_password(reset_token.user, new_password)

        reset_token.is_used = True
        reset_token.save()

    def generate_password_reset_link(self, user):
        """Create a reset token for ``user`` and return the relative reset URL."""
        token = PasswordResetToken.generate_token(user)
        # Only the path is built here; delivery is left to the caller.
        return f"/reset-password/{token.token}/"
# 密码强度指示器
class PasswordStrengthIndicator:
    """Heuristic password strength meter for UI feedback."""

    @staticmethod
    def calculate_strength(password):
        """Score a password and collect improvement hints.

        One point each is given for length >= 8, length >= 12, lowercase,
        uppercase, digit and special character. One point is removed when
        fewer than 70% of the characters are unique. The score never drops
        below 0.

        Args:
            password: Plain-text password; may be empty.

        Returns:
            dict: ``score`` (0-6), ``strength`` ('weak'/'medium'/'strong'),
            ``level`` (localized label), ``feedback`` (list of hints) and
            ``percentage`` (0-100).
        """
        score = 0
        feedback = []

        # Length points.
        if len(password) >= 8:
            score += 1
        else:
            feedback.append("密码至少需要8个字符")
        if len(password) >= 12:
            score += 1

        # Character variety: one point per class present, one hint per class missing.
        character_checks = (
            (r'[a-z]', "添加小写字母"),
            (r'[A-Z]', "添加大写字母"),
            (r'\d', "添加数字"),
            (r'[!@#$%^&*(),.?":{}|<>]', "添加特殊字符"),
        )
        for pattern, hint in character_checks:
            if re.search(pattern, password):
                score += 1
            else:
                feedback.append(hint)

        # Repetition penalty. The ``password and`` guard avoids dividing by
        # zero for an empty string.
        if password and len(set(password)) / len(password) < 0.7:
            score = max(0, score - 1)
            feedback.append("避免重复字符")

        if score <= 2:
            strength, level = 'weak', '弱'
        elif score <= 4:
            strength, level = 'medium', '中'
        else:
            strength, level = 'strong', '强'

        return {
            'score': score,
            'strength': strength,
            'level': level,
            'feedback': feedback,
            'percentage': max(0, min(100, int((score / 6) * 100))),
        }
# 使用示例
def password_management_example():
    """Demonstrate strength checking, password assignment and reset links.

    Returns:
        The user created for the demonstration.
    """
    password_service = PasswordService()
    strength_meter = PasswordStrengthIndicator()

    # Rate a sample password.
    report = strength_meter.calculate_strength("MyPass123!")
    print(f"密码强度: {report}")

    # Create the demo account, then give it a policy-compliant password.
    user = CustomUser.objects.create_user(
        username='testuser',
        email='test@example.com',
    )
    try:
        password_service.set_password(user, "SecurePass123!")
    except ValidationError as exc:
        print(f"密码验证失败: {exc}")
    else:
        print("密码设置成功")

    # Build a reset link for the same account.
    link = password_service.generate_password_reset_link(user)
    print(f"重置链接: {link}")
    return user


#用户管理与操作
#用户CRUD操作
# 用户CRUD操作
from django.contrib.auth import get_user_model
from django.db import transaction
from django.core.exceptions import ValidationError
User = get_user_model()
class UserManager:
    """Static helpers for creating, updating, (de)activating and deleting users.

    All methods work on the module-level ``User`` (the active user model).
    """

    @staticmethod
    def create_user_with_validation(username, email, password, **extra_fields):
        """Validate the input, check uniqueness and create a user.

        Args:
            username: Desired username; validated when non-empty.
            email: Email address; validated and uniqueness-checked when
                non-empty.
            password: Plain-text password handed to ``create_user``.
            **extra_fields: Forwarded unchanged to ``create_user``.

        Returns:
            The newly created user.

        Raises:
            ValidationError: If the username or email is already taken.
                The validator presumably raises it as well; confirm against
                ``UserValidator``.
        """
        validator = UserValidator()
        if username:
            validator.validate_username(username)
        if email:
            validator.validate_email(email)

        if User.objects.filter(username=username).exists():
            raise ValidationError('用户名已存在')
        if email and User.objects.filter(email=email).exists():
            raise ValidationError('邮箱已被注册')

        with transaction.atomic():
            return User.objects.create_user(
                username=username,
                email=email,
                password=password,
                **extra_fields
            )

    @staticmethod
    def update_user(user_id, **updates):
        """Apply field updates to an existing user and save it.

        ``username`` and ``email`` are validated and checked for uniqueness
        against all other users. A ``password`` entry is hashed through
        ``set_password`` instead of being written raw. Keys that are not
        attributes of the user are ignored.

        Raises:
            ValidationError: If the user does not exist or a new username or
                email is already in use.
        """
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            raise ValidationError('用户不存在')

        validator = UserValidator()
        if 'username' in updates:
            validator.validate_username(updates['username'])
            if User.objects.filter(username=updates['username']).exclude(id=user_id).exists():
                raise ValidationError('用户名已被使用')
        if 'email' in updates:
            validator.validate_email(updates['email'])
            if User.objects.filter(email=updates['email']).exclude(id=user_id).exists():
                raise ValidationError('邮箱已被使用')

        for field, value in updates.items():
            if field == 'password':
                # setattr would store the plain text in the hash column.
                user.set_password(value)
            elif hasattr(user, field):
                setattr(user, field, value)
        user.save()
        return user

    @staticmethod
    def deactivate_user(user_id, reason=''):
        """Deactivate a user and write a deactivation log row.

        Returns:
            bool: True on success, False if the user does not exist.
        """
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            return False

        # Flag change and audit row are committed together or not at all.
        with transaction.atomic():
            user.is_active = False
            user.save()
            UserDeactivationLog.objects.create(
                user=user,
                reason=reason,
                deactivated_by=None  # the acting operator could be recorded here
            )
        return True

    @staticmethod
    def activate_user(user_id):
        """Re-activate a user.

        Returns:
            bool: True on success, False if the user does not exist.
        """
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            return False
        user.is_active = True
        user.save()
        return True

    @staticmethod
    def delete_user(user_id, hard_delete=False):
        """Delete a user, either physically or by renaming and deactivating.

        Args:
            user_id: Primary key of the user.
            hard_delete: When True the row is deleted. Otherwise the username
                gets a ``deleted_<id>_`` prefix and the account is
                deactivated.

        Returns:
            bool: True on success, False if the user does not exist.
        """
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            return False

        if hard_delete:
            user.delete()
        else:
            prefix = f"deleted_{user.id}_"
            # Skip the rename when already soft-deleted, so repeated calls do
            # not stack prefixes.
            if not user.username.startswith(prefix):
                user.username = f"{prefix}{user.username}"
            user.is_active = False
            user.save()
        return True

    @staticmethod
    def bulk_create_users(user_data_list):
        """Create several users, collecting per-item validation errors.

        Args:
            user_data_list: Iterable of keyword dicts for
                ``create_user_with_validation``.

        Returns:
            tuple: ``(created_users, error_messages)``; error messages carry
            the 1-based position of the failing item.
        """
        users = []
        errors = []
        for position, user_data in enumerate(user_data_list, start=1):
            try:
                users.append(UserManager.create_user_with_validation(**user_data))
            except ValidationError as e:
                errors.append(f"用户 {position}: {str(e)}")
        return users, errors
# 用户停用日志
class UserDeactivationLog(models.Model):
    """Audit row recording that a user account was deactivated."""

    # Account that was deactivated; its log rows are deleted with it (CASCADE).
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Free-text explanation supplied by the caller.
    reason = models.TextField()
    # Operator who performed the action; set to NULL if that account is removed.
    deactivated_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        related_name='deactivated_users',
    )
    # Filled in once, when the row is inserted.
    deactivated_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'user_deactivation_log'
# 用户搜索和筛选
class UserSearchFilter:
    """Query helpers for searching users and computing summary statistics."""

    @staticmethod
    def search_users(query, filters=None):
        """Return a distinct queryset of users matching text and filters.

        Args:
            query: Text matched case-insensitively against username, first
                name, last name and email; skipped when empty.
            filters: Optional dict with any of ``is_active``,
                ``is_verified`` (applied only if the user model has that
                attribute), ``date_joined_after``, ``date_joined_before``
                and ``groups`` (iterable of group names).
        """
        from django.db.models import Q

        queryset = User.objects.all()

        if query:
            text_match = (
                Q(username__icontains=query)
                | Q(first_name__icontains=query)
                | Q(last_name__icontains=query)
                | Q(email__icontains=query)
            )
            queryset = queryset.filter(text_match)

        if filters:
            # (filter key, ORM lookup, whether the lookup is available)
            lookups = (
                ('is_active', 'is_active', True),
                ('is_verified', 'is_verified', hasattr(User, 'is_verified')),
                ('date_joined_after', 'date_joined__gte', True),
                ('date_joined_before', 'date_joined__lte', True),
                ('groups', 'groups__name__in', True),
            )
            for key, lookup, available in lookups:
                if available and key in filters:
                    queryset = queryset.filter(**{lookup: filters[key]})

        return queryset.distinct()

    @staticmethod
    def get_user_statistics():
        """Return user counts and the sign-ups per day for the last week.

        Returns:
            dict: ``total``, ``active``, ``verified``, ``premium``,
            ``active_percentage`` and ``daily_signups`` (list of
            ``{'date_joined__date', 'count'}`` rows ordered by date).
        """
        from django.db.models import Count
        from django.utils import timezone
        from datetime import timedelta

        def count_flag(flag_name):
            # Optional boolean flags count as 0 when the model lacks them.
            if not hasattr(User, flag_name):
                return 0
            return User.objects.filter(**{flag_name: True}).count()

        total_users = User.objects.count()
        active_users = User.objects.filter(is_active=True).count()
        verified_users = count_flag('is_verified')
        premium_users = count_flag('is_premium')

        today = timezone.now().date()
        week_ago = today - timedelta(days=7)
        signups = User.objects.filter(date_joined__date__range=[week_ago, today])
        signups = signups.values('date_joined__date')
        signups = signups.annotate(count=Count('id')).order_by('date_joined__date')

        if total_users > 0:
            active_share = active_users / total_users * 100
        else:
            active_share = 0

        return {
            'total': total_users,
            'active': active_users,
            'verified': verified_users,
            'premium': premium_users,
            'active_percentage': active_share,
            'daily_signups': list(signups),
        }
# 用户导入导出
import csv
import json
from django.http import HttpResponse
class UserImportExport:
    """CSV export and import of user accounts."""

    # Leading characters that make spreadsheet software evaluate a cell.
    _FORMULA_PREFIXES = ('=', '+', '-', '@', '\t', '\r')

    @staticmethod
    def _csv_safe(text):
        """Neutralize spreadsheet formulas by prefixing risky cells with a quote."""
        if text.startswith(UserImportExport._FORMULA_PREFIXES):
            return "'" + text
        return text

    @staticmethod
    def export_users_csv(users, fields=None):
        """Render users as a downloadable CSV response.

        Args:
            users: Iterable of user objects.
            fields: Attribute names to export. Defaults to username, email,
                first/last name, is_active and date_joined.

        Returns:
            HttpResponse: ``text/csv`` attachment named ``users.csv``.
            Booleans are written as Yes/No and date-like values as
            ``YYYY-MM-DD HH:MM:SS``. Cells starting with a formula character
            are prefixed with ``'`` to prevent CSV injection.
        """
        if fields is None:
            fields = ['username', 'email', 'first_name', 'last_name', 'is_active', 'date_joined']

        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = 'attachment; filename="users.csv"'
        writer = csv.writer(response)
        writer.writerow(fields)

        for user in users:
            row = []
            for field in fields:
                value = getattr(user, field, '')
                if isinstance(value, bool):
                    value = 'Yes' if value else 'No'
                elif hasattr(value, 'strftime'):  # date / datetime
                    value = value.strftime('%Y-%m-%d %H:%M:%S')
                row.append(UserImportExport._csv_safe(str(value)))
            writer.writerow(row)
        return response

    @staticmethod
    def import_users_from_csv(csv_file, required_fields=None):
        """Create users from an uploaded CSV file.

        Args:
            csv_file: File-like object whose ``read()`` returns UTF-8 bytes
                (a BOM is tolerated) or text.
            required_fields: Columns that must be non-empty in every row.
                Defaults to ``['username', 'email']``.

        Returns:
            tuple: ``(users_created, errors)``. A failing row is reported
            with its line number and does not stop the import.

        Note:
            Imported accounts get no password (``password=None``). Per the
            Django docs ``create_user`` then marks the password unusable, so
            users must go through the password-reset flow. The earlier
            shared hard-coded password let anyone log in to every imported
            account.
        """
        if required_fields is None:
            required_fields = ['username', 'email']

        users_created = []
        errors = []

        raw = csv_file.read()
        # utf-8-sig strips the BOM Excel writes; without that the first
        # header would read '\ufeffusername' and every row would fail.
        text = raw.decode('utf-8-sig') if isinstance(raw, bytes) else raw
        reader = csv.DictReader(text.splitlines())

        for row_num, row in enumerate(reader, start=2):  # line 1 is the header
            try:
                for field in required_fields:
                    if not row.get(field):
                        raise ValidationError(f"第{row_num}行缺少必需字段: {field}")

                # DictReader yields None for cells missing from a short row.
                active_text = row.get('is_active')
                if active_text is None:
                    active_text = 'Yes'

                user_data = {
                    'username': row.get('username'),
                    'email': row.get('email', ''),
                    'first_name': row.get('first_name', ''),
                    'last_name': row.get('last_name', ''),
                    'is_active': active_text.lower() in ['yes', 'true', '1'],
                    'password': None,
                }
                user = UserManager.create_user_with_validation(**user_data)
                users_created.append(user)
            except ValidationError as e:
                errors.append(f"第{row_num}行错误: {str(e)}")
            except Exception as e:
                # Deliberate best effort: one bad row must not abort the import.
                errors.append(f"第{row_num}行未知错误: {str(e)}")
        return users_created, errors
# 使用示例
def user_management_example():
    """Demonstrate user creation, searching and statistics.

    Returns:
        The user created for the demonstration.
    """
    account_details = {
        'username': 'newuser',
        'email': 'newuser@example.com',
        'password': 'SecurePass123!',
        'first_name': 'New',
        'last_name': 'User',
    }
    new_user = UserManager.create_user_with_validation(**account_details)

    # Build a search for active users matching "new" (the result is not used).
    search_results = UserSearchFilter.search_users('new', {'is_active': True})

    statistics = UserSearchFilter.get_user_statistics()
    print(f"用户统计: {statistics}")
    return new_user


#用户活动追踪
# 用户活动追踪
class UserActivityTracker:
    """Write and read per-user activity log entries."""

    # Points per activity type for the engagement score; any other type
    # counts as DEFAULT_ACTIVITY_WEIGHT.
    ACTIVITY_WEIGHTS = {
        'login': 5,
        'view_content': 2,
        'create_content': 10,
        'comment': 3,
        'share': 4,
        'purchase': 15,
    }
    DEFAULT_ACTIVITY_WEIGHT = 1

    @staticmethod
    def log_user_activity(user, activity_type, details=None, ip_address=None):
        """Insert one activity log row and return it.

        Args:
            user: The acting user.
            activity_type: Short type label, e.g. ``'login'``.
            details: Optional JSON-serializable dict; defaults to ``{}``.
            ip_address: Optional client IP address.
        """
        # No explicit timestamp: the model field is auto_now_add, which sets
        # the insertion time itself and overrides any passed value.
        return UserActivityLog.objects.create(
            user=user,
            activity_type=activity_type,
            details=details or {},
            ip_address=ip_address,
        )

    @staticmethod
    def get_user_activity(user, days=30):
        """Return the user's log rows of the last ``days`` days, newest first."""
        from datetime import timedelta

        start_date = timezone.now() - timedelta(days=days)
        return UserActivityLog.objects.filter(
            user=user,
            timestamp__gte=start_date,
        ).order_by('-timestamp')

    @staticmethod
    def get_user_engagement_score(user):
        """Sum the weights of all activities of the last 30 days.

        Returns:
            int: Total of ``ACTIVITY_WEIGHTS`` per logged activity (unknown
            types count 1); 0 when there is no activity.
        """
        # timedelta comes from datetime; django.utils.timezone only re-exports
        # it by accident and does not document it.
        from datetime import timedelta

        window_start = timezone.now() - timedelta(days=30)
        activity_types = UserActivityLog.objects.filter(
            user=user,
            timestamp__gte=window_start,
        ).values_list('activity_type', flat=True)

        weights = UserActivityTracker.ACTIVITY_WEIGHTS
        fallback = UserActivityTracker.DEFAULT_ACTIVITY_WEIGHT
        return sum(weights.get(kind, fallback) for kind in activity_types)
# 用户活动日志模型
class UserActivityLog(models.Model):
    """One logged user action together with its request context."""

    # Acting user; log rows are deleted with the account (CASCADE).
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Short label for the kind of action.
    activity_type = models.CharField(max_length=50)
    # Arbitrary JSON payload describing the action.
    details = models.JSONField(default=dict)
    ip_address = models.GenericIPAddressField(null=True, blank=True)
    user_agent = models.TextField(blank=True)
    # Insertion time, set automatically.
    timestamp = models.DateTimeField(auto_now_add=True)
    session_key = models.CharField(max_length=40, blank=True)

    class Meta:
        db_table = 'user_activity_log'
        # Newest entries first by default.
        ordering = ['-timestamp']
        indexes = [
            models.Index(fields=['user', 'timestamp']),
            models.Index(fields=['activity_type']),
            models.Index(fields=['timestamp']),
        ]
# 用户在线状态追踪
class UserOnlineStatus(models.Model):
    """Per-user presence record (at most one row per user)."""

    # One-to-one link; the row is deleted with the user (CASCADE).
    user = models.OneToOneField(User, on_delete=models.CASCADE)
    is_online = models.BooleanField(default=False)
    # Refreshed automatically on every save.
    last_seen = models.DateTimeField(auto_now=True)
    # Session key stored alongside the presence flag.
    session_key = models.CharField(max_length=40, blank=True)

    class Meta:
        db_table = 'user_online_status'
# 在线用户追踪中间件
class OnlineUserMiddleware:
    """Middleware that marks authenticated users as online on each request."""

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        """Update the presence row for the current user, then continue.

        Requests without a ``user`` attribute (authentication middleware not
        installed or ordered later) and anonymous requests pass through
        untouched.
        """
        user = getattr(request, 'user', None)
        if user is not None and user.is_authenticated:
            online_status, _created = UserOnlineStatus.objects.get_or_create(user=user)
            online_status.is_online = True
            online_status.last_seen = timezone.now()
            # session_key is None until the session is first saved; the
            # column is a non-null CharField, so store '' instead.
            online_status.session_key = request.session.session_key or ''
            online_status.save()

        return self.get_response(request)
# 用户行为分析
class UserBehaviorAnalyzer:
    """Aggregate views over a user's activity log."""

    @staticmethod
    def analyze_user_behavior(user):
        """Summarize the user's activity of the last 30 days.

        Returns:
            dict with:
              - ``total_activities``: number of log rows,
              - ``activities_by_type``: lazy queryset of
                ``{'activity_type', 'count'}`` rows,
              - ``most_active_day``: ``{'day', 'count'}`` of the busiest day,
                or None,
              - ``average_daily_activities``: mean rows per active day, or 0,
              - ``last_activity``: newest log row, or None.
        """
        from datetime import timedelta
        from django.db.models.functions import TruncDate

        thirty_days_ago = timezone.now() - timedelta(days=30)
        activities = UserActivityLog.objects.filter(
            user=user,
            timestamp__gte=thirty_days_ago,
        )

        # Rows grouped per calendar day. TruncDate replaces the raw-SQL
        # ``extra(select=...)`` and is database independent. ``order_by()``
        # clears the model's default ordering so it cannot leak into the
        # GROUP BY.
        per_day = (
            activities.annotate(day=TruncDate('timestamp'))
            .values('day')
            .annotate(count=models.Count('id'))
            .order_by()
        )

        return {
            'total_activities': activities.count(),
            'activities_by_type': activities.values('activity_type').annotate(
                count=models.Count('activity_type')
            ),
            'most_active_day': per_day.order_by('-count').first(),
            'average_daily_activities': per_day.aggregate(avg=models.Avg('count'))['avg'] or 0,
            'last_activity': activities.order_by('-timestamp').first(),
        }

    @staticmethod
    def get_user_retention_cohort(user):
        """Report on which of the first 7 days the user was active.

        Day 0 is the date of the user's first logged activity.

        Returns:
            dict: ``{'day_0': bool, ..., 'day_6': bool}``, or ``{}`` when the
            user has no activity at all.
        """
        # The original imported only ``datetime`` here and then used the
        # undefined name ``timedelta``.
        from datetime import timedelta

        first_activity = UserActivityLog.objects.filter(
            user=user
        ).order_by('timestamp').first()
        if not first_activity:
            return {}

        cohort_date = first_activity.timestamp.date()
        retention_data = {}
        for offset in range(7):
            check_date = cohort_date + timedelta(days=offset)
            retention_data[f'day_{offset}'] = UserActivityLog.objects.filter(
                user=user,
                timestamp__date=check_date,
            ).exists()
        return retention_data
# 使用示例
def user_tracking_example(request):
    """Log a page view for the current user and return activity insights.

    Returns:
        dict: ``behavior`` and ``engagement_score`` for authenticated users,
        an empty dict for anonymous requests.
    """
    current_user = request.user
    if not current_user.is_authenticated:
        return {}

    # Record the visit.
    UserActivityTracker.log_user_activity(
        user=request.user,
        activity_type='page_view',
        details={'page': request.path},
        ip_address=request.META.get('REMOTE_ADDR'),
    )

    # Derive the insights from the log.
    insights = UserBehaviorAnalyzer.analyze_user_behavior(request.user)
    score = UserActivityTracker.get_user_engagement_score(request.user)
    return {
        'behavior': insights,
        'engagement_score': score,
    }


#权限与组管理
#权限系统基础
# 权限系统基础
from django.contrib.auth.models import Permission, Group
from django.contrib.contenttypes.models import ContentType
from django.apps import apps
class PermissionManager:
    """Static helpers for creating permissions and granting them to users.

    Permission names are accepted as ``'app_label.codename'`` or as a bare
    ``'codename'``.
    """

    @staticmethod
    def _resolve_permission(permission_codename):
        """Return the Permission for a name, or None if it cannot be resolved.

        None is returned when no row matches and also when the name is
        ambiguous (a bare codename that exists for several models). Granting
        an arbitrary one of several matches would be unsafe.
        """
        if '.' in permission_codename:
            app_label, codename = permission_codename.split('.', 1)
            lookup = {'content_type__app_label': app_label, 'codename': codename}
        else:
            lookup = {'codename': permission_codename}
        try:
            return Permission.objects.get(**lookup)
        except (Permission.DoesNotExist, Permission.MultipleObjectsReturned):
            return None

    @staticmethod
    def create_custom_permission(app_label, model_name, codename, name, content_type=None):
        """Get or create a permission for a model.

        Args:
            app_label: App label used to look up the model.
            model_name: Model name used to look up the model.
            codename: Permission codename.
            name: Human-readable name, used only when the row is created.
            content_type: Optional ContentType; when given, the model lookup
                is skipped.

        Returns:
            tuple: ``(permission, created)``.

        Raises:
            ValueError: If the model cannot be found.
        """
        if not content_type:
            try:
                model = apps.get_model(app_label, model_name)
                content_type = ContentType.objects.get_for_model(model)
            except LookupError:
                raise ValueError(f"找不到模型 {app_label}.{model_name}")

        return Permission.objects.get_or_create(
            codename=codename,
            content_type=content_type,
            defaults={'name': name},
        )

    @staticmethod
    def assign_permission_to_user(user, permission_codename):
        """Grant one permission to the user.

        Returns:
            bool: True if granted, False if the permission is unknown or
            ambiguous.
        """
        permission = PermissionManager._resolve_permission(permission_codename)
        if permission is None:
            return False
        user.user_permissions.add(permission)
        return True

    @staticmethod
    def remove_permission_from_user(user, permission_codename):
        """Revoke one directly assigned permission from the user.

        Returns:
            bool: True if the permission was resolved and removed, False if
            it is unknown or ambiguous.
        """
        permission = PermissionManager._resolve_permission(permission_codename)
        if permission is None:
            return False
        user.user_permissions.remove(permission)
        return True

    @staticmethod
    def bulk_assign_permissions_to_user(user, permission_list):
        """Set the user's direct permissions to the resolvable ones in the list.

        Names that cannot be resolved are skipped.

        NOTE(review): this uses ``set()``, so direct permissions that are not
        in ``permission_list`` are removed. Confirm callers expect replace
        semantics rather than add semantics.

        Returns:
            int: Number of permissions now assigned.
        """
        resolved = (
            PermissionManager._resolve_permission(name) for name in permission_list
        )
        permissions = [permission for permission in resolved if permission is not None]
        user.user_permissions.set(permissions)
        return len(permissions)
# 自定义权限检查器
class CustomPermissionChecker:
    """Permission and role checks for one user, cached per instance.

    Permission names may be given as ``'app_label.codename'`` (Django's
    usual form) or as a bare ``'codename'``.
    """

    def __init__(self, user):
        self.user = user
        self.user_permissions = None  # filled lazily by get_user_permissions()
        self.user_groups = None       # filled lazily by get_user_roles()

    def _short_circuit(self):
        """Return True/False when the user's flags decide the check, else None."""
        # Same rule as Django's own has_perm(): inactive accounts have no
        # permissions, not even as superuser. Objects without the attribute
        # are treated as active.
        if not getattr(self.user, 'is_active', True):
            return False
        if self.user.is_superuser:
            return True
        if not self.user.is_authenticated:
            return False
        return None

    def get_user_permissions(self):
        """Return the set of permission names the user holds directly or via groups.

        Each permission is stored twice, as ``'app_label.codename'`` and as
        bare ``'codename'``, so both spellings can be checked.
        """
        if self.user_permissions is None:
            collected = set()
            sources = (
                self.user.user_permissions.values_list(
                    'content_type__app_label', 'codename'
                ),
                self.user.groups.values_list(
                    'permissions__content_type__app_label', 'permissions__codename'
                ),
            )
            for rows in sources:
                for app_label, codename in rows:
                    # Groups without permissions yield (None, None) rows.
                    if codename is None:
                        continue
                    collected.add(codename)
                    collected.add(f"{app_label}.{codename}")
            self.user_permissions = collected
        return self.user_permissions

    def has_permission(self, permission_codename):
        """Return True if the user holds the given permission."""
        decided = self._short_circuit()
        if decided is not None:
            return decided
        return permission_codename in self.get_user_permissions()

    def has_any_permission(self, permission_list):
        """Return True if the user holds at least one of the permissions."""
        decided = self._short_circuit()
        if decided is not None:
            return decided
        return bool(set(permission_list) & self.get_user_permissions())

    def has_all_permissions(self, permission_list):
        """Return True if the user holds every permission in the list."""
        decided = self._short_circuit()
        if decided is not None:
            return decided
        return set(permission_list).issubset(self.get_user_permissions())

    def get_user_roles(self):
        """Return the names of the groups the user belongs to."""
        if self.user_groups is None:
            self.user_groups = list(
                self.user.groups.values_list('name', flat=True)
            )
        return self.user_groups

    def has_role(self, role_name):
        """Return True if the user is a member of the named group."""
        return role_name in self.get_user_roles()
# 权限装饰器
def permission_required_or_403(perm, login_url=None, raise_exception=False):
    """Build a view decorator that denies access without a matching permission.

    Args:
        perm: One permission name or a list/tuple of names; holding any one
            of them is sufficient.
        login_url: Accepted for signature compatibility; not used.
        raise_exception: When True, raise ``PermissionDenied`` on denial
            instead of returning a 403 response.

    Returns:
        A decorator for function-based views.
    """
    required = perm if isinstance(perm, (list, tuple)) else (perm,)

    def decorator(view_func):
        @wraps(view_func)
        def _wrapped_view(request, *args, **kwargs):
            granted = CustomPermissionChecker(request.user).has_any_permission(required)
            if granted:
                return view_func(request, *args, **kwargs)
            if raise_exception:
                from django.core.exceptions import PermissionDenied
                raise PermissionDenied
            return HttpResponseForbidden("Permission denied")
        return _wrapped_view

    return decorator
# 对象级权限
class ObjectPermissionManager:
    """Object-level permission checks with a per-instance result cache."""

    def __init__(self):
        self.cache = {}

    def user_has_obj_perm(self, user, obj, permission_type):
        """Decide whether ``user`` may act on ``obj``.

        Access is granted when the object's ``user`` or ``owner`` attribute
        equals the user; otherwise the extended check decides. Results are
        cached per (user id, model label, object pk, permission type).
        """
        key = "_".join(
            str(part) for part in (user.id, obj._meta.label, obj.pk, permission_type)
        )
        try:
            return self.cache[key]
        except KeyError:
            pass

        is_owner = any(
            hasattr(obj, attr) and getattr(obj, attr) == user
            for attr in ('user', 'owner')
        )
        if is_owner:
            verdict = True
        else:
            # Fall back to team / collaborator style rules.
            verdict = self.check_extended_permissions(user, obj, permission_type)

        self.cache[key] = verdict
        return verdict

    def check_extended_permissions(self, user, obj, permission_type):
        """Grant access when the user belongs to the object's team.

        Returns False when the object has no ``team`` or the user has no
        ``teams`` attribute.
        """
        if not (hasattr(obj, 'team') and hasattr(user, 'teams')):
            return False
        return user.teams.filter(id=obj.team.id).exists()

    def clear_cache(self):
        """Forget all cached decisions."""
        self.cache.clear()
# 使用示例
def permission_usage_example(request):
    """Demonstrate permission and role checks for the requesting user.

    Returns:
        dict: Results of the individual checks plus the user's role names.
    """
    checker = CustomPermissionChecker(request.user)

    # Permission checks (evaluated in this order).
    may_edit = checker.has_permission('app.change_model')
    may_delete = checker.has_any_permission(['app.delete_model', 'app.super_delete'])
    holds_all = checker.has_all_permissions(['app.view_model', 'app.change_model'])

    # Role checks.
    admin_member = checker.has_role('admin')
    role_names = checker.get_user_roles()

    return {
        'can_edit': may_edit,
        'can_delete': may_delete,
        'has_all_perms': holds_all,
        'is_admin': admin_member,
        'roles': role_names,
    }


#组管理与角色系统
# 组管理与角色系统
class GroupManager:
    """Static helpers for groups and their membership."""

    @staticmethod
    def create_group_with_permissions(name, permissions=None, description=''):
        """Get or create a group and, if given, set its permissions.

        Args:
            name: Group name.
            permissions: Optional iterable of ``'app_label.codename'`` or
                bare ``'codename'`` strings. Unknown names are skipped. When
                non-empty, the group's permissions are replaced by the
                resolved ones.
            description: Currently not persisted; storing it would need an
                extended group model or a separate table.

        Returns:
            tuple: ``(group, created)``.
        """
        group, created = Group.objects.get_or_create(name=name)

        if permissions:
            resolved = []
            for dotted_name in permissions:
                app_label, dot, codename = dotted_name.partition('.')
                if dot:
                    criteria = {'content_type__app_label': app_label, 'codename': codename}
                else:
                    criteria = {'codename': dotted_name}
                try:
                    resolved.append(Permission.objects.get(**criteria))
                except Permission.DoesNotExist:
                    continue  # skip permissions that do not exist
            group.permissions.set(resolved)

        return group, created

    @staticmethod
    def assign_user_to_group(user, group_name):
        """Add the user to the named group; False if the group is missing."""
        try:
            target = Group.objects.get(name=group_name)
        except Group.DoesNotExist:
            return False
        user.groups.add(target)
        return True

    @staticmethod
    def remove_user_from_group(user, group_name):
        """Remove the user from the named group; False if the group is missing."""
        try:
            target = Group.objects.get(name=group_name)
        except Group.DoesNotExist:
            return False
        user.groups.remove(target)
        return True

    @staticmethod
    def bulk_assign_users_to_group(user_ids, group_name):
        """Add all users with the given ids to the named group.

        Returns:
            bool: True on success, False if the group is missing.
        """
        try:
            target = Group.objects.get(name=group_name)
        except Group.DoesNotExist:
            return False
        members = User.objects.filter(id__in=user_ids)
        target.user_set.add(*members)
        return True

    @staticmethod
    def get_group_members(group_name):
        """Return the group's users, or an empty queryset if the group is missing."""
        try:
            target = Group.objects.get(name=group_name)
        except Group.DoesNotExist:
            return User.objects.none()
        return target.user_set.all()
# 预定义角色系统
class PredefinedRoles:
    """Fixed catalogue of roles, each backed by a group of the same name."""

    # Role name -> permission names assigned to the role's group.
    ROLES_PERMISSIONS = {
        'admin': [
            'auth.add_user',
            'auth.change_user',
            'auth.delete_user',
            'auth.add_group',
            'auth.change_group',
            'auth.delete_group',
            'contenttypes.add_contenttype',
            'contenttypes.change_contenttype',
            'contenttypes.delete_contenttype',
            'admin.add_logentry',
            'admin.change_logentry',
            'admin.delete_logentry',
            # further administrative permissions go here
        ],
        'manager': [
            'auth.change_user',
            'auth.change_group',
            # manager-specific permissions go here
        ],
        'editor': [
            'app.add_article',
            'app.change_article',
            'app.delete_article',
            # editor permissions go here
        ],
        'viewer': [
            'app.view_article',
            # viewer permissions go here
        ],
        'moderator': [
            'app.change_article',
            'app.delete_comment',
            # moderator permissions go here
        ],
    }

    @classmethod
    def setup_predefined_roles(cls):
        """Create (or update) one group per predefined role."""
        for role, role_permissions in cls.ROLES_PERMISSIONS.items():
            GroupManager.create_group_with_permissions(
                name=role,
                permissions=role_permissions,
            )

    @classmethod
    def assign_role_to_user(cls, user, role_name):
        """Add the user to a predefined role's group.

        Returns:
            bool: False for unknown role names, otherwise the result of the
            group assignment.
        """
        if role_name not in cls.ROLES_PERMISSIONS:
            return False
        return GroupManager.assign_user_to_group(user, role_name)

    @classmethod
    def get_available_roles(cls):
        """Return the predefined role names in declaration order."""
        return [role for role in cls.ROLES_PERMISSIONS]
# 动态角色系统
class DynamicRoleSystem:
    """Roles created at runtime, optionally granted only under conditions."""

    def __init__(self):
        self.role_cache = {}

    def create_dynamic_role(self, name, permissions, conditions=None):
        """Get or create a group and set its permissions.

        Args:
            name: Group name of the role.
            permissions: Iterable of ``'app_label.codename'`` or bare
                ``'codename'`` strings; unknown names are skipped.
            conditions: Accepted but not persisted yet (storage is left to a
                custom model or JSON field).

        Returns:
            tuple: ``(group, created)``.
        """
        group, created = Group.objects.get_or_create(name=name)

        perm_objects = []
        for perm_codename in permissions:
            if '.' in perm_codename:
                app_label, codename = perm_codename.split('.', 1)
                lookup = {'content_type__app_label': app_label, 'codename': codename}
            else:
                lookup = {'codename': perm_codename}
            try:
                perm_objects.append(Permission.objects.get(**lookup))
            except Permission.DoesNotExist:
                continue  # skip permissions that do not exist
        group.permissions.set(perm_objects)

        return group, created

    def assign_conditional_role(self, user, role_name, conditions=None):
        """Add the user to the role's group if the conditions hold.

        Returns:
            bool: False when conditions are given and not met, otherwise the
            result of the group assignment.
        """
        if conditions and not self.check_conditions(user, conditions):
            return False
        return GroupManager.assign_user_to_group(user, role_name)

    def check_conditions(self, user, conditions):
        """Evaluate role-assignment conditions for a user.

        Supported condition types:
          - ``'user_field'``: ``(field_name, expected_value)``; the user's
            attribute must equal the expected value. A missing attribute
            counts as not met.
          - ``'time_limit'``: ``(start_time, end_time)``; the current time
            must lie within the bounds. Timezone-aware bounds are compared
            against an aware "now", naive bounds against a naive one.

        Other condition types are ignored.

        Returns:
            bool: True if every supported condition is met.
        """
        from datetime import datetime

        missing = object()
        for condition_type, condition_value in conditions.items():
            if condition_type == 'user_field':
                field_name, expected_value = condition_value
                actual_value = getattr(user, field_name, missing)
                if actual_value is missing or actual_value != expected_value:
                    return False
            elif condition_type == 'time_limit':
                start_time, end_time = condition_value
                # Match the awareness of the bounds: comparing a naive now()
                # with aware datetimes raises TypeError.
                current_time = datetime.now(getattr(start_time, 'tzinfo', None))
                if not (start_time <= current_time <= end_time):
                    return False
        return True
# 角色继承系统
class RoleInheritanceSystem:
    """Roles whose permissions are copied from one or more parent roles."""

    def __init__(self):
        self.inheritance_cache = {}

    def create_role_hierarchy(self, child_role, parent_roles):
        """Create a child group holding the union of its parents' permissions.

        Parent groups that do not exist are skipped. The child's permissions
        are replaced by the collected set.

        Returns:
            The child group.
        """
        child_group, _created = Group.objects.get_or_create(name=child_role)

        inherited = set()
        for parent_name in parent_roles:
            try:
                parent_group = Group.objects.get(name=parent_name)
            except Group.DoesNotExist:
                continue
            inherited.update(parent_group.permissions.all())

        child_group.permissions.set(inherited)

        # Hook for persisting the relationship (currently a no-op).
        self.record_inheritance(child_role, parent_roles)
        return child_group

    def record_inheritance(self, child_role, parent_roles):
        """Persist the parent/child relationship.

        Placeholder: does nothing yet; a dedicated hierarchy model could be
        written here.
        """
        return None

    def get_effective_permissions(self, user):
        """Return the set of permissions held directly or through groups."""
        direct = user.user_permissions.all()
        via_groups = Permission.objects.filter(group__user=user)
        return set(direct) | set(via_groups)
# 使用示例
def role_system_example():
    """Demonstrate predefined and dynamic roles on a freshly created user.

    Returns:
        The user created for the demonstration.
    """
    # Make sure the predefined role groups exist.
    PredefinedRoles.setup_predefined_roles()

    user = CustomUser.objects.create_user(
        username='testuser',
        email='test@example.com',
        password='password123',
    )

    # Predefined role.
    PredefinedRoles.assign_role_to_user(user, 'viewer')

    # Dynamic role: create it, then add the user to its group.
    role_system = DynamicRoleSystem()
    editor_role, created = role_system.create_dynamic_role(
        name='project_editor',
        permissions=['app.add_project', 'app.change_project'],
    )
    GroupManager.assign_user_to_group(user, 'project_editor')

    return user
# 权限缓存系统
class PermissionCache:
    """In-process cache of users' permission lists with a fixed lifetime."""

    def __init__(self):
        self.cache = {}     # cache key -> (permissions, stored_at)
        self.timeout = 300  # lifetime of an entry in seconds (5 minutes)

    def get_user_permissions(self, user_id):
        """Return the user's permissions, from cache when still fresh.

        On a miss or an expired entry, the permissions are loaded through
        ``user.get_all_permissions()`` and cached.

        Raises:
            DoesNotExist: Propagated from the user lookup when no user has
                the given id.
        """
        import time

        cache_key = f"user_perms_{user_id}"
        entry = self.cache.get(cache_key)
        if entry is not None:
            cached_data, stored_at = entry
            if (time.time() - stored_at) < self.timeout:
                return cached_data

        # Resolve the active user model. Importing auth.User directly breaks
        # as soon as the project swaps in a custom user model.
        from django.contrib.auth import get_user_model

        user = get_user_model().objects.get(id=user_id)
        permissions = list(user.get_all_permissions())
        self.cache[cache_key] = (permissions, time.time())
        return permissions

    def invalidate_user_cache(self, user_id):
        """Drop the cached entry of one user, if any."""
        self.cache.pop(f"user_perms_{user_id}", None)

    def clear_all_cache(self):
        """Drop all cached entries."""
        self.cache.clear()
# 权限审计系统
class PermissionAudit:
    """Write and read the permission change log."""

    @staticmethod
    def log_permission_change(user, permission, action, changed_by=None):
        """Insert one permission change record.

        Args:
            user: User whose permissions changed.
            permission: The affected permission.
            action: One of ``'add'``, ``'remove'`` or ``'modify'``.
            changed_by: Optional user who made the change.
        """
        record_fields = {
            'user': user,
            'permission': permission,
            'action': action,
            'changed_by': changed_by,
            'timestamp': timezone.now(),
        }
        PermissionChangeLog.objects.create(**record_fields)

    @staticmethod
    def get_permission_audit_trail(user):
        """Return the user's change records, newest first."""
        trail = PermissionChangeLog.objects.filter(user=user)
        return trail.order_by('-timestamp')
class PermissionChangeLog(models.Model):
    """Audit row describing one change to a user's permissions."""

    # User whose permissions changed; rows are deleted with the user.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Affected permission; rows are deleted with the permission.
    permission = models.ForeignKey(Permission, on_delete=models.CASCADE)
    # Kind of change that was made.
    action = models.CharField(
        max_length=20,
        choices=[
            ('add', '添加'),
            ('remove', '移除'),
            ('modify', '修改'),
        ],
    )
    # User who made the change; set to NULL if that account is removed.
    changed_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        related_name='performed_changes',
    )
    # Insertion time, set automatically.
    timestamp = models.DateTimeField(auto_now_add=True)
    # Optional free-text notes.
    details = models.TextField(blank=True)

    class Meta:
        db_table = 'permission_change_log'
        # Newest entries first by default.
        ordering = ['-timestamp']
## 认证安全实践 \{#认证安全实践}
### 密码安全
```python
# 密码安全实践
from django.contrib.auth.hashers import make_password, check_password, identify_hasher
from django.core.validators import validate_password
from django.core.exceptions import ValidationError
class PasswordSecurity:
    """Password generation, strength rating and hashing helpers."""

    @staticmethod
    def generate_secure_password(length=12, include_symbols=True):
        """Generate a random password from a cryptographically secure source.

        Args:
            length: Number of characters; values <= 0 give an empty string.
            include_symbols: Whether punctuation may (and will) be used.

        Returns:
            str: Password drawn from letters, digits and optionally symbols.
            When ``length`` allows it, at least one character of every
            enabled class (lower, upper, digit, symbol) is present.
        """
        # ``secrets`` instead of ``random``: the latter is predictable and
        # must not be used for credentials.
        import secrets
        import string

        pools = [string.ascii_lowercase, string.ascii_uppercase, string.digits]
        if include_symbols:
            pools.append("!@#$%^&*()_+-=[]{}|;:,.<>?")
        alphabet = ''.join(pools)

        chars = [secrets.choice(alphabet) for _ in range(length)]
        if length >= len(pools):
            # Overwrite distinct random positions so every class is present.
            positions = secrets.SystemRandom().sample(range(length), len(pools))
            for position, pool in zip(positions, pools):
                chars[position] = secrets.choice(pool)
        return ''.join(chars)

    @staticmethod
    def check_password_strength(password):
        """Rate a password and collect improvement hints.

        One point each is given for length >= 8, length >= 12, lowercase,
        uppercase, digit and special character. Two points are removed for a
        well-known password and one for fewer than 70% unique characters.
        The score never drops below 0.

        Args:
            password: Plain-text password; may be empty.

        Returns:
            dict: ``score`` (0-6), ``strength`` label, ``feedback`` list and
            ``percentage`` (0-100).
        """
        score = 0
        feedback = []

        # Length.
        if len(password) >= 8:
            score += 1
        else:
            feedback.append("密码长度至少8位")
        if len(password) >= 12:
            score += 1

        # Character classes.
        class_checks = (
            (r'[a-z]', "需要包含小写字母"),
            (r'[A-Z]', "需要包含大写字母"),
            (r'\d', "需要包含数字"),
            (r'[!@#$%^&*()_+\-=\[\]{}|;:,.<>?]', "需要包含特殊字符"),
        )
        for pattern, hint in class_checks:
            if re.search(pattern, password):
                score += 1
            else:
                feedback.append(hint)

        # Well-known passwords.
        common_passwords = [
            'password', '123456', 'qwerty', 'abc123',
            'password123', 'admin', 'letmein'
        ]
        if password.lower() in common_passwords:
            score = max(0, score - 2)
            feedback.append("密码过于常见")

        # Repetition. The ``password and`` guard avoids dividing by zero for
        # an empty string.
        if password and len(set(password)) / len(password) < 0.7:
            score = max(0, score - 1)
            feedback.append("避免重复字符")

        strength_levels = {
            0: 'very_weak',
            1: 'very_weak',
            2: 'weak',
            3: 'fair',
            4: 'good',
            5: 'strong',
            6: 'very_strong'
        }
        return {
            'score': score,
            'strength': strength_levels.get(score, 'very_weak'),
            'feedback': feedback,
            'percentage': min(100, int((score / 6) * 100))
        }

    @staticmethod
    def hash_password_with_salt(password, salt=None):
        """Hash a password with Django's ``make_password``, passing the salt through."""
        return make_password(password, salt=salt)

    @staticmethod
    def verify_password_hash(password, hashed_password):
        """Return True if the plain-text password matches the stored hash."""
        return check_password(password, hashed_password)

    @staticmethod
    def get_password_hash_algorithm(hashed_password):
        """Return the algorithm name of a stored hash, or None if unrecognized."""
        try:
            return identify_hasher(hashed_password).algorithm
        except ValueError:
            return None
# 密码策略管理器
class PasswordPolicyManager:
    """Validate passwords against a policy read from Django settings."""

    def __init__(self):
        self.policies = self.load_policies()

    def load_policies(self):
        """Read the policy from settings, falling back to built-in defaults.

        Returns:
            dict: Policy values keyed by ``min_length``,
            ``require_uppercase``, ``require_lowercase``,
            ``require_numbers``, ``require_special``,
            ``max_consecutive_chars``, ``history_check_count`` and
            ``expiry_days``.
        """
        from django.conf import settings

        return {
            'min_length': getattr(settings, 'PASSWORD_MIN_LENGTH', 8),
            'require_uppercase': getattr(settings, 'PASSWORD_REQUIRE_UPPERCASE', True),
            'require_lowercase': getattr(settings, 'PASSWORD_REQUIRE_LOWERCASE', True),
            'require_numbers': getattr(settings, 'PASSWORD_REQUIRE_NUMBERS', True),
            'require_special': getattr(settings, 'PASSWORD_REQUIRE_SPECIAL', False),
            'max_consecutive_chars': getattr(settings, 'PASSWORD_MAX_CONSECUTIVE_CHARS', 3),
            'history_check_count': getattr(settings, 'PASSWORD_HISTORY_CHECK_COUNT', 5),
            'expiry_days': getattr(settings, 'PASSWORD_EXPIRY_DAYS', 90),
        }

    def validate_password_compliance(self, password, user=None):
        """Check a password against the loaded policy.

        Args:
            password: Plain-text password to validate.
            user: Optional user; when given, the password must not contain
                the username, first name, last name or email.

        Returns:
            bool: True when the password satisfies every rule.

        Raises:
            ValidationError: With the list of all violated rules (each rule
                is reported once).
        """
        errors = []
        policy = self.policies
        # Needed by the personal-info and the weak-pattern checks. It used to
        # be defined only when ``user`` was given, so ``user=None`` raised
        # NameError.
        password_lower = password.lower()

        if len(password) < policy['min_length']:
            errors.append(f"密码长度至少需要{policy['min_length']}个字符")

        if policy['require_uppercase'] and not re.search(r'[A-Z]', password):
            errors.append("密码必须包含大写字母")

        if policy['require_lowercase'] and not re.search(r'[a-z]', password):
            errors.append("密码必须包含小写字母")

        if policy['require_numbers'] and not re.search(r'\d', password):
            errors.append("密码必须包含数字")

        if policy['require_special']:
            special_pattern = r'[!@#$%^&*()_+\-=\[\]{}|;:,.<>?]'
            if not re.search(special_pattern, password):
                errors.append("密码必须包含特殊字符")

        # Runs of identical characters: a run is a violation once it is
        # longer than the configured maximum, as the message states.
        run_length = 1
        for previous, current in zip(password, password[1:]):
            if current == previous:
                run_length += 1
                if run_length > policy['max_consecutive_chars']:
                    errors.append(f"密码不能包含超过{policy['max_consecutive_chars']}个连续相同字符")
                    break
            else:
                run_length = 1

        # Personal information embedded in the password.
        if user:
            user_info = [
                (getattr(user, attr, '') or '').lower()
                for attr in ('username', 'first_name', 'last_name', 'email')
            ]
            if any(info and info in password_lower for info in user_info):
                errors.append("密码不能包含您的个人信息")

        # Common weak patterns.
        common_patterns = [
            r'(.)\1{2,}',  # three or more identical characters in a row
            r'123456|abcdef',  # well-known sequences
            r'(?:012|123|234|345|456|567|678|789|890)',  # ascending digits
        ]
        if any(re.search(pattern, password_lower) for pattern in common_patterns):
            errors.append("密码包含常见的弱密码模式")

        if errors:
            raise ValidationError(errors)
        return True
# 使用示例
def password_security_example():
    """Demonstrate generation, rating, policy validation and hashing.

    Returns:
        The generated password.
    """
    toolkit = PasswordSecurity()
    policy_checker = PasswordPolicyManager()

    # Generate and rate a password.
    generated = toolkit.generate_secure_password()
    print(f"生成的安全密码: {generated}")
    rating = toolkit.check_password_strength(generated)
    print(f"密码强度: {rating}")

    # Validate it against the policy.
    try:
        policy_checker.validate_password_compliance(generated)
    except ValidationError as exc:
        print(f"密码不符合策略: {exc}")
    else:
        print("密码符合策略要求")

    # Hash it and verify the hash; only a prefix of the hash is printed.
    digest = toolkit.hash_password_with_salt(generated)
    print(f"哈希后的密码: {digest[:20]}...")
    matches = toolkit.verify_password_hash(generated, digest)
    print(f"密码验证结果: {matches}")

    return generated


#会话安全
# 会话安全实践
import secrets
import hashlib
import hmac
from django.contrib.sessions.models import Session
from django.utils import timezone
from datetime import timedelta
class SessionSecurity:
    """Namespace of static helpers used to harden sessions."""

    @staticmethod
    def generate_secure_session_key():
        """Return a URL-safe random token built from 32 random bytes."""
        return secrets.token_urlsafe(32)

    @staticmethod
    def create_session_fingerprint(request):
        """Return a SHA-256 hex digest derived from four request attributes.

        The User-Agent, REMOTE_ADDR, Accept-Language and Accept-Encoding
        entries of ``request.META`` (a missing entry counts as an empty
        string) are joined with ``|`` and hashed.
        """
        meta_keys = (
            'HTTP_USER_AGENT',
            'REMOTE_ADDR',
            'HTTP_ACCEPT_LANGUAGE',
            'HTTP_ACCEPT_ENCODING',
        )
        joined = "|".join(f"{request.META.get(key, '')}" for key in meta_keys)
        return hashlib.sha256(joined.encode()).hexdigest()

    @staticmethod
    def validate_session_fingerprint(request, stored_fingerprint):
        """Return True when the request's fingerprint equals the stored one.

        The comparison goes through ``hmac.compare_digest``.
        """
        recomputed = SessionSecurity.create_session_fingerprint(request)
        return hmac.compare_digest(recomputed, stored_fingerprint)

    @staticmethod
    def implement_session_timeout(session, timeout_minutes=30):
        """Set the session expiry to ``timeout_minutes`` (passed on in seconds)."""
        timeout_seconds = timeout_minutes * 60
        session.set_expiry(timeout_seconds)

    @staticmethod
    def rotate_session_key(request):
        """Cycle the session key of ``request.session`` (anti session fixation)."""
        request.session.cycle_key()

    @staticmethod
    def validate_session_ip_binding(session, current_ip):
        """Return whether ``current_ip`` matches the IP stored in the session.

        When the session holds no (truthy) ``original_ip`` the check passes.
        This method never stores the binding itself.
        """
        bound_ip = session.get('original_ip')
        if not bound_ip:
            return True
        return bound_ip == current_ip

    @staticmethod
    def validate_session_ua_binding(session, current_ua):
        """Return whether ``current_ua`` matches the User-Agent stored in the session.

        When the session holds no (truthy) ``original_user_agent`` the check
        passes. This method never stores the binding itself.
        """
        bound_ua = session.get('original_user_agent')
        if not bound_ua:
            return True
        return bound_ua == current_ua
# 会话安全中间件
class SecureSessionMiddleware:
    """Middleware that refuses requests whose session fails the binding checks.

    The session is validated before the rest of the stack runs. A request
    that fails validation has its session flushed and receives the response
    built by ``handle_unsafe_session``; the wrapped view is never executed
    for it.
    """

    def __init__(self, get_response):
        """Store the next callable of the middleware chain."""
        self.get_response = get_response

    def __call__(self, request):
        """Validate the session, then either reject or delegate downstream."""
        session = getattr(request, 'session', None)
        if session is not None and session.session_key:
            # Check first: running the view before validating would let a
            # hijacked session perform its action before being rejected.
            if not self.validate_session_security(request):
                session.flush()
                return self.handle_unsafe_session(request)
        return self.get_response(request)

    def validate_session_security(self, request):
        """Return True when the session passes the IP, User-Agent and fingerprint checks.

        A session without a key passes unconditionally. The fingerprint is
        only checked when ``session_fingerprint`` is stored in the session.
        """
        if not request.session.session_key:
            return True

        # IP binding.
        current_ip = request.META.get('REMOTE_ADDR')
        if not SessionSecurity.validate_session_ip_binding(request.session, current_ip):
            return False

        # User-Agent binding.
        current_ua = request.META.get('HTTP_USER_AGENT', '')
        if not SessionSecurity.validate_session_ua_binding(request.session, current_ua):
            return False

        # Optional fingerprint binding.
        if 'session_fingerprint' in request.session:
            if not SessionSecurity.validate_session_fingerprint(
                request, request.session['session_fingerprint']
            ):
                return False
        return True

    def handle_unsafe_session(self, request):
        """Return the HTTP 403 response sent when validation fails."""
        from django.http import HttpResponse
        return HttpResponse(
            "会话安全验证失败",
            status=403
        )
# 会话监控和限制
class SessionMonitor:
    """Static helpers that cap concurrent sessions and flag logins from new IPs."""

    @staticmethod
    def limit_sessions_per_user(user, max_sessions=5):
        """Keep at most ``max_sessions`` unexpired sessions for ``user``.

        Sessions are ranked by ``expire_date`` (latest first); everything
        beyond the first ``max_sessions`` is deleted, i.e. the sessions that
        would have expired soonest are the ones terminated.

        Args:
            user: The account whose sessions are limited.
            max_sessions: Number of sessions allowed to remain.
        """
        user_key = str(user.pk)
        live_sessions = Session.objects.filter(
            expire_date__gt=timezone.now()
        ).order_by('-expire_date')

        # ``session_data`` is an encoded blob, so a substring match on it cannot
        # identify the owner; each session has to be decoded. This walks every
        # unexpired session. NOTE(review): for large session tables consider
        # tracking session keys per user instead.
        owned_keys = [
            session.session_key
            for session in live_sessions
            if str(session.get_decoded().get('_auth_user_id')) == user_key
        ]

        surplus_keys = owned_keys[max_sessions:]
        if surplus_keys:
            # Delete by key: Django refuses ``delete()`` on a sliced queryset.
            Session.objects.filter(session_key__in=surplus_keys).delete()

    @staticmethod
    def detect_multiple_device_login(user, ip_address, max_devices=3):
        """Flag a login from an IP not seen in the last 24 hours.

        Args:
            user: The account logging in.
            ip_address: IP address of the current login.
            max_devices: Number of distinct recent IPs tolerated.

        Returns:
            ``(True, message)`` when ``ip_address`` is new and at least
            ``max_devices`` distinct IPs were already recorded in the last
            24 hours, otherwise ``(False, "")``.
        """
        window_start = timezone.now() - timedelta(hours=24)
        # values_list(flat=True) yields the raw addresses; ``values()`` would
        # yield dicts, which have no ``ip_address`` attribute.
        known_ips = set(
            UserLoginLog.objects.filter(
                user=user,
                login_time__gte=window_start,
            ).values_list('ip_address', flat=True)
        )
        if ip_address not in known_ips and len(known_ips) >= max_devices:
            return True, f"检测到在第{len(known_ips)+1}个设备上登录"
        return False, ""
# CSRF保护增强
from django.middleware.csrf import CsrfViewMiddleware
class EnhancedCsrfMiddleware(CsrfViewMiddleware):
    """CSRF middleware that adds Referer/Origin hostname checks.

    For POST, PUT, PATCH and DELETE requests the Referer and Origin headers,
    when present, must carry the same hostname as the request itself. The
    stock ``CsrfViewMiddleware`` checks run afterwards.
    """

    def process_view(self, request, callback, callback_args, callback_kwargs):
        """Run the additional header checks, then defer to the parent class.

        Returns:
            The rejection response when a header check fails, otherwise
            whatever the parent ``process_view`` returns.
        """
        # Honour the same opt-outs as the parent class. Without this, views
        # marked csrf_exempt would still be rejected by the extra checks.
        skip_extra_checks = (
            getattr(request, 'csrf_processing_done', False)
            or getattr(callback, 'csrf_exempt', False)
        )
        if not skip_extra_checks and self._is_post_request(request):
            referer = request.META.get('HTTP_REFERER')
            if referer and not self._is_valid_referer(referer, request):
                return self._reject(request, "CSRF check failed: Invalid referer")

            origin = request.META.get('HTTP_ORIGIN')
            if origin and not self._is_valid_origin(origin, request):
                return self._reject(request, "CSRF check failed: Invalid origin")

        return super().process_view(request, callback, callback_args, callback_kwargs)

    def _is_post_request(self, request):
        """Return True for state-changing methods (POST, PUT, PATCH, DELETE)."""
        return request.method in ('POST', 'PUT', 'PATCH', 'DELETE')

    def _is_valid_referer(self, referer, request):
        """Return True when the Referer hostname equals the request hostname."""
        return self._enhanced_same_hostname(referer, request)

    def _is_valid_origin(self, origin, request):
        """Return True when the Origin hostname equals the request hostname."""
        return self._enhanced_same_hostname(origin, request)

    @staticmethod
    def _enhanced_same_hostname(url, request):
        """Compare the hostname of ``url`` with that of the request's absolute URI.

        Only hostnames are compared; scheme and port are ignored.
        """
        from urllib.parse import urlparse
        request_host = urlparse(request.build_absolute_uri()).hostname
        return urlparse(url).hostname == request_host
# 使用示例
def session_security_example(request):
    """Show how the session helpers are applied for an authenticated user.

    For an authenticated user the session key is rotated, the fingerprint,
    IP and User-Agent are stored in the session, a 30 minute timeout is set,
    sessions are capped at 3 and a login from a new IP is reported on stdout.

    Returns:
        ``request.session``.
    """
    hardening = SessionSecurity()

    # Nothing to harden for anonymous visitors.
    if not request.user.is_authenticated:
        return request.session

    # Rotate the key first (guards against session fixation).
    hardening.rotate_session_key(request)

    # Remember what this client looks like.
    request.session['session_fingerprint'] = hardening.create_session_fingerprint(request)
    request.session['original_ip'] = request.META.get('REMOTE_ADDR')
    request.session['original_user_agent'] = request.META.get('HTTP_USER_AGENT', '')

    # Expire the session after 30 minutes.
    hardening.implement_session_timeout(request.session, 30)

    # Cap the number of concurrent sessions.
    SessionMonitor.limit_sessions_per_user(request.user, max_sessions=3)

    # Report logins from an additional address.
    flagged, warning = SessionMonitor.detect_multiple_device_login(
        request.user,
        request.META.get('REMOTE_ADDR'),
        max_devices=2
    )
    if flagged:
        print(f"安全警告: {warning}")

    return request.session


# Section: account security
# 账户安全实践
import time
import ipaddress
from django.core.cache import cache
from django.contrib.auth.models import User
class AccountSecurity:
    """Runs a login attempt through brute-force, lockout and 2FA checks."""

    def __init__(self):
        """Create the three collaborating services."""
        self.brute_force_protection = BruteForceProtection()
        self.account_lockout = AccountLockoutSystem()
        self.two_factor_auth = TwoFactorAuthentication()

    def secure_login_attempt(self, username, password, request):
        """Authenticate ``username``/``password`` with the security checks applied.

        Args:
            username: Submitted username.
            password: Submitted password.
            request: Current request; ``META`` supplies IP and User-Agent and
                the session receives ``pending_2fa_user_id`` when 2FA is due.

        Returns:
            A dict that always contains ``success``. Depending on the outcome
            it also carries ``error`` (refusals), ``blocked`` (brute-force
            block), ``requires_2fa`` and ``user_id`` (second factor pending)
            or ``user`` (login may proceed).
        """
        ip_address = request.META.get('REMOTE_ADDR')
        user_agent = request.META.get('HTTP_USER_AGENT', '')

        # Refuse early while the username or the IP is blocked.
        if self.brute_force_protection.is_blocked(username, ip_address):
            return {
                'success': False,
                'error': '账户或IP已被暂时封禁',
                'blocked': True
            }

        user = authenticate(request, username=username, password=password)
        if not user:
            # Wrong credentials: count the failure and lock if the limit is hit.
            self.brute_force_protection.record_failed_attempt(
                username, ip_address, user_agent
            )
            if self.brute_force_protection.should_lock_account(username, ip_address):
                self.account_lockout.lock_account(
                    username,
                    reason='多次登录失败'
                )
            return {
                'success': False,
                'error': '用户名或密码错误'
            }

        # NOTE(review): if the configured backend rejects inactive users (as
        # Django's ModelBackend does), authenticate() returns None for them and
        # the two checks below -- including the auto-unlock performed inside
        # is_locked() -- are never reached. Confirm against AUTHENTICATION_BACKENDS.
        if not user.is_active:
            return {
                'success': False,
                'error': '账户已被禁用'
            }

        if self.account_lockout.is_locked(user):
            return {
                'success': False,
                'error': '账户已被锁定'
            }

        # Reset the brute-force counters only once the account is known to be
        # usable; a refused login must not wipe the failure history.
        self.brute_force_protection.record_successful_login(username, ip_address)

        if self.two_factor_auth.is_required(user):
            # The caller is expected to send the user to the 2FA step.
            request.session['pending_2fa_user_id'] = user.id
            return {
                'success': False,
                'requires_2fa': True,
                'user_id': user.id
            }

        return {
            'success': True,
            'user': user
        }
# 暴力破解防护
class BruteForceProtection:
    """Cache-backed counters that throttle failed logins per IP and per username."""

    def __init__(self):
        """Read the limits from Django settings, falling back to defaults."""
        self.max_attempts_per_ip = getattr(settings, 'MAX_LOGIN_ATTEMPTS_PER_IP', 10)
        self.max_attempts_per_user = getattr(settings, 'MAX_LOGIN_ATTEMPTS_PER_USER', 5)
        self.block_duration = getattr(settings, 'BLOCK_DURATION_SECONDS', 900)  # 15 minutes
        self.time_window = getattr(settings, 'ATTEMPT_TIME_WINDOW', 900)  # 15 minutes

    def get_cache_keys(self, username, ip_address):
        """Return the four cache keys used for ``username`` and ``ip_address``."""
        return {
            'ip_attempts': f"login_attempts_ip:{ip_address}",
            'user_attempts': f"login_attempts_user:{username}",
            'ip_block': f"login_block_ip:{ip_address}",
            'user_block': f"login_block_user:{username}"
        }

    def is_blocked(self, username, ip_address):
        """Return True while either the IP or the username is blocked."""
        keys = self.get_cache_keys(username, ip_address)
        return bool(cache.get(keys['ip_block']) or cache.get(keys['user_block']))

    def _increment(self, key):
        """Increase the counter at ``key`` by one and restart its expiry window.

        Uses ``add`` + ``incr`` instead of get-then-set so that parallel
        failed logins cannot overwrite each other's update on cache backends
        with an atomic ``incr``.
        """
        # add() writes only when the key is absent, so it never resets a live counter.
        cache.add(key, 0, timeout=self.time_window)
        try:
            count = cache.incr(key)
        except ValueError:
            # The key expired between add() and incr(); start a new window.
            cache.set(key, 1, timeout=self.time_window)
            return 1
        # Each failure extends the window, as the previous set()-based code did.
        cache.touch(key, self.time_window)
        return count

    def record_failed_attempt(self, username, ip_address, user_agent):
        """Count a failed login for the IP and the username and log it."""
        keys = self.get_cache_keys(username, ip_address)
        self._increment(keys['ip_attempts'])
        self._increment(keys['user_attempts'])

        # Persist the attempt for auditing.
        UserFailedLoginAttempt.objects.create(
            username=username,
            ip_address=ip_address,
            user_agent=user_agent
        )

    def record_successful_login(self, username, ip_address):
        """Clear the counters and blocks of both the IP and the username."""
        # NOTE(review): any successful login also clears the IP counters, so a
        # client holding one valid account can reset its own IP throttle.
        keys = self.get_cache_keys(username, ip_address)
        cache.delete_many([
            keys['ip_attempts'],
            keys['user_attempts'],
            keys['ip_block'],
            keys['user_block'],
        ])

    def should_lock_account(self, username, ip_address):
        """Apply blocks for exceeded limits and tell whether to lock the account.

        An exceeded IP limit blocks the IP; an exceeded username limit blocks
        the username.

        Returns:
            True only when the per-username limit is reached. Hitting the IP
            limit alone blocks the IP but returns False, so that one noisy
            address cannot get unrelated accounts locked.
        """
        keys = self.get_cache_keys(username, ip_address)

        if cache.get(keys['ip_attempts'], 0) >= self.max_attempts_per_ip:
            cache.set(keys['ip_block'], True, timeout=self.block_duration)

        if cache.get(keys['user_attempts'], 0) >= self.max_attempts_per_user:
            cache.set(keys['user_block'], True, timeout=self.block_duration)
            return True
        return False
# 账户锁定系统
class AccountLockoutSystem:
    """Locks and unlocks accounts by toggling ``is_active`` and logging each change."""

    def __init__(self):
        """Read the lock duration (seconds) from settings; defaults to 30 minutes."""
        self.lock_duration = getattr(settings, 'ACCOUNT_LOCKOUT_DURATION', 1800)

    def lock_account(self, username, reason='', locked_by=None):
        """Deactivate the account and write an ``AccountLockLog`` entry.

        Args:
            username: Account to lock.
            reason: Free-text reason stored in the log.
            locked_by: User who triggered the lock, or None for system locks.

        Returns:
            True when the account was locked, False when no such user exists.
        """
        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            return False

        user.is_active = False
        # Write only the changed column so concurrent edits are not overwritten.
        user.save(update_fields=['is_active'])

        AccountLockLog.objects.create(
            user=user,
            reason=reason,
            locked_by=locked_by,
            # A lock attributed to a person is manual; everything else is automatic.
            lock_type='manual' if locked_by is not None else 'automatic'
        )
        return True

    def unlock_account(self, username, unlocked_by=None):
        """Reactivate the account and write an ``AccountUnlockLog`` entry.

        Returns:
            True when the account was unlocked, False when no such user exists.
        """
        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            return False

        user.is_active = True
        user.save(update_fields=['is_active'])

        AccountUnlockLog.objects.create(
            user=user,
            unlocked_by=unlocked_by
        )
        return True

    def is_locked(self, user):
        """Return whether ``user`` is locked, lifting the lock when it has expired.

        An inactive account is unlocked automatically only when its most
        recent lock is older than ``lock_duration`` and no unlock has been
        logged since that lock. In every other case an inactive account is
        reported as locked.
        """
        from datetime import timedelta

        if user.is_active:
            return False

        last_lock = AccountLockLog.objects.filter(
            user=user
        ).order_by('-timestamp').first()
        if last_lock is None:
            # Inactive without any recorded lock: not ours to unlock.
            return True

        # If the lock was already lifted once, the account must have been
        # deactivated again by something else; do not re-enable it here.
        lifted_since = AccountUnlockLog.objects.filter(
            user=user,
            timestamp__gt=last_lock.timestamp
        ).exists()
        if lifted_since:
            return True

        if timezone.now() - last_lock.timestamp > timedelta(seconds=self.lock_duration):
            self.unlock_account(user.username)
            # unlock_account() works on a fresh row; keep the caller's instance in sync.
            user.is_active = True
            return False
        return True
# 账户锁定日志
class AccountLockLog(models.Model):
    """Audit row recording one account lock."""

    # Allowed ``lock_type`` values with their display labels.
    LOCK_TYPE_CHOICES = [
        ('automatic', '自动'),
        ('manual', '手动')
    ]

    # Account that was locked; rows are removed together with the account.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Free-text explanation of the lock.
    reason = models.TextField()
    # User who applied the lock; nullable and set to NULL if that user is deleted.
    locked_by = models.ForeignKey(
        User,
        null=True,
        on_delete=models.SET_NULL,
        related_name='locked_accounts',
    )
    lock_type = models.CharField(max_length=20, choices=LOCK_TYPE_CHOICES)
    # Set once, when the row is created.
    timestamp = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'account_lock_log'
class AccountUnlockLog(models.Model):
    """Audit row recording one account unlock."""

    # Account that was unlocked; rows are removed together with the account.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # User who lifted the lock; nullable and set to NULL if that user is deleted.
    unlocked_by = models.ForeignKey(
        User,
        null=True,
        on_delete=models.SET_NULL,
        related_name='unlocked_accounts',
    )
    # Set once, when the row is created.
    timestamp = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'account_unlock_log'
# 双因素认证
class TwoFactorAuthentication:
    """Enrolment and verification of a second authentication factor."""

    def __init__(self):
        """Create the TOTP and SMS services."""
        self.totp_service = TOTPService()
        self.sms_service = SMSService()

    def is_required(self, user):
        """Return True when ``user`` has a profile with ``enable_2fa`` set."""
        return bool(hasattr(user, 'profile') and user.profile.enable_2fa)

    def generate_2fa_setup_info(self, user, method='totp'):
        """Start 2FA enrolment and return what the user needs to complete it.

        For ``method='totp'`` a new secret is generated and cached for five
        minutes under ``2fa_setup_<user.id>``.

        Returns:
            For TOTP a dict with ``method``, ``secret``, ``qr_code`` and
            ``uri``; None for ``'sms'`` (not implemented) and for any other
            method.
        """
        if method == 'totp':
            secret = self.totp_service.generate_secret()
            # NOTE(review): the QR code is generated from ``user`` rather than
            # from ``secret``; confirm that TOTPService.generate_qr_code encodes
            # the same secret that is cached below.
            qr_code, uri = self.totp_service.generate_qr_code(user)

            # Keep the secret only until the user proves possession of it.
            cache_key = f"2fa_setup_{user.id}"
            cache.set(cache_key, secret, timeout=300)  # 5 minutes

            return {
                'method': 'totp',
                'secret': secret,
                'qr_code': qr_code,
                'uri': uri
            }
        # SMS enrolment is not implemented yet; unknown methods are ignored too.
        return None

    def verify_2fa_setup(self, user, token):
        """Finish TOTP enrolment by checking ``token`` against the pending secret.

        Returns:
            True when the token is valid and the secret was saved on the
            user's profile. False when no enrolment is pending, the token is
            wrong, or the user has no profile to store the secret on (in that
            case the pending secret is left in the cache).
        """
        cache_key = f"2fa_setup_{user.id}"
        pending_secret = cache.get(cache_key)
        if not pending_secret:
            return False
        if not self.totp_service.verify_code(pending_secret, token):
            return False
        if not hasattr(user, 'profile'):
            # Nothing was persisted, so enrolment must not be reported as done.
            return False

        profile = user.profile
        profile.totp_secret = pending_secret
        profile.enable_2fa = True
        profile.save()
        cache.delete(cache_key)
        return True

    def verify_2fa_login(self, user_id, token):
        """Check a login-time TOTP ``token`` for the user with ``user_id``.

        Returns:
            The result of the TOTP check, or False when the user does not
            exist or has no stored secret.
        """
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            return False
        if hasattr(user, 'profile') and user.profile.totp_secret:
            return self.totp_service.verify_code(user.profile.totp_secret, token)
        return False

    def disable_2fa(self, user):
        """Turn 2FA off and erase the stored secret; no-op without a profile."""
        if hasattr(user, 'profile'):
            user.profile.enable_2fa = False
            user.profile.totp_secret = ''
            user.profile.save()
# SMS服务类(示例)
class SMSService:
    """Issues and checks one-time SMS verification codes (delivery is a stub)."""

    def send_verification_code(self, phone_number):
        """Create a six-digit code for ``phone_number`` and cache it for five minutes.

        The actual hand-off to an SMS gateway is not implemented here.

        Returns:
            The generated code as a string.
        """
        import secrets

        # Verification codes are credentials: draw them from the OS CSPRNG.
        # randbelow(900000) + 100000 keeps the 100000-999999 range.
        code = f"{secrets.randbelow(900000) + 100000}"

        # A real implementation would call the SMS gateway here.

        # NOTE(review): the code is cached in plain text.
        cache_key = f"sms_code_{phone_number}"
        cache.set(cache_key, code, timeout=300)  # 5 minutes
        return code

    def verify_code(self, phone_number, code):
        """Return True when ``code`` matches the cached code for ``phone_number``.

        A matching code is deleted so it cannot be used twice.
        """
        import hmac

        cache_key = f"sms_code_{phone_number}"
        stored_code = cache.get(cache_key)
        if not stored_code or code is None:
            return False
        # Constant-time comparison; bytes are used so non-ASCII input cannot raise.
        if not hmac.compare_digest(str(stored_code).encode(), str(code).encode()):
            return False
        cache.delete(cache_key)
        return True
# 账户安全监控
class AccountSecurityMonitor:
    """Heuristics that flag unusual login behaviour."""

    @staticmethod
    def detect_suspicious_activity(user, request):
        """Collect security events for the current login of ``user``.

        Three checks run: IP change against ``profile.last_login_ip``, more
        than 10 logins within the last 7 days, and User-Agent change against
        ``profile.last_user_agent``.

        Returns:
            A list of dicts with ``type``, ``message`` and ``severity``.
        """
        events = []
        has_profile = hasattr(user, 'profile')

        # Location (IP range) change.
        current_ip = request.META.get('REMOTE_ADDR')
        if has_profile and user.profile.last_login_ip:
            if not AccountSecurityMonitor.is_same_location(
                user.profile.last_login_ip,
                current_ip
            ):
                events.append({
                    'type': 'location_change',
                    'message': f'登录位置发生变化: {current_ip}',
                    'severity': 'high'
                })

        # Login frequency over the last week.
        recent_logins = UserLoginLog.objects.filter(
            user=user,
            login_time__gte=timezone.now() - timedelta(days=7)
        ).order_by('-login_time')
        if recent_logins.count() > 10:
            events.append({
                'type': 'high_frequency_login',
                'message': '短时间内登录频繁',
                'severity': 'medium'
            })

        # Device (User-Agent) change.
        current_ua = request.META.get('HTTP_USER_AGENT', '')
        if has_profile and user.profile.last_user_agent:
            if not AccountSecurityMonitor.is_similar_ua(
                user.profile.last_user_agent,
                current_ua
            ):
                events.append({
                    'type': 'device_change',
                    'message': '登录设备发生变化',
                    'severity': 'low'
                })
        return events

    @staticmethod
    def is_same_location(ip1, ip2):
        """Roughly decide whether two addresses belong to the same location.

        This is a simplification, not a geolocation lookup: two IPv4
        addresses match when their first two octets are equal. Values that
        are not valid IPv4 addresses are compared on their first two
        dot-separated parts. A missing address never matches.
        """
        if not ip1 or not ip2:
            return False
        try:
            first = ipaddress.IPv4Address(ip1)
            second = ipaddress.IPv4Address(ip2)
        except ValueError:
            # Not IPv4 (e.g. IPv6): fall back to the textual comparison.
            return str(ip1).split('.')[:2] == str(ip2).split('.')[:2]
        return first.packed[:2] == second.packed[:2]

    @staticmethod
    def is_similar_ua(ua1, ua2):
        """Return True when two User-Agent strings look like the same browser.

        The first of Chrome/Firefox/Safari/Edge found in each string is
        compared case-insensitively; when either string names none of them,
        the first 50 characters are compared instead.
        """
        import re
        browser_pattern = r'(Chrome|Firefox|Safari|Edge)'
        browser1 = re.search(browser_pattern, ua1, re.I)
        browser2 = re.search(browser_pattern, ua2, re.I)
        if browser1 and browser2:
            return browser1.group().lower() == browser2.group().lower()
        return ua1[:50] == ua2[:50]
# 使用示例
def account_security_example(request):
    """Login view showing how ``AccountSecurity`` is wired into a request.

    A POST runs the secured login attempt. On success the user is logged in,
    the profile (if any) records IP and User-Agent, and the response
    redirects to ``dashboard``. When a second factor is pending the response
    redirects to ``verify_2fa``. Any other outcome flashes the error and
    renders the login page, as does every non-POST request.
    """
    guard = AccountSecurity()

    if request.method != 'POST':
        return render(request, 'login.html')

    outcome = guard.secure_login_attempt(
        request.POST.get('username'),
        request.POST.get('password'),
        request,
    )

    if outcome['success']:
        account = outcome['user']
        login(request, account)

        # Remember where and with what the user logged in.
        if hasattr(account, 'profile'):
            profile = account.profile
            profile.last_login_ip = request.META.get('REMOTE_ADDR')
            profile.last_user_agent = request.META.get('HTTP_USER_AGENT', '')
            profile.save()
        return redirect('dashboard')

    if outcome.get('requires_2fa'):
        # Password accepted, second factor still outstanding.
        return redirect('verify_2fa', user_id=outcome['user_id'])

    # Refused: show the reason on the login page.
    messages.error(request, outcome['error'])
    return render(request, 'login.html')
# 账户安全配置
"""
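# settings.py 安全配置示例
# 注意:上文的 BruteForceProtection 和 AccountLockoutSystem 通过
# getattr(settings, 'MAX_LOGIN_ATTEMPTS_PER_IP', 10) 等方式直接读取顶层设置项。
# 如果像下面这样把配置放进 SECURE_LOGIN_CONFIG 字典,需要自行从字典中取值,
# 或者把这些键定义为 settings.py 的顶层变量,否则这些类只会使用各自的默认值。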
SECURE_LOGIN_CONFIG = {
'MAX_LOGIN_ATTEMPTS_PER_IP': 10,
'MAX_LOGIN_ATTEMPTS_PER_USER': 5,
'BLOCK_DURATION_SECONDS': 900, # 15分钟
'ATTEMPT_TIME_WINDOW': 900, # 15分钟
'ACCOUNT_LOCKOUT_DURATION': 1800, # 30分钟
'PASSWORD_MIN_LENGTH': 12,
'PASSWORD_REQUIRE_UPPERCASE': True,
'PASSWORD_REQUIRE_LOWERCASE': True,
'PASSWORD_REQUIRE_NUMBERS': True,
'PASSWORD_REQUIRE_SPECIAL': True,
'PASSWORD_HISTORY_CHECK_COUNT': 5,
'PASSWORD_EXPIRY_DAYS': 90,
}
"""#第三方认证集成
#OAuth2集成
# OAuth2认证集成
import requests
import json
from urllib.parse import urlencode, parse_qs
from django.contrib.auth import login
from django.shortcuts import redirect
from django.conf import settings
class OAuth2Provider:
    """Base class for the OAuth 2.0 authorization-code flow.

    Subclasses supply the endpoint URLs and scopes through keyword arguments.
    """

    # Seconds to wait for a provider response; override with the ``timeout`` kwarg.
    DEFAULT_TIMEOUT = 10

    def __init__(self, client_id, client_secret, redirect_uri, **kwargs):
        """Store credentials and endpoints.

        Recognised keyword arguments: ``authorization_url``, ``token_url``,
        ``user_info_url``, ``scope`` (list, default empty) and ``timeout``
        (seconds, default ``DEFAULT_TIMEOUT``).
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.authorization_url = kwargs.get('authorization_url')
        self.token_url = kwargs.get('token_url')
        self.user_info_url = kwargs.get('user_info_url')
        self.scope = kwargs.get('scope', [])
        self.timeout = kwargs.get('timeout', self.DEFAULT_TIMEOUT)

    def get_authorization_url(self, state=None):
        """Return the provider URL the user is sent to for authorization.

        Scopes are joined with a space; ``state`` is appended when given.
        """
        params = {
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'response_type': 'code',
            'scope': ' '.join(self.scope),
        }
        if state:
            params['state'] = state
        return f"{self.authorization_url}?{urlencode(params)}"

    def get_access_token(self, code):
        """Exchange an authorization ``code`` for the provider's token payload.

        Returns:
            The decoded JSON token response.

        Raises:
            Exception: When the provider answers with a non-200 status or
                with a JSON object containing an ``error`` member.
        """
        data = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'code': code,
            'grant_type': 'authorization_code',
            'redirect_uri': self.redirect_uri,
        }
        response = requests.post(
            self.token_url,
            data=data,
            # Ask for JSON explicitly: some token endpoints (GitHub's, for one)
            # answer form-encoded otherwise, which response.json() cannot parse.
            headers={'Accept': 'application/json'},
            # Without a timeout a stalled provider would hang the request forever.
            timeout=self.timeout,
        )
        if response.status_code != 200:
            raise Exception(f"获取访问令牌失败: {response.text}")

        token_data = response.json()
        # Some providers report failures with HTTP 200 and an "error" member.
        if isinstance(token_data, dict) and 'error' in token_data:
            raise Exception(f"获取访问令牌失败: {response.text}")
        return token_data

    def get_user_info(self, access_token):
        """Fetch the user's profile with ``access_token`` as a Bearer token.

        Returns:
            The decoded JSON profile.

        Raises:
            Exception: When the provider answers with a non-200 status.
        """
        headers = {'Authorization': f'Bearer {access_token}'}
        response = requests.get(
            self.user_info_url,
            headers=headers,
            timeout=self.timeout,
        )
        if response.status_code != 200:
            raise Exception(f"获取用户信息失败: {response.text}")
        return response.json()
# GitHub OAuth2集成
class GitHubOAuth2(OAuth2Provider):
    """GitHub flavour of the OAuth2 provider."""

    def __init__(self):
        """Configure the GitHub endpoints from the ``GITHUB_*`` settings."""
        super().__init__(
            client_id=settings.GITHUB_CLIENT_ID,
            client_secret=settings.GITHUB_CLIENT_SECRET,
            redirect_uri=settings.GITHUB_REDIRECT_URI,
            authorization_url='https://github.com/login/oauth/authorize',
            token_url='https://github.com/login/oauth/access_token',
            user_info_url='https://api.github.com/user',
            scope=['user:email']
        )

    def get_user_info(self, access_token):
        """Return a normalised GitHub profile.

        The profile is fetched through the base class; the e-mail address is
        then taken from the ``/user/emails`` entry flagged as primary,
        falling back to the profile's own ``email`` value.

        Returns:
            A dict with ``id``, ``username``, ``email``, ``name``,
            ``avatar``, ``bio``, ``company`` and ``location``. Optional
            values that are missing or null are returned as ``''``.
        """
        user_data = super().get_user_info(access_token)

        def text(key):
            # The API sends explicit nulls, which .get(key, '') would pass through.
            return user_data.get(key) or ''

        fallback_email = text('email')
        emails_response = requests.get(
            'https://api.github.com/user/emails',
            headers={'Authorization': f'Bearer {access_token}'},
            # Fall back to 10 s when the base class did not set a timeout.
            timeout=getattr(self, 'timeout', 10),
        )

        primary_email = fallback_email
        if emails_response.status_code == 200:
            primary_email = next(
                (
                    entry['email']
                    for entry in emails_response.json()
                    # .get() so an entry without these members cannot raise.
                    if entry.get('primary') and entry.get('email')
                ),
                fallback_email
            )

        return {
            'id': user_data['id'],
            'username': user_data['login'],
            'email': primary_email,
            'name': text('name'),
            'avatar': text('avatar_url'),
            'bio': text('bio'),
            'company': text('company'),
            'location': text('location'),
        }
# Google OAuth2集成
class GoogleOAuth2(OAuth2Provider):
    """Google flavour of the OAuth2 provider."""

    def __init__(self):
        """Configure the Google endpoints from the ``GOOGLE_*`` settings."""
        super().__init__(
            client_id=settings.GOOGLE_CLIENT_ID,
            client_secret=settings.GOOGLE_CLIENT_SECRET,
            redirect_uri=settings.GOOGLE_REDIRECT_URI,
            authorization_url='https://accounts.google.com/o/oauth2/auth',
            token_url='https://oauth2.googleapis.com/token',
            user_info_url='https://www.googleapis.com/oauth2/v2/userinfo',
            scope=[
                'https://www.googleapis.com/auth/userinfo.email',
                'https://www.googleapis.com/auth/userinfo.profile',
            ],
        )

    def get_user_info(self, access_token):
        """Return a normalised Google profile.

        ``id`` and ``email`` are required members of the provider response;
        the remaining members are optional and fall back to the defaults
        listed below.
        """
        raw = super().get_user_info(access_token)

        # Required members first (a missing one raises KeyError).
        profile = {
            'id': raw['id'],
            'email': raw['email'],
        }

        # Optional members with the value used when the provider omits them.
        optional_defaults = (
            ('verified_email', False),
            ('name', ''),
            ('given_name', ''),
            ('family_name', ''),
            ('picture', ''),
            ('locale', 'en'),
        )
        for key, default in optional_defaults:
            profile[key] = raw.get(key, default)
        return profile
# 微信OAuth2集成
class WeChatOAuth2(OAuth2Provider):
    """WeChat flavour of the OAuth2 provider.

    WeChat names the client id ``appid`` / ``secret``, joins scopes with a
    comma, uses GET for the token exchange and needs the ``openid`` to fetch
    the profile, so all three flow steps are overridden.
    """

    def __init__(self):
        """Configure the WeChat endpoints from the ``WECHAT_*`` settings."""
        super().__init__(
            client_id=settings.WECHAT_APP_ID,
            client_secret=settings.WECHAT_APP_SECRET,
            redirect_uri=settings.WECHAT_REDIRECT_URI,
            authorization_url='https://open.weixin.qq.com/connect/qrconnect',
            token_url='https://api.weixin.qq.com/sns/oauth2/access_token',
            user_info_url='https://api.weixin.qq.com/sns/userinfo',
            scope=['snsapi_login', 'snsapi_userinfo']
        )

    def get_authorization_url(self, state=None):
        """Return the WeChat QR-connect URL, ending in ``#wechat_redirect``."""
        params = {
            'appid': self.client_id,
            'redirect_uri': self.redirect_uri,
            'response_type': 'code',
            'scope': ','.join(self.scope),
        }
        if state:
            params['state'] = state
        return f"{self.authorization_url}?{urlencode(params)}#wechat_redirect"

    def _get_json(self, url, params, http_error, api_error):
        """GET ``url`` and return the decoded JSON body.

        Raises:
            Exception: ``http_error`` for a non-200 status, ``api_error``
                when the body contains an ``errcode`` member.
        """
        response = requests.get(
            url,
            params=params,
            # Fall back to 10 s when the base class did not set a timeout.
            timeout=getattr(self, 'timeout', 10),
        )
        if response.status_code != 200:
            raise Exception(f"{http_error}: {response.text}")

        # JSON is UTF-8. Pin it so a text/* Content-Type without a charset
        # cannot make requests decode nicknames as ISO-8859-1.
        response.encoding = 'utf-8'
        payload = response.json()
        if 'errcode' in payload:
            raise Exception(f"{api_error}: {payload}")
        return payload

    def get_access_token(self, code):
        """Exchange ``code`` for the WeChat token payload.

        Raises:
            Exception: On a non-200 status or an ``errcode`` in the reply.
        """
        params = {
            'appid': self.client_id,
            'secret': self.client_secret,
            'code': code,
            'grant_type': 'authorization_code'
        }
        return self._get_json(
            self.token_url, params, "获取访问令牌失败", "WeChat OAuth error"
        )

    def get_user_info(self, access_token, openid):
        """Fetch the WeChat profile of ``openid``.

        Unlike the base class this takes the ``openid`` returned with the
        token, and the provider's raw JSON is returned unchanged.

        Raises:
            Exception: On a non-200 status or an ``errcode`` in the reply.
        """
        params = {
            'access_token': access_token,
            'openid': openid,
            'lang': 'zh_CN'
        }
        return self._get_json(
            self.user_info_url, params, "获取用户信息失败", "WeChat user info error"
        )
# OAuth2认证管理器
class OAuth2AuthManager:
    """Drives the OAuth2 login flow and maps provider accounts to local users."""

    def __init__(self):
        """Register the supported provider classes by name."""
        self.providers = {
            'github': GitHubOAuth2,
            'google': GoogleOAuth2,
            'wechat': WeChatOAuth2,
        }

    def get_provider(self, provider_name):
        """Return a new provider instance for ``provider_name``.

        Raises:
            ValueError: When the provider is not registered.
        """
        if provider_name not in self.providers:
            raise ValueError(f"不支持的OAuth2提供者: {provider_name}")
        return self.providers[provider_name]()

    @staticmethod
    def _safe_next_url(request, next_url):
        """Return ``next_url`` when it stays on this host, otherwise None.

        ``next`` is user-controlled; redirecting to it unchecked would be an
        open redirect.
        """
        from django.utils.http import url_has_allowed_host_and_scheme

        if next_url and url_has_allowed_host_and_scheme(
            next_url,
            allowed_hosts={request.get_host()},
            require_https=request.is_secure(),
        ):
            return next_url
        return None

    @staticmethod
    def _build_external_id(provider_name, user_info):
        """Return ``<provider>_<account id>`` for a provider profile.

        The account id is read from ``id`` or, for payloads without it
        (the raw WeChat profile), from ``openid``.

        Raises:
            KeyError: When the profile carries neither identifier.
        """
        for key in ('id', 'openid'):
            value = user_info.get(key)
            if value not in (None, ''):
                return f"{provider_name}_{value}"
        raise KeyError('id')

    @staticmethod
    def _names_from(user_info):
        """Return ``(first_name, last_name)`` derived from a provider profile.

        ``given_name`` / ``family_name`` win; otherwise ``name`` is split at
        the first space. Results are cut to 30 and 150 characters.
        """
        first = (user_info.get('given_name') or '')[:30]
        last = (user_info.get('family_name') or '')[:150]
        if not first and not last:
            full_name = user_info.get('name') or ''
            if full_name:
                parts = full_name.split(' ', 1)
                first = parts[0][:30]
                if len(parts) > 1:
                    last = parts[1][:150]
        return first, last

    def initiate_auth(self, request, provider_name, next_url=None):
        """Start the flow: store ``state`` in the session and redirect to the provider.

        ``next_url`` is stored only when it points back to this host.

        Raises:
            ValueError: When the provider is not registered.
        """
        provider = self.get_provider(provider_name)

        # Random state ties the callback to this session (CSRF protection).
        import secrets
        state = secrets.token_urlsafe(32)
        request.session[f'oauth2_state_{provider_name}'] = state

        next_key = f'oauth2_next_{provider_name}'
        safe_next = self._safe_next_url(request, next_url)
        if safe_next:
            request.session[next_key] = safe_next
        else:
            # Do not let a target from an earlier attempt linger.
            request.session.pop(next_key, None)

        return redirect(provider.get_authorization_url(state=state))

    def handle_callback(self, request, provider_name):
        """Finish the flow: verify state, fetch the profile, log the user in.

        Returns:
            A redirect to the stored ``next`` URL, or to ``/``.

        Raises:
            ValueError: On a missing/invalid ``state``, a missing ``code`` or
                an unregistered provider.
            Exception: Propagated from the provider calls.
        """
        import secrets

        state = request.GET.get('state')
        # pop(): a state value is valid for a single callback only.
        stored_state = request.session.pop(f'oauth2_state_{provider_name}', None)
        if (
            not state
            or not stored_state
            or not secrets.compare_digest(state.encode(), stored_state.encode())
        ):
            raise ValueError("无效的state参数")

        code = request.GET.get('code')
        if not code:
            raise ValueError("未收到授权码")

        provider = self.get_provider(provider_name)
        token_data = provider.get_access_token(code)
        access_token = token_data['access_token']

        if provider_name == 'wechat':
            # WeChat needs the openid issued together with the token.
            user_info = provider.get_user_info(access_token, token_data['openid'])
        else:
            user_info = provider.get_user_info(access_token)

        user = self.get_or_create_social_user(provider_name, user_info)

        # Read the target before login(): login() may replace the session
        # when it previously belonged to a different user.
        next_url = request.session.pop(f'oauth2_next_{provider_name}', None)

        # NOTE(review): with several AUTHENTICATION_BACKENDS configured,
        # login() needs an explicit ``backend=`` for users that did not come
        # from authenticate() -- confirm against the project settings.
        login(request, user)

        return redirect(self._safe_next_url(request, next_url) or '/')

    def get_or_create_social_user(self, provider_name, user_info):
        """Return the local user linked to this provider account, creating it if needed.

        An existing link has its local user refreshed from ``user_info``. For
        a new account the local user and the ``SocialUser`` row are created
        in one transaction, so the link never exists without its user.
        """
        from django.db import transaction

        external_id = self._build_external_id(provider_name, user_info)

        link = SocialUser.objects.filter(
            provider=provider_name,
            external_id=external_id,
        ).select_related('local_user').first()

        if link is not None:
            local_user = link.local_user
            self.update_user_from_social_data(local_user, user_info)
            return local_user

        with transaction.atomic():
            local_user = self.create_local_user(provider_name, user_info)
            # ``local_user`` is a required foreign key, so it has to be set at
            # creation time rather than assigned afterwards.
            SocialUser.objects.create(
                local_user=local_user,
                provider=provider_name,
                external_id=external_id,
                external_data=json.dumps(user_info),
            )
        return local_user

    def create_local_user(self, provider_name, user_info):
        """Create and return a local account for a provider profile.

        The username is made unique, the e-mail is dropped when another
        account already uses it, and no usable password is set.
        """
        base_username = (
            user_info.get('username')
            or user_info.get('name')
            or user_info.get('nickname')
            or f"{provider_name}_user"
        )
        username = self.generate_unique_username(base_username)

        email = user_info.get('email') or ''
        # Never attach an address that already belongs to another account.
        if email and CustomUser.objects.filter(email=email).exists():
            email = ''

        local_user = CustomUser.objects.create_user(
            username=username,
            email=email,
            password=None,  # OAuth users have no local password
        )

        local_user.first_name, local_user.last_name = self._names_from(user_info)

        avatar_url = user_info.get('avatar') or user_info.get('picture')
        if avatar_url and hasattr(local_user, 'profile'):
            # Placeholder: the avatar could be downloaded and stored here.
            pass

        # Accounts coming from a provider are treated as verified.
        local_user.is_verified = True
        local_user.save()
        return local_user

    def update_user_from_social_data(self, user, user_info):
        """Refresh names and a missing e-mail on ``user`` from ``user_info``.

        The user is saved only when something changed.
        """
        updated = False

        first_name = (user_info.get('given_name') or '')[:30]
        last_name = (user_info.get('family_name') or '')[:150]
        if first_name and user.first_name != first_name:
            user.first_name = first_name
            updated = True
        if last_name and user.last_name != last_name:
            user.last_name = last_name
            updated = True

        # Still nameless: derive the names from the full name.
        if not user.first_name and not user.last_name:
            full_name = user_info.get('name') or ''
            if full_name:
                parts = full_name.split(' ', 1)
                user.first_name = parts[0][:30]
                if len(parts) > 1:
                    user.last_name = parts[1][:150]
                updated = True

        # Fill in the e-mail only when the account has none, the provider
        # does not mark it unverified, and no other account uses it.
        email = user_info.get('email')
        if (
            not user.email
            and email
            and user_info.get('verified_email', True)
            and not CustomUser.objects.filter(email=email).exists()
        ):
            user.email = email
            updated = True

        if updated:
            user.save()

    def generate_unique_username(self, base_username):
        """Return ``base_username``, suffixed with a counter if already taken."""
        username = base_username
        counter = 1
        while CustomUser.objects.filter(username=username).exists():
            username = f"{base_username}{counter}"
            counter += 1
        return username
# 社交用户模型
class SocialUser(models.Model):
    """Link between a local account and an account at an external provider."""

    PROVIDER_CHOICES = [
        ('github', 'GitHub'),
        ('google', 'Google'),
        ('wechat', 'WeChat'),
        ('facebook', 'Facebook'),
        ('twitter', 'Twitter'),
    ]

    # Local account this provider account logs in as.
    local_user = models.ForeignKey(
        CustomUser,
        related_name='social_accounts',
        on_delete=models.CASCADE,
    )
    provider = models.CharField(choices=PROVIDER_CHOICES, max_length=20)
    # Identifier of the account at the provider.
    external_id = models.CharField(max_length=100)
    # Provider profile serialised as JSON text.
    external_data = models.TextField()
    access_token = models.TextField(blank=True)
    refresh_token = models.TextField(blank=True)
    token_expires_at = models.DateTimeField(blank=True, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'social_user'
        # One row per provider account.
        unique_together = ['provider', 'external_id']

    def get_external_data(self):
        """Return ``external_data`` parsed from JSON, or ``{}`` when it is not valid JSON."""
        import json
        try:
            parsed = json.loads(self.external_data)
        except json.JSONDecodeError:
            parsed = {}
        return parsed
# OAuth2视图示例
def oauth_login(request, provider):
    """Start an OAuth2 login with ``provider``.

    The optional ``next`` query parameter (default ``/``) is handed to the
    manager as the post-login target.

    Returns:
        The redirect to the provider, or a redirect to ``login`` with an
        error message when ``provider`` is not supported.
    """
    manager = OAuth2AuthManager()
    next_url = request.GET.get('next', '/')
    try:
        return manager.initiate_auth(request, provider, next_url)
    except ValueError as exc:
        # ``provider`` comes straight from the URL; an unknown value must not
        # surface as an unhandled exception (HTTP 500).
        messages.error(request, f"认证失败: {exc}")
        return redirect('login')
def oauth_callback(request, provider):
    """Handle the provider's redirect back to this site.

    Returns:
        The manager's response on success; on any exception an error message
        is flashed and the response redirects to ``login``.
    """
    manager = OAuth2AuthManager()
    try:
        response = manager.handle_callback(request, provider)
    except Exception as e:
        # NOTE(review): the exception text is shown to the user and may
        # include raw provider responses.
        messages.error(request, f"认证失败: {str(e)}")
        response = redirect('login')
    return response
# OAuth2 URLs配置示例
"""
# urls.py
from django.urls import path
from . import views
urlpatterns = [
path('oauth/login/<str:provider>/', views.oauth_login, name='oauth_login'),
path('oauth/callback/<str:provider>/', views.oauth_callback, name='oauth_callback'),
]
"""#JWT认证集成
# JWT认证集成
import jwt
from django.conf import settings
from django.contrib.auth import get_user_model
from django.utils import timezone
from datetime import datetime, timedelta
User = get_user_model()
class JWTAuthManager:
    """Issues and verifies HS256 access and refresh tokens signed with ``SECRET_KEY``."""

    def __init__(self):
        """Read token lifetimes from settings (15 minutes / 7 days by default)."""
        self.secret_key = settings.SECRET_KEY
        self.algorithm = 'HS256'
        self.access_token_lifetime = timedelta(
            minutes=getattr(settings, 'JWT_ACCESS_TOKEN_LIFETIME', 15)
        )
        self.refresh_token_lifetime = timedelta(
            days=getattr(settings, 'JWT_REFRESH_TOKEN_LIFETIME', 7)
        )

    def generate_tokens(self, user):
        """Return a new access/refresh token pair for ``user``.

        Returns:
            A dict with ``access``, ``refresh`` and both lifetimes in seconds
            (``access_expires_in``, ``refresh_expires_in``).
        """
        return {
            'access': self._generate_access_token(user),
            'refresh': self._generate_refresh_token(user),
            'access_expires_in': self.access_token_lifetime.total_seconds(),
            'refresh_expires_in': self.refresh_token_lifetime.total_seconds(),
        }

    def _generate_access_token(self, user):
        """Return a signed access token carrying id, username and e-mail."""
        issued_at = timezone.now()
        payload = {
            'user_id': user.id,
            'username': user.username,
            'email': user.email,
            'exp': issued_at + self.access_token_lifetime,
            'iat': issued_at,
            'token_type': 'access'
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def _generate_refresh_token(self, user):
        """Return a signed refresh token carrying only the user id."""
        issued_at = timezone.now()
        payload = {
            'user_id': user.id,
            'exp': issued_at + self.refresh_token_lifetime,
            'iat': issued_at,
            'token_type': 'refresh'
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def _verify_token(self, token, expected_type):
        """Decode ``token`` and resolve its user.

        Returns:
            ``(user, True)`` for a valid, unexpired token of ``expected_type``
            whose user exists and is active; ``(None, False)`` otherwise.
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except jwt.InvalidTokenError:
            # Also covers expired signatures, which subclass InvalidTokenError.
            return None, False

        # .get(): a validly signed token lacking these claims must be
        # rejected, not crash with KeyError.
        if payload.get('token_type') != expected_type:
            return None, False
        user_id = payload.get('user_id')
        if user_id is None:
            return None, False

        try:
            user = User.objects.get(id=user_id)
        except (User.DoesNotExist, ValueError, TypeError):
            return None, False

        # A deactivated account must lose access immediately, not only when
        # its tokens expire.
        if not user.is_active:
            return None, False
        return user, True

    def verify_access_token(self, token):
        """Verify an access token; returns ``(user, True)`` or ``(None, False)``."""
        return self._verify_token(token, 'access')

    def verify_refresh_token(self, token):
        """Verify a refresh token; returns ``(user, True)`` or ``(None, False)``."""
        return self._verify_token(token, 'refresh')

    def refresh_access_token(self, refresh_token):
        """Return a new access token for a valid refresh token, else None."""
        user, valid = self.verify_refresh_token(refresh_token)
        if valid and user:
            return self._generate_access_token(user)
        return None
# JWT认证后端
class JWTAuthenticationBackend:
    """Resolves the user of a request from the JWT it carries."""

    def __init__(self):
        """Create the token manager used for verification."""
        self.jwt_manager = JWTAuthManager()

    def authenticate(self, request):
        """Return the user identified by the request's access token, or None."""
        token = self.get_token_from_request(request)
        if not token:
            return None
        user, valid = self.jwt_manager.verify_access_token(token)
        return user if valid and user else None

    def get_token_from_request(self, request):
        """Return the JWT found in the request, or None.

        Lookup order: ``Authorization: Bearer <token>`` header, then the
        ``token`` query parameter, then the ``jwt_token`` cookie.
        """
        # 1. Authorization header: the token is the second space-separated part.
        auth_header = request.META.get('HTTP_AUTHORIZATION')
        if auth_header and auth_header.startswith('Bearer '):
            return auth_header.split(' ')[1]

        # 2. Query string, 3. cookie.
        # NOTE(review): tokens in query strings tend to end up in logs and
        # Referer headers.
        return request.GET.get('token') or request.COOKIES.get('jwt_token') or None

    def get_user(self, user_id):
        """Return the user with primary key ``user_id``, or None if absent."""
        try:
            found = User.objects.get(pk=user_id)
        except User.DoesNotExist:
            found = None
        return found
# JWT中间件
class JWTMiddleware:
    """Middleware that sets ``request.user`` from a valid JWT, when one is present."""

    def __init__(self, get_response):
        """Store the next callable and create the JWT backend."""
        self.get_response = get_response
        self.auth_backend = JWTAuthenticationBackend()

    def __call__(self, request):
        """Attach the JWT user (if any) and pass the request downstream."""
        jwt_user = self.auth_backend.authenticate(request)
        if jwt_user:
            # Requests without a valid token keep whatever user they had.
            request.user = jwt_user
        return self.get_response(request)
# JWT视图示例
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.views import View
import json
@method_decorator(csrf_exempt, name='dispatch')
class TokenObtainPairView(View):
    """Exchanges a username and password for a JWT access/refresh pair."""

    def post(self, request):
        """Authenticate the JSON credentials and return tokens.

        The body must be a JSON object with ``username`` and ``password``.

        Returns:
            200 with ``{'tokens': ...}`` and the access token in an HttpOnly
            ``jwt_token`` cookie; 400 for an invalid body or missing fields;
            401 for wrong credentials or an inactive user; 500 for
            unexpected errors.
        """
        import logging

        try:
            data = json.loads(request.body)
        except ValueError:
            # Covers malformed JSON and bodies that are not valid UTF-8.
            data = None
        if not isinstance(data, dict):
            # Valid JSON that is not an object (list, string, ...) is a
            # client error too.
            return JsonResponse({
                'error': '无效的JSON数据'
            }, status=400)

        username = data.get('username')
        password = data.get('password')
        if not username or not password:
            return JsonResponse({
                'error': '用户名和密码都是必需的'
            }, status=400)

        # NOTE(review): this endpoint does not go through the brute-force
        # protection used by the form login.
        try:
            user = authenticate(request, username=username, password=password)
            if not (user and user.is_active):
                return JsonResponse({
                    'error': '用户名或密码错误'
                }, status=401)
            tokens = JWTAuthManager().generate_tokens(user)
        except Exception:
            # Log the details server-side; do not send internals to the client.
            logging.getLogger(__name__).exception("Issuing JWT tokens failed")
            return JsonResponse({
                'error': '服务器内部错误'
            }, status=500)

        response = JsonResponse({
            'tokens': tokens
        })
        # Optional convenience: also hand the access token out as a cookie.
        response.set_cookie(
            'jwt_token',
            tokens['access'],
            httponly=True,
            secure=not settings.DEBUG,  # Secure flag on outside of DEBUG
            samesite='Lax'
        )
        return response
@method_decorator(csrf_exempt, name='dispatch')
class TokenRefreshView(View):
    """Exchange a refresh token for a new access token.

    Expects a JSON object body with a ``refresh`` key. On success the
    new access token is returned under ``access`` and the ``jwt_token``
    cookie is updated.
    """

    def post(self, request):
        """Handle the refresh request.

        Returns:
            ``JsonResponse`` with status 200 on success, 400 for a
            malformed body or missing token, 401 when the refresh token
            is rejected, and 500 for unexpected failures.
        """
        try:
            data = json.loads(request.body)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return JsonResponse({'error': '无效的JSON数据'}, status=400)
        # Valid JSON that is not an object (e.g. a list) has no .get()
        if not isinstance(data, dict):
            return JsonResponse({'error': '无效的JSON数据'}, status=400)

        refresh_token = data.get('refresh')
        if not refresh_token:
            return JsonResponse({'error': '刷新令牌是必需的'}, status=400)

        try:
            new_access_token = JWTAuthManager().refresh_access_token(refresh_token)
            if not new_access_token:
                return JsonResponse({'error': '无效或过期的刷新令牌'}, status=401)

            response = JsonResponse({'access': new_access_token})
            # Keep the cookie copy of the access token in sync
            response.set_cookie(
                'jwt_token',
                new_access_token,
                httponly=True,
                secure=not settings.DEBUG,  # HTTPS-only outside debug mode
                samesite='Lax',
            )
            return response
        except Exception:
            # Log the details server-side; never echo exception text to
            # the client, it may expose internals.
            import logging
            logging.getLogger(__name__).exception('Token refresh failed')
            return JsonResponse({'error': '服务器内部错误'}, status=500)
# JWT保护的视图装饰器
def jwt_required(view_func):
    """Decorator that only lets JWT-authenticated requests reach the view.

    A fresh ``JWTAuthenticationBackend`` authenticates each request. On
    success the user is stored on ``request.user`` and the view is
    called; otherwise a 401 JSON response is returned.
    """
    @wraps(view_func)
    def wrapper(request, *args, **kwargs):
        authenticated = JWTAuthenticationBackend().authenticate(request)
        if authenticated:
            request.user = authenticated
            return view_func(request, *args, **kwargs)
        return JsonResponse({'error': '认证失败'}, status=401)

    return wrapper
# 使用示例
"""
# URLs配置
urlpatterns = [
path('api/token/', TokenObtainPairView.as_view(), name='token_obtain_pair'),
path('api/token/refresh/', TokenRefreshView.as_view(), name='token_refresh'),
path('api/protected/', jwt_required(ProtectedView.as_view()), name='protected_view'),
]
# 在settings.py中添加中间件
MIDDLEWARE = [
# ... 其他中间件
'your_app.middleware.JWTMiddleware',
]
"""
# 常见问题与解决方案
#问题1:密码重置功能
# 密码重置功能实现
from django.core.mail import send_mail
from django.template.loader import render_to_string
from django.utils.html import strip_tags
from django.contrib.sites.shortcuts import get_current_site
from django.urls import reverse
import secrets
class PasswordResetHandler:
    """Helpers for the token-based password reset flow."""

    @staticmethod
    def generate_reset_token(user):
        """Create a fresh reset token for ``user``.

        Unused tokens that already exist for the user are deleted first.

        Returns:
            The newly created ``PasswordResetToken`` record.
        """
        PasswordResetToken.objects.filter(user=user, is_used=False).delete()
        return PasswordResetToken.objects.create(
            user=user,
            token=secrets.token_urlsafe(32),
        )

    @staticmethod
    def send_reset_email(request, user):
        """Email ``user`` a link for choosing a new password."""
        reset_token = PasswordResetHandler.generate_reset_token(user)

        # Build the absolute reset link from the current site's domain
        current_site = get_current_site(request)
        reset_link = f"https://{current_site.domain}{reverse('password_reset_confirm', kwargs={'token': reset_token.token})}"

        # Render the HTML body; the plain-text part is derived from it
        context = {
            'user': user,
            'reset_link': reset_link,
            'site_name': current_site.name,
        }
        html_message = render_to_string('registration/password_reset_email.html', context)

        send_mail(
            '重置您的密码',
            strip_tags(html_message),
            settings.DEFAULT_FROM_EMAIL,
            [user.email],
            html_message=html_message,
        )

    @staticmethod
    def validate_reset_token(token):
        """Check a reset token.

        Returns:
            ``(user, None)`` when the token exists, is unused and is at
            most one hour old; otherwise ``(None, error_message)``.
        """
        from django.utils import timezone
        from datetime import timedelta

        try:
            reset_token = PasswordResetToken.objects.get(token=token)
        except PasswordResetToken.DoesNotExist:
            return None, "无效的重置令牌"

        if reset_token.is_used:
            return None, "令牌已被使用"

        # Tokens are valid for one hour after creation
        age = timezone.now() - reset_token.created_at
        if age > timedelta(hours=1):
            return None, "令牌已过期"
        return reset_token.user, None

    @staticmethod
    def reset_user_password(user, new_password):
        """Validate and store a new password, then retire the user's tokens.

        Exceptions raised by the policy check or the password service
        propagate to the caller.
        """
        # Policy check first, so an exception leaves the password untouched
        PasswordPolicyManager().validate_password_compliance(new_password, user)

        PasswordService().set_password(user, new_password, check_history=True)

        # Mark every outstanding reset token of this user as used
        PasswordResetToken.objects.filter(user=user, is_used=False).update(is_used=True)
# 密码重置视图
def password_reset_request(request):
    """Handle the "forgot password" form.

    On POST, a reset email is sent to every account whose email equals
    the submitted address, then the user is redirected to the login
    page. An empty address or one that matches no account re-renders
    the form with an error message.
    """
    if request.method == 'POST':
        email = (request.POST.get('email') or '').strip()
        if not email:
            # An empty value must never be looked up: it could match
            # accounts that simply have no email stored.
            messages.error(request, '请输入邮箱地址')
        else:
            # filter() instead of get(): several accounts may share one
            # address, which would make get() raise MultipleObjectsReturned.
            matching_users = list(User.objects.filter(email=email))
            if matching_users:
                for user in matching_users:
                    PasswordResetHandler.send_reset_email(request, user)
                messages.success(request, '密码重置链接已发送到您的邮箱')
                return redirect('login')
            messages.error(request, '该邮箱未注册')
    return render(request, 'registration/password_reset_request.html')
def password_reset_confirm(request, token):
    """Let the holder of a valid reset token choose a new password.

    An invalid token redirects back to the reset request page with the
    error message. On POST the two password fields must match; policy
    violations are reported as messages and the form is shown again.
    """
    user, error = PasswordResetHandler.validate_reset_token(token)
    if error:
        messages.error(request, error)
        return redirect('password_reset_request')

    if request.method == 'POST':
        new_password = request.POST.get('new_password')
        confirm_password = request.POST.get('confirm_password')
        if new_password == confirm_password:
            try:
                PasswordResetHandler.reset_user_password(user, new_password)
            except ValidationError as e:
                for msg in e.messages:
                    messages.error(request, msg)
            else:
                messages.success(request, '密码重置成功,请使用新密码登录')
                return redirect('login')
        else:
            messages.error(request, '两次输入的密码不一致')

    return render(request, 'registration/password_reset_confirm.html', {'token': token})


# Problem 2: user registration activation
# 用户注册激活功能
import secrets
from django.core.signing import TimestampSigner
from django.core.mail import send_mail
from django.template.loader import render_to_string
from django.utils.html import strip_tags
from django.contrib.sites.shortcuts import get_current_site
from django.urls import reverse
class UserActivationHandler:
    """Create, verify and email account-activation tokens.

    Tokens are ``"<user id>:<email>"`` strings signed with a
    ``TimestampSigner``.
    """

    def __init__(self):
        self.signer = TimestampSigner()

    def generate_activation_token(self, user):
        """Return a signed, timestamped token for ``user``."""
        return self.signer.sign(f"{user.id}:{user.email}")

    def verify_activation_token(self, token, max_age=60*60*24*7):  # 7 days
        """Validate an activation token.

        Args:
            token: The signed token taken from the activation link.
            max_age: Maximum accepted token age in seconds.

        Returns:
            ``(user, None)`` for a valid token of a not-yet-active user,
            otherwise ``(None, error_message)``. The message is safe to
            show to the end user.
        """
        from django.core.signing import BadSignature, SignatureExpired

        # Only expected failures are handled, each with a fixed message,
        # so internal exception text is never returned to the caller.
        try:
            data = self.signer.unsign(token, max_age=max_age)
        except SignatureExpired:  # subclass of BadSignature, so checked first
            return None, "激活链接已过期"
        except BadSignature:
            return None, "无效的激活链接"

        user_id, separator, email = data.partition(':')
        if not separator:
            return None, "无效的激活链接"

        try:
            user = User.objects.get(id=user_id, email=email)
        except User.DoesNotExist:
            # Account was removed, or its email changed after signing
            return None, "无效的激活链接"

        if user.is_active:
            return None, "账户已经激活"
        return user, None

    def send_activation_email(self, request, user):
        """Email ``user`` a link that activates the account."""
        activation_token = self.generate_activation_token(user)

        # Build the absolute activation link from the current site's domain
        current_site = get_current_site(request)
        activation_link = f"https://{current_site.domain}{reverse('activate_account', kwargs={'token': activation_token})}"

        # Render the HTML body; the plain-text part is derived from it
        subject = '激活您的账户'
        html_message = render_to_string('registration/activation_email.html', {
            'user': user,
            'activation_link': activation_link,
            'site_name': current_site.name,
        })
        plain_message = strip_tags(html_message)

        send_mail(
            subject,
            plain_message,
            settings.DEFAULT_FROM_EMAIL,
            [user.email],
            html_message=html_message,
        )
# 用户注册视图(带激活)
def register_user(request):
    """Register a new, inactive account and email an activation link.

    On POST the username, email and password are validated. Validation
    errors are shown as messages and the form is rendered again. On
    success the user is redirected to the login page.
    """
    if request.method == 'POST':
        username = request.POST.get('username')
        email = request.POST.get('email')
        password = request.POST.get('password')
        try:
            # Validate the input
            validator = UserValidator()
            validator.validate_username(username)
            validator.validate_email(email)
            # A missing or empty password must be rejected here:
            # create_user() would otherwise create an account that has
            # no usable password or an empty one.
            if not password:
                raise ValidationError('密码不能为空')

            # Uniqueness checks
            # NOTE(review): uniqueness is checked on User while the
            # account is created through CustomUser - confirm both refer
            # to the same model.
            if User.objects.filter(username=username).exists():
                raise ValidationError('用户名已存在')
            if User.objects.filter(email=email).exists():
                raise ValidationError('邮箱已被注册')

            # Create the account in the inactive state
            user = CustomUser.objects.create_user(
                username=username,
                email=email,
                password=password,
                is_active=False,
            )

            # Send the activation email
            activation_handler = UserActivationHandler()
            activation_handler.send_activation_email(request, user)
            messages.success(request, '注册成功!请检查邮箱并点击激活链接完成注册')
            return redirect('login')
        except ValidationError as e:
            for msg in e.messages:
                messages.error(request, msg)
    return render(request, 'registration/register.html')
def activate_account(request, token):
    """Activate the account referenced by an activation token.

    A token that fails verification redirects to the registration page
    with the error; otherwise the user is marked active, saved, and
    redirected to the login page.
    """
    user, error = UserActivationHandler().verify_activation_token(token)
    if not error:
        # Token accepted: flip the flag and persist it
        user.is_active = True
        user.save()
        messages.success(request, '账户激活成功!现在您可以登录了')
        return redirect('login')

    messages.error(request, f'激活失败: {error}')
    return redirect('register')


# Problem 3: "remember me" feature
# 记住我功能实现
from django.contrib.auth import login
from django.contrib.sessions.models import Session
class RememberMeHandler:
    """Session helpers for the "remember me" login option."""

    # Lifetime of a remembered session (two weeks). Shared by the session
    # expiry and the PersistentSession record so the two cannot drift apart.
    REMEMBER_ME_SECONDS = 14 * 24 * 60 * 60

    @staticmethod
    def set_remember_me(request, user, remember_me):
        """Set the session expiry according to the user's choice.

        Args:
            request: Request whose session is configured.
            user: Unused; kept for interface compatibility.
            remember_me: Truthy to keep the session for two weeks,
                falsy to expire it when the browser closes.
        """
        if remember_me:
            request.session.set_expiry(RememberMeHandler.REMEMBER_ME_SECONDS)
        else:
            # 0 means "expire at browser close"
            request.session.set_expiry(0)

    @staticmethod
    def create_persistent_session(request, user, device_fingerprint=None):
        """Log the user in with a long-lived session and record it.

        Args:
            request: Current request.
            user: User to log in.
            device_fingerprint: Optional fingerprint; when given it is
                also stored in the session.
        """
        # Imported locally so the method does not depend on module-level
        # imports that are not part of this section.
        from datetime import timedelta
        from django.utils import timezone

        login(request, user)
        RememberMeHandler.set_remember_me(request, user, True)

        if device_fingerprint:
            request.session['device_fingerprint'] = device_fingerprint

        PersistentSession.objects.create(
            user=user,
            session_key=request.session.session_key,
            # The model column is a non-null CharField, so store '' rather
            # than None when no fingerprint was supplied.
            device_fingerprint=device_fingerprint or '',
            ip_address=request.META.get('REMOTE_ADDR'),
            user_agent=request.META.get('HTTP_USER_AGENT', ''),
            expires_at=timezone.now() + timedelta(
                seconds=RememberMeHandler.REMEMBER_ME_SECONDS
            ),
        )
# 持久会话模型
class PersistentSession(models.Model):
    """Database record of a long-lived login session."""

    # Owner of the session; rows are deleted together with the user
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Key of the session this record belongs to; unique per record
    session_key = models.CharField(max_length=40, unique=True)
    # Optional device fingerprint (may be left blank)
    device_fingerprint = models.CharField(max_length=255, blank=True)
    ip_address = models.GenericIPAddressField()
    user_agent = models.TextField()
    # Set automatically when the row is first saved
    created_at = models.DateTimeField(auto_now_add=True)
    expires_at = models.DateTimeField()
    is_active = models.BooleanField(default=True)

    class Meta:
        db_table = 'persistent_session'
        indexes = [
            models.Index(fields=['user', 'expires_at']),
            models.Index(fields=['session_key']),
        ]

    def is_valid(self):
        """Report whether the session is active and not yet expired."""
        still_enabled = self.is_active
        return still_enabled and self.expires_at > timezone.now()
# 记住我中间件
class RememberMeMiddleware:
    """Middleware that renews the expiry of "remember me" sessions.

    After the response has been produced, the session expiry is reset
    to two weeks when the session has a truthy ``remember_me`` entry
    and the request's user is authenticated.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        response = self.get_response(request)

        # Each guard mirrors one condition; all must hold to renew
        if not hasattr(request, 'session'):
            return response
        if not request.session.get('remember_me'):
            return response
        if not hasattr(request, 'user'):
            return response
        if request.user.is_authenticated:
            request.session.set_expiry(1209600)  # 2 weeks
        return response
# 更新登录视图以支持记住我
def enhanced_login_view(request):
    """Login view with "remember me" support.

    On POST the credentials are authenticated and passed through
    ``AccountSecurity.secure_login_attempt``. A successful login sets
    the session lifetime according to the ``remember_me`` checkbox and
    redirects to the ``next`` target when it points to this host,
    otherwise to ``/``. Failures re-render the login form with an error.
    """
    # Imported locally so the block does not rely on module-level imports
    from django.utils.http import url_has_allowed_host_and_scheme

    if request.method != 'POST':
        return render(request, 'registration/login.html')

    username = request.POST.get('username')
    password = request.POST.get('password')
    remember_me = request.POST.get('remember_me') == 'on'

    user = authenticate(request, username=username, password=password)
    if not (user and user.is_active):
        messages.error(request, '用户名或密码错误')
        return render(request, 'registration/login.html')

    # Account security check
    security = AccountSecurity()
    security_result = security.secure_login_attempt(username, password, request)
    if not security_result['success']:
        messages.error(request, security_result['error'])
        return render(request, 'registration/login.html')

    login(request, user)

    if remember_me:
        request.session['remember_me'] = True
        RememberMeHandler.set_remember_me(request, user, True)
        # NOTE(review): create_persistent_session() calls login() and
        # set_remember_me() again - confirm the repeated login is intended.
        device_fingerprint = SessionSecurity.create_session_fingerprint(request)
        RememberMeHandler.create_persistent_session(
            request, user, device_fingerprint
        )
    else:
        request.session['remember_me'] = False
        RememberMeHandler.set_remember_me(request, user, False)

    # Record login information
    UserSessionManager.record_login_info(user, request)

    # "next" is user-controlled: only follow it when it stays on this
    # host, otherwise it would be an open redirect.
    next_url = request.POST.get('next') or request.GET.get('next', '/')
    if not url_has_allowed_host_and_scheme(
        next_url,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    ):
        next_url = '/'
    return redirect(next_url)


# Chapter summary
在本章中,我们深入学习了Django用户认证系统:
- **

