#Django信号系统 - 事件驱动编程模式
📂 所属阶段:第二部分 — 进阶特性
🎯 难度等级:中级
⏰ 预计学习时间:3-4小时
🎒 前置知识:模型设计与ORM操作
#目录
#信号系统概念与工作原理
Django信号是一种事件通知框架,允许解耦的应用在发生特定事件时相互通讯。
#信号系统定义
"""
Django信号系统是观察者模式的实现,它允许某些发送者在发生特定事件时通知一组接收者。
信号系统组成:
1. 信号(Signal) - 事件的通知者
2. 发送者(Sender) - 触发信号的对象
3. 接收者(Receiver) - 响应信号的函数
4. 信号连接 - 将接收者连接到信号的机制
"""#信号基本结构
# 信号系统的基本使用模式
from django.dispatch import Signal, receiver
from django.db.models.signals import post_save, pre_save
from django.dispatch import receiver
# 自定义信号
custom_signal = Signal()
def signal_handler(sender, **kwargs):
"""信号处理器"""
print(f"信号来自: {sender}")
print(f"参数: {kwargs}")
# 连接信号处理器
custom_signal.connect(signal_handler)
# 发送信号
custom_signal.send(sender=None, message="Hello World")
# 使用装饰器连接信号
@receiver(custom_signal)
def decorated_handler(sender, **kwargs):
"""装饰器方式的信号处理器"""
print(f"装饰器处理器收到信号: {kwargs}")#信号执行流程
"""
信号执行流程:
1. 某个事件发生(如模型保存、删除等)
2. Django触发相应的内置信号或自定义信号
3. 所有连接到该信号的接收器被调用
4. 每个接收器按注册顺序执行
5. 信号处理完成后,继续执行原流程
注意:信号是同步执行的,会阻塞主线程
"""#Django信号架构
#信号系统组件
# Django信号系统的核心组件
from django.dispatch import Signal
from django.db.models.signals import (
pre_init, post_init,
pre_save, post_save,
pre_delete, post_delete,
m2m_changed,
pre_migrate, post_migrate
)
from django.core.signals import (
request_started, request_finished,
got_request_exception
)
from django.test.signals import setting_changed, template_rendered
# 信号的基本属性
"""
Signal类的主要属性:
- receivers: 存储所有连接的接收器
- name: 信号名称
- providing_args: 信号提供的参数(已废弃)
"""
# 信号发送方法
"""
signal.send(sender, **named) - 同步发送信号
signal.send_robust(sender, **named) - 健壮发送,捕获接收器异常
"""#信号连接机制
# 信号连接的多种方式
from django.dispatch import Signal, receiver
my_signal = Signal()
# 方式1: 直接连接
def handler1(sender, **kwargs):
print("Handler 1 called")
my_signal.connect(handler1)
# 方式2: 使用装饰器
@receiver(my_signal)
def handler2(sender, **kwargs):
print("Handler 2 called")
# 方式3: 连接到特定发送者
@receiver(my_signal, sender=str)
def handler3(sender, **kwargs):
print(f"Handler 3 called by {sender}")
# 方式4: 指定弱引用
my_signal.connect(handler1, weak=True)
# 方式5: 指定信号接收器的唯一标识
my_signal.connect(handler1, dispatch_uid="unique_handler")
# 断开连接
my_signal.disconnect(handler1)#内置信号类型详解
#模型信号
# Django模型相关的内置信号
from django.db.models.signals import pre_init, post_init, pre_save, post_save, pre_delete, post_delete, m2m_changed
from django.dispatch import receiver
# 1. 初始化信号
@receiver(pre_init)
def handle_pre_init(sender, **kwargs):
"""模型实例初始化前"""
print(f"即将初始化 {sender.__name__}")
@receiver(post_init)
def handle_post_init(sender, instance, **kwargs):
"""模型实例初始化后"""
print(f"已初始化 {sender.__name__} 实例")
# 2. 保存信号
@receiver(pre_save)
def handle_pre_save(sender, instance, **kwargs):
"""模型保存前"""
print(f"即将保存 {sender.__name__} 实例")
# 可以在这里修改实例属性
if hasattr(instance, 'updated_at'):
from django.utils import timezone
instance.updated_at = timezone.now()
@receiver(post_save)
def handle_post_save(sender, instance, created, **kwargs):
"""模型保存后"""
if created:
print(f"已创建新的 {sender.__name__} 实例")
# 可以触发其他操作,如发送通知
else:
print(f"已更新 {sender.__name__} 实例")
# 3. 删除信号
@receiver(pre_delete)
def handle_pre_delete(sender, instance, **kwargs):
"""模型删除前"""
print(f"即将删除 {sender.__name__} 实例")
# 可以执行清理操作
@receiver(post_delete)
def handle_post_delete(sender, instance, **kwargs):
"""模型删除后"""
print(f"已删除 {sender.__name__} 实例")
# 可以执行后续清理操作
# 4. 多对多关系信号
@receiver(m2m_changed)
def handle_m2m_changed(sender, instance, action, pk_set, **kwargs):
"""多对多关系变更时"""
print(f"M2M关系变更: {action}")
if action == "pre_add":
print(f"即将添加关系到 {instance}")
elif action == "post_add":
print(f"已添加关系到 {instance}")
elif action == "pre_remove":
print(f"即将从 {instance} 移除关系")
elif action == "post_remove":
print(f"已从 {instance} 移除关系")#请求信号
# Django请求相关的内置信号
from django.core.signals import request_started, request_finished, got_request_exception
from django.dispatch import receiver
import time
@receiver(request_started)
def handle_request_started(sender, **kwargs):
"""请求开始时"""
print(f"请求开始: {sender}")
@receiver(request_finished)
def handle_request_finished(sender, **kwargs):
"""请求结束时"""
print(f"请求结束: {sender}")
@receiver(got_request_exception)
def handle_exception(sender, request, **kwargs):
"""请求异常时"""
print(f"请求异常: {request.path}")
# 可以记录异常日志
import logging
logger = logging.getLogger(__name__)
logger.error(f"Request error on {request.path}: {kwargs.get('exception')}")#应用信号
# Django应用相关的内置信号
from django.db.models.signals import pre_migrate, post_migrate
from django.dispatch import receiver
@receiver(pre_migrate)
def handle_pre_migrate(app_config, verbosity, interactive, using, plan, apps, **kwargs):
"""迁移开始前"""
print(f"即将开始迁移应用: {app_config.name}")
@receiver(post_migrate)
def handle_post_migrate(app_config, verbosity, interactive, using, plan, apps, **kwargs):
"""迁移完成后"""
print(f"应用迁移完成: {app_config.name}")
# 可以在这里创建初始数据
if app_config.name == 'myapp':
# 创建初始用户或配置
from django.contrib.auth.models import User
if not User.objects.filter(username='admin').exists():
User.objects.create_superuser('admin', 'admin@example.com', 'adminpass')#自定义信号开发
#基础自定义信号
# 创建自定义信号
from django.dispatch import Signal, receiver
from django.db import models
# 定义自定义信号
user_registered = Signal() # 用户注册信号
order_completed = Signal() # 订单完成信号
file_uploaded = Signal() # 文件上传信号
# 用户注册信号的使用示例
def register_user(username, email, password):
"""用户注册函数"""
from django.contrib.auth.models import User
# 创建用户
user = User.objects.create_user(
username=username,
email=email,
password=password
)
# 发送用户注册信号
user_registered.send(
sender=User,
user=user,
username=username,
email=email
)
return user
# 订单完成信号的使用示例
def complete_order(order_id):
"""完成订单函数"""
from myapp.models import Order
# 更新订单状态
order = Order.objects.get(id=order_id)
order.status = 'completed'
order.save()
# 发送订单完成信号
order_completed.send(
sender=Order,
order=order,
order_id=order_id
)
# 信号接收器示例
@receiver(user_registered)
def send_welcome_email(sender, user, **kwargs):
"""发送欢迎邮件"""
print(f"发送欢迎邮件给 {user.email}")
@receiver(user_registered)
def create_user_profile(sender, user, **kwargs):
"""创建用户档案"""
from myapp.models import UserProfile
UserProfile.objects.create(user=user)
@receiver(order_completed)
def send_order_confirmation(sender, order, **kwargs):
"""发送订单确认"""
print(f"发送订单确认给 {order.user.email}")
@receiver(order_completed)
def update_inventory(sender, order, **kwargs):
"""更新库存"""
for item in order.items.all():
item.product.stock -= item.quantity
item.product.save()#高级自定义信号
# 高级自定义信号实现
from django.dispatch import Signal
from django.core.files.storage import default_storage
import json
# 带参数验证的自定义信号
class ValidatedSignal(Signal):
"""带参数验证的信号基类"""
def __init__(self, arg_names=None):
super().__init__()
self.arg_names = arg_names or []
def send(self, sender, **kwargs):
"""发送信号前验证参数"""
for arg_name in self.arg_names:
if arg_name not in kwargs:
raise ValueError(f"Missing required argument: {arg_name}")
return super().send(sender, **kwargs)
# 定义带验证的信号
validated_user_action = ValidatedSignal(['user_id', 'action_type', 'timestamp'])
# 使用上下文管理器的信号
from contextlib import contextmanager
@contextmanager
def signal_context(signal, sender, **kwargs):
"""信号上下文管理器"""
print(f"开始发送信号: {signal}")
try:
result = signal.send(sender, **kwargs)
print(f"信号发送完成")
yield result
except Exception as e:
print(f"信号发送异常: {e}")
raise
# 信号链式处理
class SignalChain:
"""信号链式处理器"""
def __init__(self):
self.handlers = []
def add_handler(self, func):
"""添加处理器"""
self.handlers.append(func)
return self
def process(self, sender, **kwargs):
"""处理信号链"""
for handler in self.handlers:
result = handler(sender, **kwargs)
if result is False: # 如果返回False,停止链式处理
break
return True
# 信号批处理器
class BatchSignalProcessor:
"""批量信号处理器"""
def __init__(self, signal, batch_size=10):
self.signal = signal
self.batch_size = batch_size
self.queue = []
def add_to_queue(self, sender, **kwargs):
"""添加到队列"""
self.queue.append((sender, kwargs))
if len(self.queue) >= self.batch_size:
self.process_batch()
def process_batch(self):
"""处理批次"""
for sender, kwargs in self.queue:
self.signal.send(sender, **kwargs)
self.queue = []
# 使用示例
batch_processor = BatchSignalProcessor(user_registered, batch_size=5)
def register_multiple_users(users_data):
"""批量注册用户"""
for user_data in users_data:
# 注册用户并添加到批处理器
user = register_user(**user_data)
batch_processor.add_to_queue(User, user=user, **user_data)#信号装饰器
# 信号相关装饰器
from functools import wraps
import time
def signal_safe(func):
"""信号安全装饰器 - 捕获异常但不影响信号流程"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
# 记录错误但不传播
import logging
logger = logging.getLogger(__name__)
logger.error(f"Signal handler error in {func.__name__}: {e}")
return None
return wrapper
def signal_async(func):
"""异步信号处理器装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
# 在单独的线程中执行信号处理
from threading import Thread
thread = Thread(target=func, args=args, kwargs=kwargs)
thread.daemon = True
thread.start()
return thread
return wrapper
def signal_rate_limit(limit_per_second=1):
"""信号频率限制装饰器"""
def decorator(func):
last_call = {}
@wraps(func)
def wrapper(sender, **kwargs):
import time
current_time = time.time()
# 按发送者限制频率
sender_key = str(sender)
if sender_key in last_call:
time_since_last = current_time - last_call[sender_key]
if time_since_last < (1.0 / limit_per_second):
return # 跳过执行
last_call[sender_key] = current_time
return func(sender, **kwargs)
return wrapper
return decorator
# 使用装饰器的信号处理器
@signal_safe
@receiver(user_registered)
def safe_welcome_handler(sender, user, **kwargs):
"""安全的欢迎处理器"""
# 可能出错的代码,但不会影响其他信号处理器
send_email(user.email, "Welcome!", "Welcome to our site!")
@signal_rate_limit(limit_per_second=0.1) # 每10秒最多1次
@receiver(post_save, sender='myapp.Article')
def rate_limited_index_handler(sender, instance, created, **kwargs):
"""频率限制的索引处理器"""
if created:
# 更新搜索引擎索引
update_search_index(instance)#信号接收器注册方式
#装饰器方式
# 使用@receiver装饰器
from django.dispatch import receiver
from django.db.models.signals import post_save
from myapp.models import MyModel
@receiver(post_save, sender=MyModel)
def my_handler(sender, instance, created, **kwargs):
"""使用装饰器注册的信号处理器"""
if created:
print(f"新创建的{sender.__name__}: {instance}")
else:
print(f"更新的{sender.__name__}: {instance}")
# 多个信号注册到同一处理器
@receiver([post_save, pre_save], sender=MyModel)
def multi_signal_handler(sender, **kwargs):
"""处理多个信号的处理器"""
print(f"收到信号: {kwargs.get('signal')}")
# 不指定发送者的通配处理器
@receiver(post_save)
def generic_save_handler(sender, instance, **kwargs):
"""通用保存处理器"""
print(f"模型 {sender.__name__} 已保存")#直接连接方式
# 直接连接信号
from django.db.models.signals import post_save, pre_save
from django.dispatch import receiver
def my_handler(sender, instance, **kwargs):
"""信号处理器函数"""
print(f"处理 {sender.__name__} 信号")
# 连接到特定发送者
post_save.connect(my_handler, sender='myapp.MyModel')
# 连接到任意发送者
post_save.connect(my_handler)
# 使用dispatch_uid避免重复连接
post_save.connect(my_handler, sender='myapp.MyModel', dispatch_uid='my_unique_handler')
# 断开连接
post_save.disconnect(my_handler, sender='myapp.MyModel')#应用配置中注册
# apps.py - 在应用配置中注册信号
from django.apps import AppConfig
class MyAppConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'myapp'
def ready(self):
"""应用准备就绪时调用"""
import sys
# 避免在管理命令中加载信号
if 'makemigrations' in sys.argv or 'migrate' in sys.argv:
return
# 导入信号处理器模块
from . import signals # 通常在signals.py中定义所有信号处理器
# 或者直接在这里注册
from django.db.models.signals import post_save
from .models import MyModel
def my_handler(sender, instance, **kwargs):
print(f"在AppConfig中注册的处理器: {instance}")
post_save.connect(my_handler, sender=MyModel)#模块化信号注册
# signals.py - 信号处理器模块
from django.dispatch import receiver
from django.db.models.signals import post_save, pre_save, pre_delete, post_delete
from django.contrib.auth.models import User
from .models import UserProfile, Article
# 用户相关信号
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
"""创建用户档案"""
if created:
UserProfile.objects.create(user=instance)
@receiver(post_save, sender=User)
def save_user_profile(sender, instance, **kwargs):
"""保存用户档案"""
if hasattr(instance, 'userprofile'):
instance.userprofile.save()
# 文章相关信号
@receiver(pre_save, sender=Article)
def article_pre_save(sender, instance, **kwargs):
"""文章保存前处理"""
# 可以在这里处理文章内容
if instance.content:
instance.word_count = len(instance.content.split())
@receiver(post_save, sender=Article)
def article_post_save(sender, instance, created, **kwargs):
"""文章保存后处理"""
if created:
# 新文章创建后可以执行的操作
from .tasks import index_article_task
index_article_task.delay(instance.id)
else:
# 更新文章后可以执行的操作
pass
# 删除文章时的处理
@receiver(pre_delete, sender=Article)
def article_pre_delete(sender, instance, **kwargs):
"""文章删除前处理"""
# 可以执行清理操作
print(f"即将删除文章: {instance.title}")
@receiver(post_delete, sender=Article)
def article_post_delete(sender, instance, **kwargs):
"""文章删除后处理"""
# 可以执行后续操作
print(f"已删除文章: {instance.title}")#信号应用场景
#数据同步场景
# 数据同步信号处理器
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from .models import Product, ProductSyncLog
@receiver(post_save, sender=Product)
def sync_product_to_external_system(sender, instance, created, **kwargs):
"""同步产品到外部系统"""
try:
# 调用外部API同步数据
external_api.update_product(
product_id=instance.id,
name=instance.name,
price=instance.price,
description=instance.description
)
# 记录同步日志
ProductSyncLog.objects.create(
product=instance,
action='created' if created else 'updated',
status='success'
)
except Exception as e:
# 记录失败日志
ProductSyncLog.objects.create(
product=instance,
action='created' if created else 'updated',
status='failed',
error_message=str(e)
)
@receiver(post_delete, sender=Product)
def delete_product_from_external_system(sender, instance, **kwargs):
"""从外部系统删除产品"""
try:
external_api.delete_product(instance.id)
ProductSyncLog.objects.create(
product=instance,
action='deleted',
status='success'
)
except Exception as e:
ProductSyncLog.objects.create(
product=instance,
action='deleted',
status='failed',
error_message=str(e)
)#缓存管理场景
# 缓存管理信号处理器
from django.core.cache import cache
from django.db.models.signals import post_save, post_delete, m2m_changed
from django.dispatch import receiver
from .models import Category, Article
@receiver(post_save, sender=Category)
def invalidate_category_cache(sender, instance, **kwargs):
"""类别保存时清除相关缓存"""
# 清除类别列表缓存
cache.delete('category_list')
cache.delete(f'category_{instance.id}')
# 清除相关文章列表缓存
for article in instance.article_set.all():
cache.delete(f'article_{article.id}')
@receiver(post_save, sender=Article)
def invalidate_article_cache(sender, instance, **kwargs):
"""文章保存时清除缓存"""
# 清除文章详情缓存
cache.delete(f'article_{instance.id}')
# 清除文章列表缓存
cache.delete('article_list')
cache.delete(f'article_list_category_{instance.category.id if instance.category else 0}')
# 清除主页缓存
cache.delete('home_page')
@receiver(m2m_changed, sender=Article.tags.through)
def invalidate_tag_cache(sender, instance, action, **kwargs):
"""标签关系变更时清除缓存"""
if action in ['post_add', 'post_remove', 'post_clear']:
cache.delete(f'article_{instance.id}')
cache.delete('tag_cloud')#审计日志场景
# 审计日志信号处理器
from django.db.models.signals import pre_save, post_save, pre_delete, post_delete
from django.dispatch import receiver
from django.contrib.auth.models import User
from .models import AuditLog, Product
def get_current_user():
"""获取当前用户(需要使用线程局部存储或其他方式)"""
# 这里简化处理,实际应用中需要从请求上下文中获取
import threading
local = threading.local()
return getattr(local, 'current_user', None)
@receiver(pre_save, sender=Product)
def log_product_pre_save(sender, instance, **kwargs):
"""产品保存前记录"""
if instance.pk: # 更新操作
old_instance = Product.objects.get(pk=instance.pk)
changes = {}
for field in instance._meta.fields:
field_name = field.name
old_value = getattr(old_instance, field_name)
new_value = getattr(instance, field_name)
if old_value != new_value:
changes[field_name] = {
'old': str(old_value),
'new': str(new_value)
}
if changes:
AuditLog.objects.create(
user=get_current_user(),
action='UPDATE',
model_name='Product',
object_id=instance.pk,
changes=changes
)
@receiver(post_save, sender=Product)
def log_product_post_save(sender, instance, created, **kwargs):
"""产品保存后记录"""
if created:
AuditLog.objects.create(
user=get_current_user(),
action='CREATE',
model_name='Product',
object_id=instance.pk,
changes={'name': instance.name, 'price': str(instance.price)}
)
@receiver(post_delete, sender=Product)
def log_product_post_delete(sender, instance, **kwargs):
"""产品删除后记录"""
AuditLog.objects.create(
user=get_current_user(),
action='DELETE',
model_name='Product',
object_id=instance.pk,
changes={'name': instance.name}
)#搜索索引场景
# 搜索索引信号处理器
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from .models import Article
@receiver(post_save, sender=Article)
def update_search_index_on_save(sender, instance, **kwargs):
"""保存时更新搜索索引"""
from .search import update_article_index
update_article_index(instance)
@receiver(post_delete, sender=Article)
def remove_from_search_index_on_delete(sender, instance, **kwargs):
"""删除时从搜索索引移除"""
from .search import remove_article_from_index
remove_article_from_index(instance)
# 异步处理版本
from celery import shared_task
@shared_task
def update_search_index_async(article_id):
"""异步更新搜索索引"""
try:
from .models import Article
article = Article.objects.get(id=article_id)
from .search import update_article_index
update_article_index(article)
except Article.DoesNotExist:
pass # 文章可能已被删除
@receiver(post_save, sender=Article)
def trigger_async_search_index_update(sender, instance, **kwargs):
"""触发异步搜索索引更新"""
update_search_index_async.delay(instance.id)#信号最佳实践
#性能优化
# 信号性能优化最佳实践
import logging
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.core.signals import request_finished
from django.db import transaction
logger = logging.getLogger(__name__)
class SignalOptimization:
"""信号优化工具类"""
@staticmethod
def debounce_signal(delay=1.0):
"""信号防抖装饰器"""
def decorator(func):
import threading
timer = {}
def debounced_func(sender, **kwargs):
# 清除之前的定时器
key = f"{func.__name__}_{sender}"
if key in timer:
timer[key].cancel()
# 设置新的定时器
def call_func():
func(sender, **kwargs)
timer[key] = threading.Timer(delay, call_func)
timer[key].start()
return debounced_func
return decorator
@staticmethod
def batch_signal_processor(batch_size=10, delay=0.5):
"""批量信号处理器"""
def decorator(func):
queue = []
timer = None
def process_batch():
nonlocal queue
if queue:
batch_data = queue[:]
queue = []
func(batch_data)
def delayed_processor(sender, **kwargs):
nonlocal timer
queue.append((sender, kwargs))
if len(queue) >= batch_size:
if timer:
timer.cancel()
process_batch()
else:
if timer:
timer.cancel()
timer = threading.Timer(delay, process_batch)
timer.start()
return delayed_processor
return decorator
# 使用优化的信号处理器
@receiver(post_save, sender='myapp.Product')
@SignalOptimization.debounce_signal(delay=2.0)
def optimized_product_handler(sender, instance, **kwargs):
"""防抖优化的产品处理器"""
# 只在2秒内没有新信号时才执行
logger.info(f"处理产品更新: {instance.name}")
# 避免在事务中执行耗时信号
@receiver(post_save, sender='myapp.Order')
def order_post_save_handler(sender, instance, created, **kwargs):
"""订单保存处理器"""
if created:
# 在事务完成后执行耗时操作
def post_transaction_func():
# 发送邮件、更新库存等耗时操作
from .tasks import process_new_order_task
process_new_order_task.delay(instance.id)
# 使用Django的on_commit回调
transaction.on_commit(post_transaction_func)#错误处理
# 信号错误处理最佳实践
import logging
from django.dispatch import receiver
from django.db.models.signals import post_save
from .models import MyModel
logger = logging.getLogger(__name__)
def safe_signal_handler(func):
"""安全信号处理器装饰器"""
def wrapper(sender, **kwargs):
try:
return func(sender, **kwargs)
except Exception as e:
logger.error(
f"Signal handler '{func.__name__}' failed: {str(e)}",
exc_info=True,
extra={
'sender': sender,
'signal_kwargs': kwargs
}
)
# 根据需要决定是否重新抛出异常
# raise # 取消注释以重新抛出异常
return wrapper
@receiver(post_save, sender=MyModel)
@safe_signal_handler
def robust_handler(sender, instance, **kwargs):
"""健壮的信号处理器"""
# 可能出错的业务逻辑
risky_operation(instance)
def conditional_signal_handler(condition_func):
"""条件信号处理器装饰器"""
def decorator(func):
def wrapper(sender, **kwargs):
if condition_func(sender, **kwargs):
return func(sender, **kwargs)
return wrapper
return decorator
@receiver(post_save, sender=MyModel)
@conditional_signal_handler(lambda sender, **kwargs: kwargs.get('created', False))
def only_for_creates(sender, instance, **kwargs):
"""只在创建时执行的处理器"""
print(f"只处理创建操作: {instance}")
# 全局信号错误处理器
class GlobalSignalErrorHandler:
"""全局信号错误处理器"""
def __init__(self):
self.original_send = None
def patch_signal_send(self):
"""补丁信号发送方法"""
from django.dispatch import Signal
original_send = Signal.send
def patched_send(self, sender, **named):
results = []
for receiver, weakref, dispatch_uid, _ in self.receivers:
try:
result = receiver(signal=self, sender=sender, **named)
results.append((receiver, result))
except Exception as e:
logger.error(
f"Signal receiver failed: {receiver}",
exc_info=True
)
results.append((receiver, None))
return results
Signal.send = patched_send
self.original_send = original_send
# 初始化全局错误处理器
global_error_handler = GlobalSignalErrorHandler()
global_error_handler.patch_signal_send()#测试信号
# 信号测试最佳实践
from django.test import TestCase, override_settings
from django.db.models.signals import post_save
from unittest.mock import patch, MagicMock
from .models import MyModel
class SignalTestCase(TestCase):
"""信号测试用例"""
def setUp(self):
"""测试设置"""
# 断开所有信号连接以避免副作用
self.receivers_backup = post_save.receivers
post_save.receivers = []
def tearDown(self):
"""测试清理"""
# 恢复信号连接
post_save.receivers = self.receivers_backup
@patch('myapp.signals.my_signal_handler')
def test_signal_fires_on_save(self, mock_handler):
"""测试保存时信号触发"""
# 创建模型实例
instance = MyModel.objects.create(name="test")
# 验证信号处理器被调用
mock_handler.assert_called_once()
def test_signal_isolation(self):
"""测试信号隔离"""
# 使用disconnect和connect来测试特定信号
from .signals import my_handler
# 临时断开信号
post_save.disconnect(my_handler, sender=MyModel)
# 执行操作
instance = MyModel.objects.create(name="test")
# 验证信号未被触发(通过其他方式验证效果)
# 重新连接信号
post_save.connect(my_handler, sender=MyModel)
# 使用django-discover-runner的信号测试
class SignalTestUtils:
"""信号测试工具"""
@staticmethod
def capture_signals(signal):
"""捕获信号发送"""
captured = []
def capture_receiver(sender, **kwargs):
captured.append((sender, kwargs))
signal.connect(capture_receiver)
return captured, capture_receiver
@staticmethod
def disable_signals(signals_list):
"""禁用指定信号"""
original_receivers = {}
for signal in signals_list:
original_receivers[signal] = signal.receivers
signal.receivers = []
def restore():
for signal, receivers in original_receivers.items():
signal.receivers = receivers
return restore
# 使用示例
def test_with_disabled_signals():
"""使用禁用信号的测试"""
from django.db.models.signals import post_save, pre_save
restore_func = SignalTestUtils.disable_signals([post_save, pre_save])
try:
# 执行不希望触发信号的操作
instance = MyModel.objects.create(name="test without signals")
finally:
restore_func() # 恢复信号#常见问题与解决方案
#问题1:信号不触发
症状:信号处理器没有被调用
解决方案:
# 1. 检查信号是否正确导入
# 确保在应用启动时导入了信号模块
# apps.py
class MyAppConfig(AppConfig):
name = 'myapp'
def ready(self):
import myapp.signals # 确保导入信号模块
# 2. 检查信号注册位置
# signals.py应该在应用根目录
# 确保使用了@receiver装饰器或手动连接
# 3. 检查发送者是否匹配
@receiver(post_save, sender=MyModel) # 确保发送者类型匹配
def my_handler(sender, instance, **kwargs):
print("信号被触发")
# 4. 验证信号触发条件
# 确保实际执行了会触发信号的操作
instance = MyModel()
instance.save() # 这会触发post_save#问题2:信号循环调用
症状:信号处理器中又触发了相同信号,造成无限循环
解决方案:
# 1. 使用标志防止循环
_processing = set()
@receiver(post_save, sender=MyModel)
def prevent_loop_handler(sender, instance, **kwargs):
"""防止循环调用"""
key = f"{sender.__name__}_{instance.pk}"
if key in _processing:
return # 避免循环
try:
_processing.add(key)
# 执行业务逻辑
# 可能会触发其他保存操作
finally:
_processing.discard(key)
# 2. 使用getattr检查避免循环
@receiver(post_save, sender=MyModel)
def avoid_recursion_handler(sender, instance, **kwargs):
"""避免递归调用"""
if getattr(instance, '_in_signal', False):
return
instance._in_signal = True
try:
# 执行业务逻辑
# 修改其他模型
pass
finally:
instance._in_signal = False
# 3. 使用update_fields避免不必要的信号
def update_without_signals(queryset, **updates):
"""更新时不触发信号"""
# 使用QuerySet.update()而不是实例.save()
queryset.update(**updates)#问题3:信号性能问题
症状:大量数据操作时应用变慢
解决方案:
# 1. 批量操作时临时断开信号
from django.db.models.signals import post_save
from contextlib import contextmanager
@contextmanager
def disconnected_signal(signal, receiver, sender):
"""临时断开信号的上下文管理器"""
signal.disconnect(receiver, sender=sender)
try:
yield
finally:
signal.connect(receiver, sender=sender)
# 使用示例
with disconnected_signal(post_save, my_handler, MyModel):
# 批量操作
for i in range(1000):
MyModel.objects.create(name=f"item_{i}")
# 2. 使用bulk_create避免逐个触发信号
MyModel.objects.bulk_create([
MyModel(name=f"item_{i}") for i in range(1000)
])
# 3. 信号处理器中使用延迟执行
from celery import shared_task
@shared_task
def deferred_signal_work(instance_data):
"""延迟执行的信号工作"""
# 执行原本在信号中做的工作
pass
@receiver(post_save, sender=MyModel)
def defer_heavy_work(sender, instance, **kwargs):
"""延迟重型工作"""
# 立即返回,将重型工作放到后台
deferred_signal_work.delay({
'id': instance.id,
'name': instance.name
})#问题4:信号顺序问题
症状:信号处理器执行顺序不符合预期
解决方案:
# 1. 显式控制执行顺序
def ordered_signal_handlers():
"""有序信号处理器"""
handlers = [
first_handler,
second_handler,
third_handler,
]
for handler in handlers:
post_save.connect(handler, sender=MyModel, weak=False)
# 2. 使用dispatch_uid确保唯一性
@receiver(post_save, sender=MyModel, dispatch_uid="unique_first_handler")
def first_handler(sender, instance, **kwargs):
print("第一个处理器")
@receiver(post_save, sender=MyModel, dispatch_uid="unique_second_handler")
def second_handler(sender, instance, **kwargs):
print("第二个处理器")
# 3. 信号处理器间的协调
_processed_items = set()
@receiver(post_save, sender=MyModel)
def coordinated_handler(sender, instance, **kwargs):
"""协调的信号处理器"""
item_key = f"{instance.__class__.__name__}_{instance.pk}"
if item_key in _processed_items:
return # 已经处理过
# 执行处理逻辑
process_item(instance)
# 标记为已处理
_processed_items.add(item_key)
# 在请求结束时清理
from django.core.signals import request_finished
def cleanup(sender, **kwargs):
_processed_items.discard(item_key)
request_finished.connect(cleanup, weak=False)#本章小结
在本章中,我们深入学习了Django信号系统:
- 信号系统概念:理解了信号作为事件通知框架的作用
- 信号架构:掌握了信号系统的组件和连接机制
- 内置信号:学习了Django提供的各种内置信号类型
- 自定义信号:掌握了创建和使用自定义信号的方法
- 注册方式:了解了多种信号接收器注册方式
- 应用场景:探讨了数据同步、缓存管理、审计日志等应用
- 最佳实践:学习了性能优化和错误处理的最佳实践
#核心要点回顾
"""
本章核心要点:
1. 信号是Django的事件通知系统,实现了观察者模式
2. 内置信号包括模型信号、请求信号、应用信号等
3. 信号可以同步或异步处理,但默认是同步的
4. 信号处理器应该快速执行,避免阻塞主线程
5. 注意信号可能导致的循环调用问题
6. 大量数据操作时考虑临时断开信号
7. 信号是解耦系统组件的有效方式
"""#下一步学习
现在您已经掌握了Django信号系统,可以继续学习:
💡 核心要点:信号系统是Django中实现松耦合架构的重要工具,正确使用信号可以让您的应用更加模块化和可维护,但要注意性能影响和潜在的循环调用问题。
#SEO优化策略
- 关键词布局: 在标题、内容中合理布局"Django信号", "事件驱动", "信号系统", "Django异步", "Django事件"等关键词
- 内容结构: 使用清晰的标题层级(H1-H3),便于搜索引擎理解内容结构
- 内部链接: 建立与其他相关教程的内部链接,提升页面权重
- 元数据优化: 在页面头部包含描述性的标题、描述和标签
🔗 相关教程推荐
- 模型设计与ORM操作 - 模型基础
- 缓存策略 - 性能优化
- 异步任务处理 - 异步处理
🏷️ 标签云: Django信号 事件驱动 信号系统 Django异步 观察者模式 解耦 性能优化 审计日志 数据同步

