Django Microservices Architecture - Evolving from a Monolith to a Distributed System

📂 Stage: Part 3 — Advanced Topics
🎯 Difficulty: Expert
⏰ Estimated study time: 8-10 hours
🎒 Prerequisites: deployment best practices, performance optimization

Table of Contents

Microservices Architecture Overview

Microservice Definition and Characteristics

"""
微服务架构是一种软件架构风格,它将单一应用程序开发为一组小型服务,
每个服务运行在自己的进程中,并使用轻量级机制(通常是HTTP资源API)进行通信。

核心特点:
1. 服务组件化 (Componentization via Services)
   - 每个服务都是独立的组件
   - 可以独立开发、部署和扩展
   - 降低系统耦合度

2. 按业务组织 (Organized around Business Capabilities)
   - 服务按业务领域划分
   - 每个服务专注特定业务功能
   - 团队按服务组织

3. 产品思维 (Products not Projects)
   - 服务被视为产品而非项目
   - 团队负责服务的整个生命周期
   - 持续交付和改进

4. 智能端点哑管道 (Smart Endpoints and Dumb Pipes)
   - 服务内部逻辑复杂
   - 通信机制简单(HTTP/REST)
   - 避免复杂的中间件

5. 去中心化治理 (Decentralized Governance)
   - 各团队选择合适技术栈
   - 服务可以使用不同语言
   - 独立的开发节奏

6. 去中心化数据管理 (Decentralized Data Management)
   - 每个服务管理自己的数据
   - 避免共享数据库
   - 数据一致性通过业务逻辑保证

7. 基础设施自动化 (Infrastructure Automation)
   - 容器化部署
   - 自动化测试
   - CI/CD流水线

8. 容错设计 (Design for Failure)
   - 服务可能随时失败
   - 实现熔断、重试机制
   - 提高系统韧性

9. 演进式设计 (Evolutionary Design)
   - 架构可以演进
   - 服务可以重新划分
   - 适应业务变化
"""

Microservice Benefits and Challenges

"""
微服务优势:
1. 技术多样性 (Technology Diversity)
   - 不同服务可使用不同技术
   - 选择最适合的技术栈
   - 降低技术锁定风险

2. 弹性 (Resilience)
   - 单个服务故障不影响整体
   - 故障隔离
   - 提高系统可用性

3. 扩展性 (Scaling)
   - 按需扩展特定服务
   - 资源利用更高效
   - 成本优化

4. 部署独立性 (Independent Deployment)
   - 服务独立部署
   - 减少发布冲突
   - 快速迭代

5. 组织对齐 (Organizational Alignment)
   - 团队与服务对应
   - 责任明确
   - 提高开发效率

微服务挑战:
1. 分布式系统复杂性 (Distributed System Complexity)
   - 网络延迟和故障
   - 数据一致性难题
   - 调试困难

2. 数据一致性 (Data Consistency)
   - 跨服务事务管理
   - 最终一致性
   - CAP定理约束

3. 测试复杂性 (Testing Complexity)
   - 集成测试困难
   - 端到端测试复杂
   - 环境管理

4. 运维复杂性 (Operational Complexity)
   - 服务监控
   - 日志聚合
   - 部署管理

5. 网络通信开销 (Network Communication Overhead)
   - 网络延迟
   - 序列化开销
   - 带宽消耗
"""

When to Use Django Microservices

"""
Django微服务适用场景:

1. 大型复杂应用 (Large Complex Applications)
   - 单体应用难以维护
   - 功能模块众多
   - 团队规模较大

2. 高并发需求 (High Concurrency Requirements)
   - 需要独立扩展
   - 流量波动大
   - 资源隔离需求

3. 多团队协作 (Multi-team Collaboration)
   - 团队自治需求
   - 不同技术栈
   - 独立发布周期

4. 业务快速发展 (Rapid Business Growth)
   - 需求变化频繁
   - 快速原型验证
   - 功能迭代加速

5. 技术债务重构 (Technical Debt Refactoring)
   - 单体重构需求
   - 渐进式迁移
   - 风险控制
"""

Service Decomposition Strategy

Domain-Driven Design (DDD)

"""
领域驱动设计是微服务拆分的重要理论基础:

1. 限界上下文 (Bounded Context)
   - 定义服务边界
   - 明确职责范围
   - 避免上下文混淆

2. 聚合根 (Aggregate Root)
   - 业务对象的核心
   - 一致性边界
   - 事务边界

3. 领域事件 (Domain Events)
   - 服务间通信
   - 状态同步
   - 最终一致性

服务拆分原则:
1. 单一职责原则 (Single Responsibility Principle)
   - 每个服务只负责一个业务领域
   - 职责明确
   - 便于维护

2. 高内聚低耦合 (High Cohesion, Low Coupling)
   - 服务内部高度相关
   - 服务间松散耦合
   - 降低依赖复杂度

3. 数据所有权 (Data Ownership)
   - 每个服务拥有自己的数据
   - 避免数据共享
   - 确保数据一致性

4. 业务能力 (Business Capability)
   - 按业务功能划分
   - 业务逻辑内聚
   - 便于理解

5. 团队规模 (Team Size)
   - 符合康威定律
   - 小团队管理
   - 责任明确
"""

Django Microservice Decomposition Example

# E-commerce platform microservice decomposition example
"""
电商平台 -> 拆分为以下微服务:

1. 用户服务 (User Service)
   - 用户注册/登录
   - 用户信息管理
   - 权限认证

2. 商品服务 (Product Service)
   - 商品信息管理
   - 商品分类
   - 库存管理

3. 订单服务 (Order Service)
   - 订单创建
   - 订单状态管理
   - 支付集成

4. 支付服务 (Payment Service)
   - 支付处理
   - 退款管理
   - 支付渠道集成

5. 通知服务 (Notification Service)
   - 邮件通知
   - 短信通知
   - 推送通知

6. 评价服务 (Review Service)
   - 商品评价
   - 评分管理
   - 评价审核

7. 物流服务 (Logistics Service)
   - 物流跟踪
   - 配送管理
   - 运费计算
"""

# User service example
# user_service/apps.py
from django.apps import AppConfig

class UserServiceConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'user_service'
    
    def ready(self):
        import user_service.signals  # register signal handlers

# user_service/models.py
from django.db import models
from django.contrib.auth.models import AbstractUser

class User(AbstractUser):
    phone = models.CharField(max_length=20, blank=True)
    avatar = models.URLField(blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

class UserProfile(models.Model):
    user = models.OneToOneField(User, on_delete=models.CASCADE)
    bio = models.TextField(blank=True)
    location = models.CharField(max_length=100, blank=True)
    birth_date = models.DateField(null=True, blank=True)

# user_service/views.py
from rest_framework import generics, status
from rest_framework.response import Response
from rest_framework.decorators import api_view
from .models import User, UserProfile
from .serializers import UserSerializer, UserProfileSerializer

class UserListCreateView(generics.ListCreateAPIView):
    queryset = User.objects.all()
    serializer_class = UserSerializer

class UserDetailView(generics.RetrieveUpdateDestroyAPIView):
    queryset = User.objects.all()
    serializer_class = UserSerializer

@api_view(['POST'])
def register_user(request):
    """User registration endpoint"""
    serializer = UserSerializer(data=request.data)
    if serializer.is_valid():
        serializer.save()
        return Response(serializer.data, status=status.HTTP_201_CREATED)
    return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

# Product service example
# product_service/models.py
from django.db import models

class Category(models.Model):
    name = models.CharField(max_length=100)
    description = models.TextField(blank=True)
    parent = models.ForeignKey('self', null=True, blank=True, on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)

class Product(models.Model):
    name = models.CharField(max_length=200)
    description = models.TextField()
    price = models.DecimalField(max_digits=10, decimal_places=2)
    category = models.ForeignKey(Category, on_delete=models.CASCADE)
    stock = models.IntegerField(default=0)
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

class ProductImage(models.Model):
    product = models.ForeignKey(Product, related_name='images', on_delete=models.CASCADE)
    image_url = models.URLField()
    is_primary = models.BooleanField(default=False)

# product_service/views.py
from rest_framework import generics
from .models import Product, Category
from .serializers import ProductSerializer, CategorySerializer

class ProductListView(generics.ListCreateAPIView):
    queryset = Product.objects.filter(is_active=True)
    serializer_class = ProductSerializer

class ProductDetailView(generics.RetrieveUpdateDestroyAPIView):
    queryset = Product.objects.filter(is_active=True)
    serializer_class = ProductSerializer

class CategoryListView(generics.ListCreateAPIView):
    queryset = Category.objects.all()
    serializer_class = CategorySerializer

# Order service example
# order_service/models.py
from django.db import models

class Order(models.Model):
    STATUS_CHOICES = [
        ('pending', 'Pending'),
        ('confirmed', 'Confirmed'),
        ('paid', 'Paid'),
        ('shipped', 'Shipped'),
        ('delivered', 'Delivered'),
        ('cancelled', 'Cancelled'),
    ]
    
    user_id = models.IntegerField()  # reference to the external user service
    total_amount = models.DecimalField(max_digits=10, decimal_places=2)
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
    shipping_address = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

class OrderItem(models.Model):
    order = models.ForeignKey(Order, related_name='items', on_delete=models.CASCADE)
    product_id = models.IntegerField()  # reference to the external product service
    quantity = models.PositiveIntegerField()
    unit_price = models.DecimalField(max_digits=10, decimal_places=2)
    total_price = models.DecimalField(max_digits=10, decimal_places=2)

Service Boundary Identification

# Service boundary identification helper
class ServiceBoundaryIdentifier:
    """Service boundary identifier"""
    
    @staticmethod
    def identify_by_business_capabilities():
        """Identify service boundaries by business capability"""
        business_capabilities = {
            'authentication': ['user authentication', 'permission management', 'session management'],
            'inventory': ['stock management', 'product information', 'price management'],
            'order_processing': ['order management', 'payment processing', 'shipment tracking'],
            'customer_service': ['customer management', 'review system', 'support system'],
            'marketing': ['promotions', 'coupons', 'loyalty points'],
            'analytics': ['data analysis', 'report generation', 'monitoring']
        }
        
        return business_capabilities
    
    @staticmethod
    def identify_by_data_boundaries():
        """Identify services by data boundary"""
        data_boundaries = {
            'user_data': ['user profiles', 'permission data', 'personal settings'],
            'product_data': ['product information', 'category data', 'inventory data'],
            'transaction_data': ['order data', 'payment data', 'logistics data'],
            'content_data': ['review data', 'content data', 'media files']
        }
        
        return data_boundaries
    
    @staticmethod
    def identify_by_scalability_requirements():
        """Identify services by scalability requirements"""
        scalability_requirements = {
            'high_traffic': ['product browsing', 'search', 'recommendations'],  # high concurrency
            'batch_processing': ['report generation', 'data sync', 'backups'],  # batch jobs
            'real_time': ['chat', 'push notifications', 'live monitoring'],  # real-time
            'compute_intensive': ['image processing', 'video transcoding', 'AI analysis']  # compute-heavy
        }
        
        return scalability_requirements
    
    @staticmethod
    def identify_by_team_structure():
        """Identify services by team structure"""
        team_structure = {
            'frontend_team': ['UI services', 'API gateway', 'static assets'],
            'backend_team': ['core business logic', 'data services', 'caching services'],
            'mobile_team': ['mobile APIs', 'push services', 'device management'],
            'data_team': ['data analysis', 'machine learning', 'BI services']
        }
        
        return team_structure

# Service decomposition evaluation helper
class ServiceDecompositionEvaluator:
    """Service decomposition evaluator"""
    
    def __init__(self):
        self.metrics = {
            'cohesion_score': 0,  # cohesion
            'coupling_score': 0,  # coupling
            'complexity_score': 0,  # complexity
            'scalability_score': 0,  # scalability
        }
    
    def evaluate_service(self, service_name, dependencies, responsibilities):
        """Evaluate service quality"""
        evaluation = {
            'service_name': service_name,
            'dependencies': len(dependencies),  # number of dependencies
            'responsibilities': len(responsibilities),  # number of responsibilities
            'size_recommendation': self._recommend_size(len(responsibilities)),
            'refactoring_suggestions': self._suggest_refactoring(dependencies, responsibilities)
        }
        
        return evaluation
    
    def _recommend_size(self, responsibility_count):
        """Recommend a service size"""
        if responsibility_count <= 3:
            return "Service may be too small; consider merging"
        elif responsibility_count <= 7:
            return "Service size is appropriate"
        else:
            return "Service is too large; consider splitting"
    
    def _suggest_refactoring(self, dependencies, responsibilities):
        """Suggest refactorings"""
        suggestions = []
        
        if len(dependencies) > 5:
            suggestions.append("Too many dependencies; reduce external coupling or introduce an event-driven architecture")
        
        if len(responsibilities) > 7:
            suggestions.append("Too many responsibilities; split by business capability")
        
        if any('user' in resp and 'product' in resp for resp in responsibilities):
            suggestions.append("Mixed business domains; re-partition along domain boundaries")
        
        return suggestions

API Gateway Design

Core API Gateway Functions

"""
API网关是微服务架构中的关键组件,提供统一入口和集中管理:

1. 请求路由 (Request Routing)
   - 动态路由规则
   - 服务发现集成
   - 负载均衡

2. 协议转换 (Protocol Translation)
   - REST to gRPC
   - HTTP/1.1 to HTTP/2
   - JSON to Protocol Buffers

3. 认证授权 (Authentication & Authorization)
   - JWT验证
   - OAuth2集成
   - API密钥管理

4. 限流熔断 (Rate Limiting & Circuit Breaking)
   - 请求频率限制
   - 服务熔断机制
   - 故障恢复

5. 缓存管理 (Caching)
   - 响应缓存
   - 请求缓存
   - CDN集成

6. 监控日志 (Monitoring & Logging)
   - 请求追踪
   - 性能监控
   - 错误日志

7. 安全防护 (Security Protection)
   - DDoS防护
   - SQL注入防护
   - XSS防护
"""

Django API Gateway Implementation

# gateway/settings.py
"""
# Example API gateway configuration
import os
from corsheaders.defaults import default_headers

# Basic settings
DEBUG = os.environ.get('DEBUG', 'False').lower() == 'true'
SECRET_KEY = os.environ.get('SECRET_KEY', 'your-secret-key')

# Allowed hosts
ALLOWED_HOSTS = ['gateway.yourdomain.com', 'localhost']

# Installed apps
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',
    'corsheaders',
    'drf_yasg',  # API documentation
    'gateway_app',  # gateway application
]

# Middleware
MIDDLEWARE = [
    'corsheaders.middleware.CorsMiddleware',
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    
    # Custom middleware
    'gateway_app.middleware.AuthenticationMiddleware',
    'gateway_app.middleware.RateLimitMiddleware',
    'gateway_app.middleware.LoggingMiddleware',
    'gateway_app.middleware.CircuitBreakerMiddleware',
]

# REST framework settings
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'gateway_app.authentication.JWTAuthentication',
    ],
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.IsAuthenticated',
    ],
    'DEFAULT_THROTTLE_CLASSES': [
        'rest_framework.throttling.AnonRateThrottle',
        'rest_framework.throttling.UserRateThrottle'
    ],
    'DEFAULT_THROTTLE_RATES': {
        'anon': '100/hour',
        'user': '1000/hour'
    }
}

# Service registry settings
SERVICE_REGISTRY = {
    'consul': {
        'host': os.environ.get('CONSUL_HOST', 'localhost'),
        'port': int(os.environ.get('CONSUL_PORT', 8500)),
    }
}

# Cache settings
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': os.environ.get('REDIS_URL', 'redis://localhost:6379/1'),
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
        }
    }
}

# Database settings
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': os.environ.get('DB_NAME', 'gateway_db'),
        'USER': os.environ.get('DB_USER', 'gateway_user'),
        'PASSWORD': os.environ.get('DB_PASSWORD', ''),
        'HOST': os.environ.get('DB_HOST', 'localhost'),
        'PORT': os.environ.get('DB_PORT', '5432'),
    }
}
"""
# gateway_app/models.py
from django.db import models

class Service(models.Model):
    """服务注册模型"""
    name = models.CharField(max_length=100, unique=True)
    url = models.URLField()
    description = models.TextField(blank=True)
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    
    class Meta:
        db_table = 'services'
    
    def __str__(self):
        return self.name

class ServiceInstance(models.Model):
    """服务实例模型"""
    service = models.ForeignKey(Service, on_delete=models.CASCADE, related_name='instances')
    host = models.CharField(max_length=100)
    port = models.IntegerField()
    health_status = models.CharField(max_length=20, default='unknown')
    weight = models.IntegerField(default=1)
    metadata = models.JSONField(default=dict, blank=True)
    last_heartbeat = models.DateTimeField(auto_now=True)
    is_active = models.BooleanField(default=True)
    
    class Meta:
        db_table = 'service_instances'

class APIRoute(models.Model):
    """API路由模型"""
    METHOD_CHOICES = [
        ('GET', 'GET'),
        ('POST', 'POST'),
        ('PUT', 'PUT'),
        ('DELETE', 'DELETE'),
        ('PATCH', 'PATCH'),
    ]
    
    path = models.CharField(max_length=200)
    target_service = models.ForeignKey(Service, on_delete=models.CASCADE)
    methods = models.JSONField(default=list)  # ['GET', 'POST']
    rate_limit = models.IntegerField(default=100)  # requests per minute
    timeout = models.IntegerField(default=30)  # timeout in seconds
    retry_count = models.IntegerField(default=3)  # number of retries
    circuit_breaker_enabled = models.BooleanField(default=True)
    is_active = models.BooleanField(default=True)
    
    class Meta:
        db_table = 'api_routes'
        unique_together = ['path', 'target_service']

class RequestLog(models.Model):
    """请求日志模型"""
    route = models.ForeignKey(APIRoute, on_delete=models.CASCADE)
    method = models.CharField(max_length=10)
    path = models.CharField(max_length=500)
    status_code = models.IntegerField()
    response_time = models.FloatField()  # milliseconds
    client_ip = models.GenericIPAddressField()
    user_agent = models.TextField(blank=True)
    request_headers = models.JSONField(default=dict)
    response_headers = models.JSONField(default=dict)
    timestamp = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        db_table = 'request_logs'
        indexes = [
            models.Index(fields=['timestamp']),
            models.Index(fields=['route', 'timestamp']),
        ]
# gateway_app/services.py
import requests
import json
import time
import logging
from typing import Dict, Any, Optional
from django.core.cache import cache
from .models import Service, ServiceInstance, APIRoute

logger = logging.getLogger(__name__)

class ServiceDiscovery:
    """服务发现服务"""
    
    def __init__(self):
        self.cache_ttl = 30  # cache lookups for 30 seconds
    
    def get_service_instance(self, service_name: str) -> Optional[ServiceInstance]:
        """获取服务实例"""
        cache_key = f"service_instance_{service_name}"
        instance = cache.get(cache_key)
        
        if instance is None:
            try:
                service = Service.objects.get(name=service_name, is_active=True)
                instance = ServiceInstance.objects.filter(
                    service=service,
                    is_active=True,
                    health_status='healthy'
                ).first()
                
                if instance:
                    cache.set(cache_key, instance, self.cache_ttl)
            except Service.DoesNotExist:
                logger.error(f"Service {service_name} not found")
                return None
        
        return instance
    
    def get_all_instances(self, service_name: str) -> list:
        """获取所有服务实例"""
        instances = ServiceInstance.objects.filter(
            service__name=service_name,
            is_active=True,
            health_status='healthy'
        )
        return list(instances)
    
    def register_service(self, service_data: Dict[str, Any]) -> bool:
        """注册服务"""
        try:
            service, created = Service.objects.get_or_create(
                name=service_data['name'],
                defaults={
                    'url': service_data['url'],
                    'description': service_data.get('description', '')
                }
            )
            
            if 'instances' in service_data:
                for instance_data in service_data['instances']:
                    ServiceInstance.objects.update_or_create(
                        service=service,
                        host=instance_data['host'],
                        port=instance_data['port'],
                        defaults={
                            'weight': instance_data.get('weight', 1),
                            'metadata': instance_data.get('metadata', {}),
                            'is_active': True
                        }
                    )
            
            # Invalidate related cache entries (delete_pattern requires django-redis)
            cache.delete_pattern(f"service_instance_{service.name}*")
            
            return True
        except Exception as e:
            logger.error(f"Failed to register service: {e}")
            return False
    
    def deregister_service(self, service_name: str) -> bool:
        """注销服务"""
        try:
            service = Service.objects.get(name=service_name)
            service.is_active = False
            service.save()
            
            # Invalidate cache
            cache.delete_pattern(f"service_instance_{service_name}*")
            
            return True
        except Service.DoesNotExist:
            return False
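`register_service` expects a payload shaped roughly like the following. The field names mirror the `Service` and `ServiceInstance` models above; the concrete host names, ports, and weights are illustrative:

```python
# Example payload accepted by ServiceDiscovery.register_service
service_payload = {
    "name": "user-service",
    "url": "http://user-service:8000",
    "description": "Handles registration, login and profiles",
    "instances": [
        {"host": "10.0.0.11", "port": 8000, "weight": 2},  # weight feeds the load balancer
        {"host": "10.0.0.12", "port": 8000, "weight": 1},
    ],
}
```

Instances are matched by `(service, host, port)`, so re-posting the same payload updates weights and metadata instead of creating duplicates.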

class LoadBalancer:
    """负载均衡器"""
    
    def __init__(self):
        self.algorithm = 'round_robin'
        self.current_index = {}
    
    def select_instance(self, instances: list) -> Optional[ServiceInstance]:
        """选择服务实例"""
        if not instances:
            return None
        
        active_instances = [inst for inst in instances if inst.is_active and inst.health_status == 'healthy']
        if not active_instances:
            return None
        
        if self.algorithm == 'round_robin':
            return self._round_robin_selection(active_instances)
        elif self.algorithm == 'weighted_round_robin':
            return self._weighted_round_robin_selection(active_instances)
        elif self.algorithm == 'least_connections':
            return self._least_connections_selection(active_instances)
        else:
            return active_instances[0]  # fallback: first healthy instance
    
    def _round_robin_selection(self, instances: list) -> ServiceInstance:
        """轮询选择"""
        service_name = instances[0].service.name
        current_idx = self.current_index.get(service_name, 0)
        
        selected = instances[current_idx % len(instances)]
        self.current_index[service_name] = (current_idx + 1) % len(instances)
        
        return selected
    
    def _weighted_round_robin_selection(self, instances: list) -> ServiceInstance:
        """Weighted round-robin selection"""
        # Simplified: distribute in proportion to weight
        total_weight = sum(inst.weight for inst in instances)
        if total_weight == 0:
            return instances[0]
        
        # A full implementation would use smooth weighted round-robin
        return self._round_robin_selection(instances)
    
    def _least_connections_selection(self, instances: list) -> ServiceInstance:
        """Least-connections selection"""
        # Simplified: returns the first instance.
        # A real implementation tracks live connection counts per instance.
        return instances[0]
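The round-robin behaviour is easy to check in isolation. Below is a standalone sketch using plain objects in place of `ServiceInstance` — the `Instance` and `RoundRobin` names are invented for this example:

```python
from dataclasses import dataclass
from itertools import cycle

@dataclass
class Instance:
    host: str
    port: int

class RoundRobin:
    """Stateful round-robin over a fixed list of instances."""
    def __init__(self, instances):
        self._cycle = cycle(instances)

    def select(self):
        return next(self._cycle)

rr = RoundRobin([Instance("10.0.0.1", 8000), Instance("10.0.0.2", 8000)])
picks = [rr.select().host for _ in range(4)]
# picks alternates between the two hosts
```

The `LoadBalancer` above keeps an explicit per-service index instead of `itertools.cycle` because its instance list is refetched on every request and may change between calls.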

class APIClient:
    """API客户端"""
    
    def __init__(self):
        self.session = requests.Session()
        self.timeout = 30
    
    def forward_request(self, 
                       instance: ServiceInstance, 
                       path: str, 
                       method: str, 
                       headers: dict, 
                       body: Optional[str] = None) -> tuple:
        """转发请求到后端服务"""
        url = f"http://{instance.host}:{instance.port}{path}"
        
        start_time = time.time()
        
        try:
            response = self.session.request(
                method=method.upper(),
                url=url,
                headers=headers,
                data=body,
                timeout=self.timeout
            )
            
            response_time = (time.time() - start_time) * 1000  # convert to milliseconds
            
            return {
                'status_code': response.status_code,
                'headers': dict(response.headers),
                'body': response.text,
                'response_time': response_time
            }
            
        except requests.exceptions.Timeout:
            logger.error(f"Request timeout to {url}")
            return {
                'status_code': 504,
                'headers': {},
                'body': '{"error": "Gateway Timeout"}',
                'response_time': (time.time() - start_time) * 1000
            }
        except requests.exceptions.ConnectionError:
            logger.error(f"Connection error to {url}")
            return {
                'status_code': 502,
                'headers': {},
                'body': '{"error": "Bad Gateway"}',
                'response_time': (time.time() - start_time) * 1000
            }
        except Exception as e:
            logger.error(f"Unexpected error forwarding request: {e}")
            return {
                'status_code': 500,
                'headers': {},
                'body': '{"error": "Internal Server Error"}',
                'response_time': (time.time() - start_time) * 1000
            }

class CircuitBreaker:
    """Circuit breaker"""
    
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.state = {}  # {service_name: {'state': 'closed/open/half_open', 'failure_count': 0, 'last_failure_time': timestamp}}
    
    def call(self, service_name: str, func, *args, **kwargs):
        """Execute a call through the circuit breaker"""
        service_state = self.state.get(service_name, {
            'state': 'closed',  # closed, open, half_open
            'failure_count': 0,
            'last_failure_time': 0
        })
        
        if service_state['state'] == 'open':
            # Transition to half-open once the cool-down has elapsed
            if time.time() - service_state['last_failure_time'] > self.timeout:
                service_state['state'] = 'half_open'
                self.state[service_name] = service_state
            else:
                # Still open: fail fast without calling the backend
                raise Exception(f"Service {service_name} is currently unavailable")
        
        try:
            result = func(*args, **kwargs)
            
            # Success: reset state
            if service_state['state'] != 'closed':
                service_state['state'] = 'closed'
                service_state['failure_count'] = 0
                self.state[service_name] = service_state
            
            return result
            
        except Exception:
            # Failure: record it and open the circuit past the threshold
            service_state['failure_count'] += 1
            service_state['last_failure_time'] = time.time()
            
            if service_state['failure_count'] >= self.failure_threshold:
                service_state['state'] = 'open'
            
            self.state[service_name] = service_state
            raise
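The state machine above (closed → open after `failure_threshold` consecutive failures → half-open after the cool-down) can be exercised with a condensed standalone version, re-declared here so the snippet runs on its own:

```python
import time

class MiniBreaker:
    """Condensed breaker: closed -> open after N failures -> half-open after cool-down."""
    def __init__(self, failure_threshold=2, cooldown=60):
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown
        self.failures = 0
        self.state = "closed"
        self.opened_at = 0.0

    def call(self, func):
        if self.state == "open":
            if time.time() - self.opened_at > self.cooldown:
                self.state = "half_open"  # allow one probe request through
            else:
                raise RuntimeError("circuit open: failing fast")
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = time.time()
            raise
        self.failures = 0
        self.state = "closed"
        return result

breaker = MiniBreaker(failure_threshold=2, cooldown=60)

def flaky():
    raise ConnectionError("backend down")

for _ in range(2):
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass
# After two consecutive failures the breaker is open; further calls
# raise RuntimeError immediately, without touching the backend.
```

Failing fast while open is the point: it stops a struggling backend from being hammered and gives it the cool-down period to recover.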
# gateway_app/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.http import JsonResponse
from .services import ServiceDiscovery, LoadBalancer, APIClient, CircuitBreaker
from .models import APIRoute, RequestLog
import json
import time
import logging

logger = logging.getLogger(__name__)

class GatewayView(APIView):
    """API gateway entry view"""
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.service_discovery = ServiceDiscovery()
        self.load_balancer = LoadBalancer()
        self.api_client = APIClient()
        self.circuit_breaker = CircuitBreaker()
    
    def dispatch(self, request, *args, **kwargs):
        """Dispatch the request to the matching backend service"""
        path = request.path
        method = request.method
        
        # Find a matching route
        try:
            route = APIRoute.objects.get(
                path=path,
                methods__contains=[method],
                is_active=True
            )
        except APIRoute.DoesNotExist:
            return JsonResponse({'error': 'Route not found'}, status=404)
        
        # Locate a healthy service instance
        instance = self.service_discovery.get_service_instance(route.target_service.name)
        if not instance:
            return JsonResponse({'error': 'Service unavailable'}, status=503)
        
        # Build forwarded headers
        headers = self._build_headers(request)
        
        # Extract the request body
        body = request.body.decode('utf-8') if request.body else None
        
        # Forward the request through the circuit breaker
        try:
            response_data = self.circuit_breaker.call(
                route.target_service.name,
                self.api_client.forward_request,
                instance,
                path,
                method,
                headers,
                body
            )
            
            # Log the request
            self._log_request(route, request, response_data)
            
            return JsonResponse(
                json.loads(response_data['body']) if response_data['body'] else {},
                status=response_data['status_code'],
                headers=response_data['headers']
            )
            
        except Exception as e:
            logger.error(f"Gateway error: {e}")
            return JsonResponse({'error': 'Internal Server Error'}, status=500)
    
    def _build_headers(self, request):
        """构建请求头"""
        headers = {}
        
        # Copy the original request headers
        for key, value in request.META.items():
            if key.startswith('HTTP_'):
                header_name = key[5:].replace('_', '-').title()
                headers[header_name] = value
        
        # Preserve the authorization header
        auth_header = request.META.get('HTTP_AUTHORIZATION')
        if auth_header:
            headers['Authorization'] = auth_header
        
        # Mark the request as gateway-forwarded
        headers['X-Gateway-Forwarded'] = 'true'
        headers['X-Forwarded-For'] = request.META.get('REMOTE_ADDR', '')
        
        return headers
    
    def _log_request(self, route, request, response_data):
        """记录请求日志"""
        try:
            RequestLog.objects.create(
                route=route,
                method=request.method,
                path=request.path,
                status_code=response_data['status_code'],
                response_time=response_data['response_time'],
                client_ip=request.META.get('REMOTE_ADDR', ''),
                user_agent=request.META.get('HTTP_USER_AGENT', ''),
                request_headers=dict(request.headers),
                response_headers=response_data['headers']
            )
        except Exception as e:
            logger.error(f"Failed to log request: {e}")

class ServiceRegistryView(APIView):
    """服务注册视图"""
    
    def post(self, request):
        """注册服务"""
        service_data = request.data
        
        discovery = ServiceDiscovery()
        success = discovery.register_service(service_data)
        
        if success:
            return Response({'message': 'Service registered successfully'}, status=status.HTTP_201_CREATED)
        else:
            return Response({'error': 'Failed to register service'}, status=status.HTTP_400_BAD_REQUEST)
    
    def delete(self, request, service_name):
        """注销服务"""
        discovery = ServiceDiscovery()
        success = discovery.deregister_service(service_name)
        
        if success:
            return Response({'message': 'Service deregistered successfully'})
        else:
            return Response({'error': 'Service not found'}, status=status.HTTP_404_NOT_FOUND)

class HealthCheckView(APIView):
    """健康检查视图"""
    
    def get(self, request):
        """网关健康检查"""
        return Response({
            'status': 'healthy',
            'timestamp': time.time(),
            'version': '1.0.0'
        })
# gateway_app/middleware.py
from django.http import JsonResponse
import time
import logging
from django.core.cache import cache

logger = logging.getLogger(__name__)

class RateLimitMiddleware:
    """限流中间件"""
    
    def __init__(self, get_response):
        self.get_response = get_response
    
    def __call__(self, request):
        client_ip = self._get_client_ip(request)
        path = request.path
        key = f"rate_limit:{client_ip}:{path}"
        
        # Current count for this client/path
        current = cache.get(key, 0)
        
        # Reject when over the limit (simplified: hard-coded threshold)
        if current >= 100:  # at most 100 requests per minute
            return JsonResponse({
                'error': 'Rate limit exceeded',
                'retry_after': 60
            }, status=429)
        
        # Increment the counter
        if current == 0:
            cache.set(key, 1, timeout=60)  # window resets after 60 seconds
        else:
            cache.incr(key)
        
        response = self.get_response(request)
        return response
    
    def _get_client_ip(self, request):
        """获取客户端IP"""
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            ip = x_forwarded_for.split(',')[0]
        else:
            ip = request.META.get('REMOTE_ADDR')
        return ip
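The middleware above implements a fixed-window counter backed by the cache. Its core bookkeeping, stripped of Django and Redis, looks like this — the `FixedWindowLimiter` class is an illustration, not a production limiter (fixed windows permit bursts at window edges; sliding windows or token buckets smooth that out):

```python
import time

class FixedWindowLimiter:
    """Allow at most `limit` requests per `window` seconds, per key."""
    def __init__(self, limit=100, window=60):
        self.limit = limit
        self.window = window
        self._counters = {}  # key -> (window_start, count)

    def allow(self, key, now=None):
        now = time.time() if now is None else now
        start, count = self._counters.get(key, (now, 0))
        if now - start >= self.window:  # window expired: start a new one
            start, count = now, 0
        if count >= self.limit:
            return False
        self._counters[key] = (start, count + 1)
        return True

limiter = FixedWindowLimiter(limit=3, window=60)
results = [limiter.allow("1.2.3.4", now=0) for _ in range(4)]
# the first three requests pass, the fourth is rejected
```

In the middleware, Redis plays the role of `_counters` so that all gateway workers share one count per client.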

class AuthenticationMiddleware:
    """认证中间件"""
    
    def __init__(self, get_response):
        self.get_response = get_response
    
    def __call__(self, request):
        # Skip public endpoints such as health checks and API docs
        if request.path in ['/health/', '/docs/', '/redoc/']:
            response = self.get_response(request)
            return response
        
        # Require an authorization header
        auth_header = request.META.get('HTTP_AUTHORIZATION', '')
        if not auth_header:
            return JsonResponse({'error': 'Authentication required'}, status=401)
        
        # 验证JWT令牌(简化处理)
        if not self._validate_token(auth_header):
            return JsonResponse({'error': 'Invalid token'}, status=401)
        
        response = self.get_response(request)
        return response
    
    def _validate_token(self, token):
        """验证令牌"""
        # 这里应该实现完整的JWT验证逻辑
        # 包括签名验证、过期检查等
        return True
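`_validate_token` above is deliberately a stub. To illustrate what a full check involves, here is a minimal HS256 signature-and-expiry verification using only the standard library; in practice a maintained library such as PyJWT is the safer choice, and the secret shown is a placeholder:

```python
import base64
import hashlib
import hmac
import json
import time

def _b64url_decode(segment: str) -> bytes:
    # JWTs use unpadded base64url; restore the padding before decoding
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))

def validate_hs256_token(token: str, secret: bytes) -> bool:
    """Verify an HS256 JWT's signature and `exp` claim. Sketch only."""
    try:
        header_b64, payload_b64, sig_b64 = token.split(".")
        signing_input = f"{header_b64}.{payload_b64}".encode()
        expected = hmac.new(secret, signing_input, hashlib.sha256).digest()
        # Constant-time comparison guards against timing attacks
        if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
            return False
        payload = json.loads(_b64url_decode(payload_b64))
        # Reject expired tokens; tokens without `exp` are accepted here
        return payload.get("exp", float("inf")) > time.time()
    except (ValueError, KeyError):
        return False
```

A production check would also validate the `alg` header (rejecting `none`), the issuer, and the audience claims.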

class LoggingMiddleware:
    """Request-logging middleware."""
    
    def __init__(self, get_response):
        self.get_response = get_response
    
    def __call__(self, request):
        start_time = time.time()
        
        response = self.get_response(request)
        
        duration = (time.time() - start_time) * 1000
        
        logger.info(
            f"REQUEST: {request.method} {request.path} "
            f"STATUS: {response.status_code} "
            f"DURATION: {duration:.2f}ms "
            f"IP: {self._get_client_ip(request)}"
        )
        
        return response
    
    def _get_client_ip(self, request):
        """Resolve the client IP, honouring X-Forwarded-For if present."""
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')

class CircuitBreakerMiddleware:
    """Circuit-breaker middleware."""
    
    def __init__(self, get_response):
        self.get_response = get_response
        self.breaker_states = {}  # simplified: per-service breaker state
    
    def __call__(self, request):
        # A fuller implementation would open the circuit based on
        # downstream success rates and latencies
        response = self.get_response(request)
        return response
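The breaker logic left open above can be filled in with a simple count-based state machine: the circuit opens after N consecutive failures, and a trial request is allowed once a cool-down has elapsed. A minimal sketch (the thresholds are illustrative):

```python
import time

class CircuitBreaker:
    """Count-based circuit breaker: opens after `failure_threshold`
    consecutive failures, then allows a trial call after `reset_timeout`."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True
        # Half-open: let a trial request through after the cool-down
        return time.monotonic() - self.opened_at >= self.reset_timeout

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()
```

The middleware would keep one breaker per upstream service, call `allow_request()` before forwarding, and feed the outcome back via `record_success()` / `record_failure()`.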

Service Communication Mechanisms

Synchronous Communication

"""
Synchronous communication suits:
1. Scenarios that need an immediate response
2. Transactional operations
3. Request-response interaction patterns
4. Strong data-consistency requirements

Common implementations:
1. REST API
2. GraphQL
3. gRPC
4. SOAP
"""

# REST API client example
import requests
import json
from typing import Dict, Any, Optional

class RestAPIClient:
    """REST API client."""
    
    def __init__(self, base_url: str, timeout: int = 30):
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
    
    def get(self, endpoint: str, params: Optional[Dict] = None, headers: Optional[Dict] = None) -> Dict[str, Any]:
        """GET request."""
        url = f"{self.base_url}{endpoint}"
        response = self.session.get(url, params=params, headers=headers, timeout=self.timeout)
        response.raise_for_status()
        return response.json()
    
    def post(self, endpoint: str, data: Optional[Dict] = None, headers: Optional[Dict] = None) -> Dict[str, Any]:
        """POST request."""
        url = f"{self.base_url}{endpoint}"
        headers = headers or {}
        headers['Content-Type'] = 'application/json'
        
        response = self.session.post(url, json=data, headers=headers, timeout=self.timeout)
        response.raise_for_status()
        return response.json()
    
    def put(self, endpoint: str, data: Optional[Dict] = None, headers: Optional[Dict] = None) -> Dict[str, Any]:
        """PUT request."""
        url = f"{self.base_url}{endpoint}"
        headers = headers or {}
        headers['Content-Type'] = 'application/json'
        
        response = self.session.put(url, json=data, headers=headers, timeout=self.timeout)
        response.raise_for_status()
        return response.json()
    
    def delete(self, endpoint: str, headers: Optional[Dict] = None) -> bool:
        """DELETE request."""
        url = f"{self.base_url}{endpoint}"
        response = self.session.delete(url, headers=headers, timeout=self.timeout)
        response.raise_for_status()
        return response.status_code == 204

# Usage example
"""
user_client = RestAPIClient('http://user-service:8000/api/v1')
order_client = RestAPIClient('http://order-service:8000/api/v1')

# Fetch user details
user_data = user_client.get(f'/users/{user_id}')

# Create an order
order_data = {
    'user_id': user_id,
    'items': [{'product_id': 1, 'quantity': 2}],
    'shipping_address': '...'
}
order_response = order_client.post('/orders', order_data)
"""

Asynchronous Communication

"""
Asynchronous communication suits:
1. Event-driven architectures
2. Decoupling service dependencies
3. Raising overall throughput
4. Eventual-consistency scenarios

Common implementations:
1. Message queues (RabbitMQ, Kafka, Redis)
2. Event buses
3. Publish-subscribe patterns
"""

# Message queue service
import json
import threading
import uuid
from datetime import datetime
from typing import Dict, Any, Callable
import redis

class MessageQueue:
    """Message queue service built on Redis."""
    
    def __init__(self, redis_url: str = 'redis://localhost:6379/0'):
        self.redis_client = redis.from_url(redis_url)
        self.publisher_channel = 'events'
    
    def publish(self, event_type: str, data: Dict[str, Any], priority: int = 1):
        """Publish an event."""
        event = {
            'id': str(uuid.uuid4()),
            'type': event_type,
            'data': data,
            'timestamp': datetime.utcnow().isoformat(),
            'priority': priority
        }
        
        payload = json.dumps(event)
        
        # Broadcast on the pub/sub channel
        self.redis_client.publish(self.publisher_channel, payload)
        
        # Also enqueue for queue-based consumers
        queue_name = f"queue:{event_type}"
        self.redis_client.lpush(queue_name, payload)
    
    def subscribe(self, event_type: str, handler: Callable[[Dict[str, Any]], None]):
        """Subscribe to an event type on a background thread."""
        def listener():
            pubsub = self.redis_client.pubsub()
            pubsub.subscribe(self.publisher_channel)
            
            for message in pubsub.listen():
                if message['type'] == 'message':
                    event = json.loads(message['data'])
                    if event['type'] == event_type:
                        handler(event)
        
        thread = threading.Thread(target=listener, daemon=True)
        thread.start()
    
    def consume(self, queue_name: str, handler: Callable[[Dict[str, Any]], None]):
        """Consume queued messages.

        publish() pushes with LPUSH, so BRPOP here yields FIFO order and
        blocks instead of busy-polling while the queue is empty.
        """
        while True:
            item = self.redis_client.brpop(queue_name, timeout=1)
            if item is None:
                continue
            _, raw = item
            try:
                handler(json.loads(raw))
            except Exception as e:
                # On failure, park the message on a dead-letter queue
                print(f"Error processing message: {e}")
                self.redis_client.lpush(f"{queue_name}:dead", raw)

# Event handler example
class EventHandlers:
    """A collection of event handlers."""
    
    def __init__(self, message_queue: MessageQueue):
        self.message_queue = message_queue
        self._register_handlers()
    
    def _register_handlers(self):
        """Register the event handlers."""
        # User registration events
        self.message_queue.subscribe('user_registered', self.handle_user_registered)
        
        # Order creation events
        self.message_queue.subscribe('order_created', self.handle_order_created)
        
        # Stock update events
        self.message_queue.subscribe('stock_updated', self.handle_stock_updated)
    
    def handle_user_registered(self, event: Dict[str, Any]):
        """Handle a user-registration event."""
        user_data = event['data']
        print(f"New user registered: {user_data['username']}")
        
        # Send a welcome email
        self.message_queue.publish('send_welcome_email', {
            'user_id': user_data['id'],
            'email': user_data['email']
        })
    
    def handle_order_created(self, event: Dict[str, Any]):
        """Handle an order-creation event."""
        order_data = event['data']
        print(f"Order created: {order_data['id']}")
        
        # Update stock levels
        for item in order_data['items']:
            self.message_queue.publish('update_stock', {
                'product_id': item['product_id'],
                'quantity': item['quantity'],
                'action': 'decrease'
            })
        
        # Send the order confirmation
        self.message_queue.publish('send_order_confirmation', {
            'order_id': order_data['id'],
            'user_id': order_data['user_id']
        })
    
    def handle_stock_updated(self, event: Dict[str, Any]):
        """Handle a stock-update event."""
        stock_data = event['data']
        print(f"Stock updated for product {stock_data['product_id']}")
        
        # Raise a low-stock alert when inventory drops below the threshold
        if stock_data['quantity'] < 10:
            self.message_queue.publish('low_stock_alert', {
                'product_id': stock_data['product_id'],
                'current_quantity': stock_data['quantity']
            })

# Usage example
"""
mq = MessageQueue('redis://localhost:6379/0')
handlers = EventHandlers(mq)

# Publish a user-registration event
mq.publish('user_registered', {
    'id': 123,
    'username': 'john_doe',
    'email': 'john@example.com'
})
"""

gRPC Communication

"""
gRPC suits:
1. High-performance communication
2. Type-safe contracts
3. Bidirectional streaming
4. Service-to-service calls

Install: pip install grpcio grpcio-tools
"""

# Example proto file (user_service.proto)
"""
syntax = "proto3";

package user_service;

service UserService {
    rpc GetUser(GetUserRequest) returns (GetUserResponse);
    rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
    rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);
    rpc DeleteUser(DeleteUserRequest) returns (DeleteUserResponse);
}

message GetUserRequest {
    int32 user_id = 1;
}

message GetUserResponse {
    int32 id = 1;
    string username = 2;
    string email = 3;
    string phone = 4;
}

message CreateUserRequest {
    string username = 1;
    string email = 2;
    string phone = 3;
    string password = 4;
}

message CreateUserResponse {
    int32 id = 1;
    string username = 2;
    string email = 3;
}

message UpdateUserRequest {
    int32 user_id = 1;
    string username = 2;
    string email = 3;
    string phone = 4;
}

message UpdateUserResponse {
    bool success = 1;
    string message = 2;
}

message DeleteUserRequest {
    int32 user_id = 1;
}

message DeleteUserResponse {
    bool success = 1;
}
"""

# gRPC client
import grpc
import user_service_pb2
import user_service_pb2_grpc

class GRPCUserServiceClient:
    """gRPC client for the user service."""
    
    def __init__(self, server_address: str):
        self.channel = grpc.insecure_channel(server_address)
        self.stub = user_service_pb2_grpc.UserServiceStub(self.channel)
    
    def get_user(self, user_id: int) -> user_service_pb2.GetUserResponse:
        """Fetch a user."""
        request = user_service_pb2.GetUserRequest(user_id=user_id)
        return self.stub.GetUser(request)
    
    def create_user(self, username: str, email: str, phone: str, password: str) -> user_service_pb2.CreateUserResponse:
        """Create a user."""
        request = user_service_pb2.CreateUserRequest(
            username=username,
            email=email,
            phone=phone,
            password=password
        )
        return self.stub.CreateUser(request)
    
    def update_user(self, user_id: int, username: str = None, email: str = None, phone: str = None) -> user_service_pb2.UpdateUserResponse:
        """Update a user. Empty strings mean "leave unchanged" by convention here."""
        request = user_service_pb2.UpdateUserRequest(
            user_id=user_id,
            username=username or "",
            email=email or "",
            phone=phone or ""
        )
        return self.stub.UpdateUser(request)
    
    def delete_user(self, user_id: int) -> user_service_pb2.DeleteUserResponse:
        """Delete a user."""
        request = user_service_pb2.DeleteUserRequest(user_id=user_id)
        return self.stub.DeleteUser(request)
    
    def close(self):
        """Close the channel."""
        self.channel.close()

# Usage example
"""
client = GRPCUserServiceClient('localhost:50051')

try:
    # Create a user
    response = client.create_user('john_doe', 'john@example.com', '1234567890', 'password123')
    print(f"Created user: {response.id}")
    
    # Fetch the user
    user = client.get_user(response.id)
    print(f"User: {user.username}, Email: {user.email}")
finally:
    client.close()
"""

Distributed Data Management

Data Consistency Strategies

"""
Distributed data-consistency challenges:
1. Tolerating network partitions
2. Replication lag
3. Concurrent-access conflicts
4. Failure recovery

Consistency models:
1. Strong consistency
   - Linearizability
   - Sequential consistency
   - Suited to financial transactions

2. Eventual consistency
   - Read-your-writes
   - Monotonic reads
   - Monotonic writes
   - Suited to social networks

3. Causal consistency
   - Preserves cause-and-effect ordering
   - Suited to collaborative editing
"""

# Saga pattern implementation
from typing import List, Dict, Any
import logging

logger = logging.getLogger(__name__)

class SagaStep:
    """A single saga step: a forward action plus its compensation."""
    
    def __init__(self, action: callable, compensate: callable, data: Dict[str, Any]):
        self.action = action
        self.compensate = compensate
        self.data = data

class SagaOrchestration:
    """Saga orchestrator."""
    
    def __init__(self):
        self.steps: List[SagaStep] = []
        self.completed_steps: List[int] = []
    
    def add_step(self, action: callable, compensate: callable, data: Dict[str, Any]):
        """Append a step."""
        self.steps.append(SagaStep(action, compensate, data))
    
    def execute(self):
        """Run the saga, compensating completed steps on the first failure."""
        for i, step in enumerate(self.steps):
            try:
                logger.info(f"Executing step {i}: {step.action.__name__}")
                step.action(**step.data)
                self.completed_steps.append(i)
                logger.info(f"Step {i} completed successfully")
            except Exception as e:
                logger.error(f"Step {i} failed: {e}")
                self.compensate()
                raise
    
    def compensate(self):
        """Run compensations for the completed steps, newest first."""
        logger.info("Starting compensation...")
        for step_index in reversed(self.completed_steps):
            try:
                logger.info(f"Compensating step {step_index}")
                step = self.steps[step_index]
                step.compensate(**step.data)
                logger.info(f"Step {step_index} compensated")
            except Exception as e:
                logger.error(f"Compensation failed for step {step_index}: {e}")

# Order-creation saga example
class OrderSaga:
    """Order-creation saga."""
    
    def __init__(self):
        self.saga = SagaOrchestration()
    
    def create_order_saga(self, order_data: Dict[str, Any]):
        """Build and run the order-creation saga."""
        # Step 1: reserve inventory
        self.saga.add_step(
            action=self.reserve_inventory,
            compensate=self.release_inventory,
            data={'items': order_data['items']}
        )
        
        # Step 2: charge the payment
        self.saga.add_step(
            action=self.charge_payment,
            compensate=self.refund_payment,
            data={'amount': order_data['total_amount'], 'user_id': order_data['user_id']}
        )
        
        # Step 3: confirm the order
        self.saga.add_step(
            action=self.confirm_order,
            compensate=self.cancel_order,
            data={'order_id': order_data['id']}
        )
        
        # Run the saga
        self.saga.execute()
    
    def reserve_inventory(self, items: List[Dict]):
        """Reserve stock via the inventory service."""
        print(f"Reserving inventory for items: {items}")
        return True
    
    def release_inventory(self, items: List[Dict]):
        """Release previously reserved stock."""
        print(f"Releasing inventory for items: {items}")
    
    def charge_payment(self, amount: float, user_id: int):
        """Charge the payment via the payment service."""
        print(f"Charging payment: {amount} for user {user_id}")
        return True
    
    def refund_payment(self, amount: float, user_id: int):
        """Refund the payment."""
        print(f"Refunding payment: {amount} for user {user_id}")
    
    def confirm_order(self, order_id: int):
        """Mark the order as confirmed."""
        print(f"Confirming order: {order_id}")
        return True
    
    def cancel_order(self, order_id: int):
        """Mark the order as cancelled."""
        print(f"Cancelling order: {order_id}")

# Usage example
"""
order_saga = OrderSaga()
order_data = {
    'id': 123,
    'user_id': 456,
    'items': [{'product_id': 1, 'quantity': 2}],
    'total_amount': 100.00
}

try:
    order_saga.create_order_saga(order_data)
    print("Order created successfully")
except Exception as e:
    print(f"Order creation failed: {e}")
"""

Distributed Transactions

"""
Ways to implement distributed transactions:

1. Two-phase commit (2PC)
   - Prepare phase
   - Commit phase
   - Synchronous and blocking

2. Three-phase commit (3PC)
   - CanCommit
   - PreCommit
   - DoCommit
   - Reduces blocking

3. TCC (Try-Confirm-Cancel)
   - Try: reserve resources
   - Confirm: commit the reservation
   - Cancel: undo the reservation

4. Local message table
   - Local transaction guarantees
   - Message table tracks delivery state
   - A scheduled job drains the table

5. Best-effort notification
   - Repeated delivery attempts
   - Exponential backoff
   - Manual intervention as a last resort
"""

# TCC pattern implementation
import time

class TCCFramework:
    """TCC framework."""
    
    def __init__(self):
        self.transaction_manager = TransactionManager()
    
    def execute_tcc_transaction(self, business_action: callable, *args, **kwargs):
        """Execute a TCC transaction."""
        transaction_id = self.transaction_manager.begin_transaction()
        
        try:
            # Try phase: reserve resources
            confirm_data = business_action(*args, **kwargs)
        except Exception:
            # Try failed -> Cancel phase
            self.transaction_manager.cancel_transaction(transaction_id)
            raise
        
        # Confirm phase
        if self.transaction_manager.can_confirm(transaction_id):
            self.transaction_manager.confirm_transaction(transaction_id)
            return confirm_data
        
        # Cancel phase
        self.transaction_manager.cancel_transaction(transaction_id)
        raise Exception("Transaction cannot be confirmed")

class TransactionManager:
    """Transaction manager."""
    
    def __init__(self):
        self.transactions = {}
    
    def begin_transaction(self) -> str:
        """Start a transaction."""
        import uuid
        transaction_id = str(uuid.uuid4())
        self.transactions[transaction_id] = {
            'status': 'active',
            'participants': [],
            'created_at': time.time()
        }
        return transaction_id
    
    def can_confirm(self, transaction_id: str) -> bool:
        """Check whether every participant is ready to confirm."""
        # A real implementation would poll all participants here
        return True
    
    def confirm_transaction(self, transaction_id: str):
        """Confirm the transaction."""
        if transaction_id in self.transactions:
            self.transactions[transaction_id]['status'] = 'confirmed'
    
    def cancel_transaction(self, transaction_id: str):
        """Cancel the transaction."""
        if transaction_id in self.transactions:
            self.transactions[transaction_id]['status'] = 'cancelled'

# Local message table implementation
import json
import uuid

from django.db import models, transaction

class LocalMessage(models.Model):
    """Local message table."""
    STATUS_CHOICES = [
        ('pending', 'Pending'),
        ('sent', 'Sent'),
        ('confirmed', 'Confirmed'),
        ('failed', 'Failed'),
    ]
    
    message_id = models.CharField(max_length=100, unique=True)
    topic = models.CharField(max_length=100)
    content = models.TextField()
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    retry_count = models.IntegerField(default=0)
    max_retries = models.IntegerField(default=3)

class MessageService:
    """Message service."""
    
    @staticmethod
    def send_message_with_local_transaction(topic: str, content: dict, business_logic: callable):
        """Run business logic and record the outgoing message in one local transaction."""
        with transaction.atomic():
            # Execute the business logic
            result = business_logic()
            
            # Persist the message in the local message table
            message = LocalMessage.objects.create(
                message_id=str(uuid.uuid4()),
                topic=topic,
                content=json.dumps(content),
                status='pending'
            )
            
            return result, message.id
    
    @staticmethod
    def process_pending_messages():
        """Deliver pending messages."""
        # select_for_update() must run inside a transaction
        with transaction.atomic():
            pending_messages = LocalMessage.objects.filter(
                status='pending'
            ).select_for_update()
            
            for message in pending_messages:
                try:
                    # Publish the message to the broker
                    # (integrate the actual message-queue client here)
                    print(f"Sending message: {message.topic} - {message.content}")
                    
                    message.status = 'sent'
                    message.save()
                    
                except Exception:
                    message.retry_count += 1
                    if message.retry_count >= message.max_retries:
                        message.status = 'failed'
                    message.save()

# Periodic task that drains the message table
from celery import shared_task

@shared_task
def process_local_messages():
    """Periodic task that processes local messages."""
    MessageService.process_pending_messages()
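The "best-effort notification" approach listed earlier complements this pattern: keep retrying delivery with exponentially growing delays, and fall back to manual handling once retries are exhausted. A minimal sketch, where `send` is any caller-supplied delivery callable (an HTTP call, a queue publish, and so on):

```python
import time

def notify_with_backoff(send, payload, max_attempts: int = 5,
                        base_delay: float = 1.0, sleep=time.sleep) -> bool:
    """Call `send(payload)` until it succeeds, doubling the delay after
    each failure (1s, 2s, 4s, ...). Returns False when every attempt
    fails, at which point the message would be parked for manual review.
    The `sleep` parameter is injectable to make the function testable."""
    for attempt in range(max_attempts):
        try:
            send(payload)
            return True
        except Exception:
            if attempt < max_attempts - 1:
                sleep(base_delay * (2 ** attempt))
    return False
```

The scheduled task above could use this in place of its single delivery attempt, persisting the attempt count back to `LocalMessage.retry_count` between runs.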

Service Discovery and Registration

Service Registry

"""
A service registry provides:
1. Service registration
   - Automatic registration
   - Manual registration
   - Health checks

2. Service discovery
   - DNS queries
   - API queries
   - Load balancing

3. Health monitoring
   - Heartbeat detection
   - State synchronisation
   - Failover

4. Configuration management
   - Dynamic configuration
   - Environment management
   - Version control
"""

# Consul-based service discovery
import time
from typing import Dict, List, Optional

import consul

class ConsulServiceRegistry:
    """Consul service registry client."""
    
    def __init__(self, host: str = 'localhost', port: int = 8500):
        self.consul = consul.Consul(host=host, port=port)
    
    def register_service(self, service_id: str, name: str, address: str, port: int, 
                        tags: List[str] = None, check_interval: str = '10s'):
        """Register a service with an HTTP health check."""
        check = consul.Check.http(
            f'http://{address}:{port}/health/',
            interval=check_interval,
            timeout='5s'
        )
        
        self.consul.agent.service.register(
            name,
            service_id=service_id,
            address=address,
            port=port,
            tags=tags or [],
            check=check
        )
        print(f"Service {name} registered with ID {service_id}")
    
    def deregister_service(self, service_id: str):
        """Deregister a service."""
        self.consul.agent.service.deregister(service_id)
        print(f"Service {service_id} deregistered")
    
    def discover_service(self, service_name: str) -> List[Dict]:
        """Discover healthy instances of a service."""
        index, services = self.consul.health.service(service_name, passing=True)
        return services
    
    def get_service_instance(self, service_name: str) -> Optional[Dict]:
        """Return one healthy service instance, if any."""
        services = self.discover_service(service_name)
        if services:
            service = services[0]['Service']  # first healthy instance
            return {
                'address': service['Address'],
                'port': service['Port'],
                'id': service['ID'],
                'tags': service['Tags']
            }
        return None
    
    def watch_services(self, service_name: str, callback: callable):
        """Watch for changes to a service via blocking queries."""
        last_index = None
        
        while True:
            try:
                # The blocking query only returns when the index changes
                index, services = self.consul.health.service(service_name, index=last_index)
                if index != last_index:
                    callback(services)
                    last_index = index
            except Exception as e:
                print(f"Error watching services: {e}")
                time.sleep(5)

# Database-backed service discovery
from django.db import models, transaction
from django.utils import timezone

class ServiceRegistry(models.Model):
    """Service registration model."""
    name = models.CharField(max_length=100)
    version = models.CharField(max_length=20)
    host = models.CharField(max_length=100)
    port = models.IntegerField()
    status = models.CharField(max_length=20, default='active')
    metadata = models.JSONField(default=dict)
    last_heartbeat = models.DateTimeField(auto_now=True)
    created_at = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        db_table = 'service_registry'
        unique_together = ['name', 'host', 'port']
    
    def is_healthy(self, timeout_minutes: int = 5) -> bool:
        """True if a heartbeat arrived within the timeout."""
        # total_seconds() is required: timedelta.seconds wraps at one day
        elapsed = (timezone.now() - self.last_heartbeat).total_seconds()
        return elapsed < timeout_minutes * 60

class ServiceDiscoveryService:
    """Service discovery operations."""
    
    @staticmethod
    def register_service(name: str, version: str, host: str, port: int, metadata: dict = None):
        """Register (or refresh) a service instance."""
        with transaction.atomic():
            service, created = ServiceRegistry.objects.update_or_create(
                name=name,
                host=host,
                port=port,
                defaults={
                    'version': version,
                    'status': 'active',
                    'metadata': metadata or {}
                }
            )
            return service
    
    @staticmethod
    def heartbeat(service_id: int):
        """Record a heartbeat."""
        try:
            service = ServiceRegistry.objects.get(id=service_id)
            service.last_heartbeat = timezone.now()
            service.save()
            return True
        except ServiceRegistry.DoesNotExist:
            return False
    
    @staticmethod
    def discover_services(name: str) -> List[ServiceRegistry]:
        """List active instances of a service."""
        return ServiceRegistry.objects.filter(
            name=name,
            status='active'
        )
    
    @staticmethod
    def get_healthy_instances(name: str) -> List[ServiceRegistry]:
        """List active instances with a recent heartbeat."""
        services = ServiceRegistry.objects.filter(
            name=name,
            status='active'
        )
        return [service for service in services if service.is_healthy()]
    
    @staticmethod
    def deregister_service(name: str, host: str, port: int):
        """Mark a service instance inactive."""
        ServiceRegistry.objects.filter(
            name=name,
            host=host,
            port=port
        ).update(status='inactive')
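Discovery is usually paired with client-side load balancing across the healthy instances. A minimal thread-safe round-robin selector might look like this (the instances are plain dicts here for illustration, not `ServiceRegistry` rows):

```python
import itertools
import threading

class RoundRobinSelector:
    """Thread-safe round-robin choice over a periodically refreshed
    list of service instances."""

    def __init__(self, instances):
        self._lock = threading.Lock()
        self._set(instances)

    def _set(self, instances):
        self._instances = list(instances)
        # cycle() yields the instances in order, forever
        self._cycle = itertools.cycle(self._instances) if self._instances else None

    def refresh(self, instances):
        """Replace the instance list, e.g. after a discovery poll."""
        with self._lock:
            self._set(instances)

    def choose(self):
        """Return the next instance, raising when none are available."""
        with self._lock:
            if self._cycle is None:
                raise LookupError("no healthy instances")
            return next(self._cycle)
```

A caller would periodically feed it `get_healthy_instances(...)` results via `refresh()` and call `choose()` per outbound request.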

# Service health check
from django.http import JsonResponse
from django.views import View

class HealthCheckView(View):
    """Service health-check view."""
    
    def get(self, request):
        """Health-check endpoint."""
        import psutil
        import django
        
        health_status = {
            'status': 'healthy',
            'timestamp': timezone.now().isoformat(),
            'service': 'django-microservice',
            'version': django.get_version(),
            'system': {
                'cpu_percent': psutil.cpu_percent(),
                'memory_percent': psutil.virtual_memory().percent,
                'disk_percent': psutil.disk_usage('/').percent,
            },
            'database': self._check_database(),
            'cache': self._check_cache(),
        }
        
        return JsonResponse(health_status)
    
    def _check_database(self):
        """Check the database connection."""
        from django.db import connection
        try:
            connection.ensure_connection()
            return {'status': 'connected'}
        except Exception as e:
            return {'status': 'error', 'error': str(e)}
    
    def _check_cache(self):
        """Check the cache backend with a round-trip write and read."""
        from django.core.cache import cache
        try:
            cache.set('health_check', 'ok', 10)
            value = cache.get('health_check')
            return {'status': 'connected' if value == 'ok' else 'error'}
        except Exception as e:
            return {'status': 'error', 'error': str(e)}

# Periodic heartbeat task
from celery import shared_task

@shared_task
def service_heartbeat():
    """Service heartbeat task.

    Look up this service's registration here and refresh its heartbeat.
    """
    pass

Distributed Transaction Handling

Distributed Transaction Solutions

"""
Distributed-transaction challenges:
1. Preserving ACID guarantees
2. Unreliable networks
3. Data consistency
4. Performance overhead

Solutions:
1. Two-phase commit (2PC)
2. Saga pattern
3. TCC pattern
4. Local message table
5. Eventual consistency
"""

# Transaction coordinator
import time
import uuid
from enum import Enum
from typing import List, Dict, Any

class TransactionStatus(Enum):
    ACTIVE = "active"
    PREPARING = "preparing"
    PREPARED = "prepared"
    COMMITTED = "committed"
    ROLLED_BACK = "rolled_back"
    UNKNOWN = "unknown"

class ParticipantStatus(Enum):
    PREPARED = "prepared"
    COMMITTED = "committed"
    ABORTED = "aborted"
    UNKNOWN = "unknown"

class TransactionParticipant:
    """A transaction participant."""
    
    def __init__(self, participant_id: str, prepare_func: callable, commit_func: callable, rollback_func: callable):
        self.participant_id = participant_id
        self.prepare_func = prepare_func
        self.commit_func = commit_func
        self.rollback_func = rollback_func
        self.status = ParticipantStatus.UNKNOWN
    
    def prepare(self, data: Dict[str, Any]) -> bool:
        """Prepare phase."""
        try:
            result = self.prepare_func(data)
            self.status = ParticipantStatus.PREPARED if result else ParticipantStatus.ABORTED
            return result
        except Exception:
            self.status = ParticipantStatus.ABORTED
            return False
    
    def commit(self, data: Dict[str, Any]) -> bool:
        """Commit phase."""
        try:
            result = self.commit_func(data)
            self.status = ParticipantStatus.COMMITTED if result else ParticipantStatus.UNKNOWN
            return result
        except Exception:
            self.status = ParticipantStatus.UNKNOWN
            return False
    
    def rollback(self, data: Dict[str, Any]) -> bool:
        """Rollback phase."""
        try:
            result = self.rollback_func(data)
            self.status = ParticipantStatus.ABORTED if result else ParticipantStatus.UNKNOWN
            return result
        except Exception:
            self.status = ParticipantStatus.UNKNOWN
            return False

class TwoPhaseCommitCoordinator:
    """Two-phase commit coordinator."""
    
    def __init__(self):
        self.participants: List[TransactionParticipant] = []
        self.transactions: Dict[str, Dict[str, Any]] = {}
    
    def add_participant(self, participant: TransactionParticipant):
        """Register a participant."""
        self.participants.append(participant)
    
    def begin_transaction(self) -> str:
        """Start a transaction."""
        transaction_id = str(uuid.uuid4())
        self.transactions[transaction_id] = {
            'status': TransactionStatus.ACTIVE,
            'participants': [p.participant_id for p in self.participants],
            'created_at': time.time()
        }
        return transaction_id
    
    def prepare_phase(self, transaction_id: str, data: Dict[str, Any]) -> bool:
        """Phase one: ask every participant to prepare."""
        if transaction_id not in self.transactions:
            return False
        
        self.transactions[transaction_id]['status'] = TransactionStatus.PREPARING
        
        prepared_participants = []
        
        for participant in self.participants:
            if participant.prepare(data):
                prepared_participants.append(participant.participant_id)
            else:
                # One vote against: roll back everyone already prepared
                self._rollback_prepared(transaction_id, prepared_participants, data)
                self.transactions[transaction_id]['status'] = TransactionStatus.ROLLED_BACK
                return False
        
        self.transactions[transaction_id]['status'] = TransactionStatus.PREPARED
        return True
    
    def commit_phase(self, transaction_id: str, data: Dict[str, Any]) -> bool:
        """Phase two: commit once every participant has prepared."""
        if (transaction_id not in self.transactions or 
            self.transactions[transaction_id]['status'] != TransactionStatus.PREPARED):
            return False
        
        all_committed = True
        for participant in self.participants:
            if not participant.commit(data):
                all_committed = False
                break
        
        if all_committed:
            self.transactions[transaction_id]['status'] = TransactionStatus.COMMITTED
        return all_committed
    
    def _rollback_prepared(self, transaction_id: str, prepared_participants: List[str], data: Dict[str, Any]):
        """Roll back the participants that had already prepared."""
        for participant in self.participants:
            if participant.participant_id in prepared_participants:
                participant.rollback(data)

# Usage example
"""
coordinator = TwoPhaseCommitCoordinator()

# Create participants; each callback must return a truthy value on success
user_participant = TransactionParticipant(
    'user_service',
    lambda data: print(f"Preparing user operation: {data}") or True,
    lambda data: print(f"Committing user operation: {data}") or True,
    lambda data: print(f"Rolling back user operation: {data}") or True
)

order_participant = TransactionParticipant(
    'order_service', 
    lambda data: print(f"Preparing order operation: {data}") or True,
    lambda data: print(f"Committing order operation: {data}") or True,
    lambda data: print(f"Rolling back order operation: {data}") or True
)

coordinator.add_participant(user_participant)
coordinator.add_participant(order_participant)

# Run the transaction
transaction_id = coordinator.begin_transaction()
data = {'user_id': 123, 'order_amount': 100.00}

if coordinator.prepare_phase(transaction_id, data):
    coordinator.commit_phase(transaction_id, data)
    print("Transaction committed successfully")
else:
    print("Transaction rolled back")
"""

Distributed Lock Implementation

# Redis-based distributed lock
import redis
import time
import uuid
from contextlib import contextmanager

class DistributedLock:
    """Distributed lock implementation"""
    
    def __init__(self, redis_client: redis.Redis, lock_key: str, expire_time: int = 30):
        self.redis_client = redis_client
        self.lock_key = f"lock:{lock_key}"
        self.expire_time = expire_time
        self.identifier = str(uuid.uuid4())
    
    def acquire(self, timeout: int = 10) -> bool:
        """Acquire the lock"""
        end_time = time.time() + timeout
        
        while time.time() < end_time:
            # Atomically set the lock using SET with the NX and EX options
            if self.redis_client.set(
                self.lock_key, 
                self.identifier, 
                nx=True,  # only set if the key does not already exist
                ex=self.expire_time  # expiry time in seconds
            ):
                return True
            time.sleep(0.001)  # brief sleep before retrying
        
        return False
    
    def release(self) -> bool:
        """Release the lock"""
        # A Lua script makes the get-compare-delete atomic, so we never
        # delete a lock that another client has since acquired
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        
        try:
            result = self.redis_client.eval(lua_script, 1, self.lock_key, self.identifier)
            return result == 1
        except Exception:
            return False
    
    @contextmanager
    def lock(self, timeout: int = 10):
        """Context-manager form of the lock"""
        acquired = self.acquire(timeout)
        if not acquired:
            raise TimeoutError("Could not acquire distributed lock")
        
        try:
            yield
        finally:
            self.release()

# Usage example
"""
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Option 1: acquire and release the lock manually
lock = DistributedLock(redis_client, 'order_processing', expire_time=30)

if lock.acquire(timeout=10):
    try:
        # Critical section
        process_order(order_id)
    finally:
        lock.release()
else:
    print("Could not acquire lock")

# Option 2: use the context manager
lock = DistributedLock(redis_client, 'order_processing', expire_time=30)

try:
    with lock.lock(timeout=10):
        # Critical section
        process_order(order_id)
except TimeoutError:
    print("Could not acquire lock")
"""

Distributed Caching Strategies

"""
分布式缓存策略:

1. 多级缓存 (Multi-level Caching)
   - L1: 进程内缓存 (内存)
   - L2: 分布式缓存 (Redis/Memcached)
   - L3: 数据库缓存 (查询结果)

2. 缓存模式 (Cache Patterns)
   - Cache-Aside
   - Read-Through
   - Write-Through
   - Write-Behind

3. 缓存更新策略 (Cache Update Strategies)
   - TTL (Time To Live)
   - LRU (Least Recently Used)
   - LFU (Least Frequently Used)
"""

# Multi-level cache implementation
import pickle
from typing import Any, Optional
from django.core.cache import cache as django_cache

class MultiLevelCache:
    """Multi-level cache"""
    
    def __init__(self, redis_client: redis.Redis, local_cache_size: int = 1000):
        self.redis_client = redis_client
        self.local_cache = {}  # L1 cache
        self.local_cache_size = local_cache_size
        self.l2_prefix = "l2:"
    
    def get(self, key: str) -> Optional[Any]:
        """Get a cached value"""
        # L1: local in-process cache
        if key in self.local_cache:
            return self.local_cache[key]
        
        # L2: Redis cache
        serialized_value = self.redis_client.get(f"{self.l2_prefix}{key}")
        if serialized_value:
            value = pickle.loads(serialized_value)
            # Promote to the L1 cache
            self._update_l1_cache(key, value)
            return value
        
        # L3: Django cache
        value = django_cache.get(key)
        if value is not None:
            # Promote to L1 and L2
            self._update_l1_cache(key, value)
            self.set(key, value)  # writes L2 (it also rewrites L1/L3, which is harmless here)
            return value
        
        return None
    
    def set(self, key: str, value: Any, timeout: int = 300):
        """Set a cached value"""
        # Write to the L1 cache
        self._update_l1_cache(key, value)
        
        # Write to the L2 cache
        serialized_value = pickle.dumps(value)
        self.redis_client.setex(f"{self.l2_prefix}{key}", timeout, serialized_value)
        
        # Write to the L3 cache
        django_cache.set(key, value, timeout)
    
    def delete(self, key: str):
        """Delete a cached value"""
        # Remove from every level
        if key in self.local_cache:
            del self.local_cache[key]
        
        self.redis_client.delete(f"{self.l2_prefix}{key}")
        django_cache.delete(key)
    
    def _update_l1_cache(self, key: str, value: Any):
        """Update the L1 cache"""
        if len(self.local_cache) >= self.local_cache_size:
            # Simple FIFO eviction (dicts preserve insertion order);
            # this only approximates LRU
            oldest_key = next(iter(self.local_cache))
            del self.local_cache[oldest_key]
        
        self.local_cache[key] = value
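The L1 eviction above is first-in-first-out rather than true LRU. If access recency matters, the standard library's `collections.OrderedDict` gives a compact true-LRU tier; the sketch below is a drop-in idea, not part of the original implementation:

```python
from collections import OrderedDict
from typing import Any, Optional

class LRUCache:
    """Small LRU cache suitable for an L1 tier.

    An OrderedDict lets us move a key to the end on every access,
    so the least recently *used* entry is evicted first.
    """

    def __init__(self, capacity: int = 1000):
        self.capacity = capacity
        self._data: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def set(self, key: str, value: Any) -> None:
        if key in self._data:
            self._data.move_to_end(key)
        self._data[key] = value
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict least recently used
```

Using an instance of this class as `self.local_cache` (and routing through its `get`/`set`) would make the L1 tier behave as a true LRU.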

# Cache decorator
def cached_method(expire_time: int = 300, cache_key_prefix: str = ""):
    """Method-level caching decorator"""
    def decorator(func):
        from functools import wraps
        
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # Build the cache key
            cache_key_parts = [
                cache_key_prefix or func.__name__,
                str(self.__class__.__name__),
                str(args),
                str(sorted(kwargs.items()))
            ]
            cache_key = ":".join(cache_key_parts)
            
            # Try the cache first
            result = django_cache.get(cache_key)
            if result is not None:
                return result
            
            # Cache miss: run the method and cache the result
            result = func(self, *args, **kwargs)
            django_cache.set(cache_key, result, expire_time)
            
            return result
        
        return wrapper
    
    return decorator
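The decorator's key construction and hit/miss behavior can be demonstrated without a running Django project. In this sketch a plain dict stands in for `django_cache`, and `PriceService`/`get_price` are hypothetical names used only for illustration:

```python
from functools import wraps

# A plain dict stands in for django_cache in this demo.
_fake_cache: dict = {}

def cached_method_demo(cache_key_prefix: str = ""):
    """Same key scheme as cached_method above, minus the Django backend."""
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            cache_key = ":".join([
                cache_key_prefix or func.__name__,
                self.__class__.__name__,
                str(args),
                str(sorted(kwargs.items())),
            ])
            if cache_key in _fake_cache:
                return _fake_cache[cache_key]       # cache hit
            result = func(self, *args, **kwargs)    # cache miss: compute
            _fake_cache[cache_key] = result
            return result
        return wrapper
    return decorator

class PriceService:
    def __init__(self):
        self.db_hits = 0  # counts how often the "expensive" path runs

    @cached_method_demo(cache_key_prefix="price")
    def get_price(self, product_id: int) -> float:
        self.db_hits += 1
        return product_id * 1.5  # stand-in for a database query
```

Calling `get_price(10)` twice runs the underlying method once; the second call is served from the dict.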

# Cache-Aside pattern example
class CacheAsideService:
    """Cache-Aside service"""
    
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.cache_prefix = "cache_aside:"
    
    def get_user(self, user_id: int) -> dict:
        """Fetch a user (Cache-Aside read path)"""
        cache_key = f"{self.cache_prefix}user:{user_id}"
        
        # 1. Try the cache first
        cached_user = self.redis_client.get(cache_key)
        if cached_user:
            return pickle.loads(cached_user)
        
        # 2. Cache miss: load from the database
        user = self._fetch_user_from_db(user_id)
        
        # 3. Store the result in the cache
        if user:
            self.redis_client.setex(
                cache_key, 
                300,  # expires after 5 minutes
                pickle.dumps(user)
            )
        
        return user
    
    def update_user(self, user_id: int, update_data: dict) -> bool:
        """Update a user (Cache-Aside write path)"""
        # 1. Update the database
        success = self._update_user_in_db(user_id, update_data)
        
        if success:
            # 2. Invalidate the cache entry
            cache_key = f"{self.cache_prefix}user:{user_id}"
            self.redis_client.delete(cache_key)
        
        return success
    
    def _fetch_user_from_db(self, user_id: int) -> dict:
        """Load a user from the database (stubbed)"""
        # A real database query goes here
        return {"id": user_id, "name": f"User {user_id}", "email": f"user{user_id}@example.com"}
    
    def _update_user_in_db(self, user_id: int, update_data: dict) -> bool:
        """Update a user in the database (stubbed)"""
        # A real database update goes here
        return True

# Usage example
"""
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Multi-level cache
multi_cache = MultiLevelCache(redis_client)

# Set a value
multi_cache.set("user:123", {"name": "John", "age": 30}, 600)

# Get a value
user_data = multi_cache.get("user:123")

# Cache-Aside service
cache_service = CacheAsideService(redis_client)
user = cache_service.get_user(123)
"""

Monitoring and Governance

Microservice Monitoring

"""
微服务监控维度:

1. 应用层监控 (Application Level)
   - API响应时间
   - 错误率
   - 吞吐量

2. 基础设施监控 (Infrastructure Level)
   - CPU使用率
   - 内存使用
   - 磁盘I/O
   - 网络流量

3. 业务监控 (Business Level)
   - 业务指标
   - 用户行为
   - 转化率

4. 分布式追踪 (Distributed Tracing)
   - 请求链路追踪
   - 跨服务调用
   - 性能瓶颈分析
"""

# Custom monitoring middleware
import time
import logging
from django.http import HttpResponse
from django.db import connection
from django.core.handlers.wsgi import WSGIRequest

logger = logging.getLogger('monitoring')

class MonitoringMiddleware:
    """Monitoring middleware"""
    
    def __init__(self, get_response):
        self.get_response = get_response
    
    def __call__(self, request: WSGIRequest):
        start_time = time.time()
        # Note: connection.queries is only populated when DEBUG=True,
        # so query_count will be 0 in production
        start_queries = len(connection.queries)
        
        response = self.get_response(request)
        
        duration = time.time() - start_time
        query_count = len(connection.queries) - start_queries
        
        # Record the monitoring data
        self._record_metrics(request, response, duration, query_count)
        
        return response
    
    def _record_metrics(self, request: WSGIRequest, response: HttpResponse, duration: float, query_count: int):
        """Record monitoring metrics"""
        metrics = {
            'timestamp': time.time(),
            'method': request.method,
            'path': request.path,
            'status_code': response.status_code,
            'duration': duration,
            'query_count': query_count,
            'user_agent': request.META.get('HTTP_USER_AGENT', ''),
            'remote_addr': request.META.get('REMOTE_ADDR', ''),
        }
        
        # Write to the log
        logger.info(f"REQUEST_METRICS: {metrics}")
        
        # Optionally forward to a monitoring system (Prometheus, InfluxDB, ...)
        self._send_to_monitoring_system(metrics)
    
    def _send_to_monitoring_system(self, metrics: dict):
        """Send metrics to a monitoring system"""
        # Integrate a concrete backend here,
        # e.g. prometheus_client, StatsD, or InfluxDB
        pass

# Prometheus metrics collector
"""
# Requires: pip install prometheus-client

from prometheus_client import Counter, Histogram, Gauge
import time

# Define the metrics
REQUEST_COUNT = Counter('django_http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_LATENCY = Histogram('django_http_request_duration_seconds', 'HTTP request latency', ['method', 'endpoint'])
ACTIVE_CONNECTIONS = Gauge('django_db_active_connections', 'Active database connections')

class PrometheusMetricsMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        start_time = time.time()
        
        response = self.get_response(request)
        
        # Record the metrics
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=request.path,
            status=response.status_code
        ).inc()
        
        REQUEST_LATENCY.labels(
            method=request.method,
            endpoint=request.path
        ).observe(time.time() - start_time)
        
        return response
"""

Service Governance

"""
服务治理包括:

1. 服务注册与发现 (Service Registry & Discovery)
2. 负载均衡 (Load Balancing)
3. 熔断器 (Circuit Breaker)
4. 限流 (Rate Limiting)
5. 重试机制 (Retry Mechanism)
6. 降级策略 (Degradation Strategy)
"""

# Circuit breaker implementation
import time
from enum import Enum
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # tripped: calls are rejected
    HALF_OPEN = "half_open"  # probing: one trial call allowed

class CircuitBreaker:
    """Circuit breaker implementation"""
    
    def __init__(self, failure_threshold: int = 5, timeout: int = 60, expected_exception: type = Exception):
        self.failure_threshold = failure_threshold    # failures before tripping
        self.timeout = timeout                        # seconds to stay open
        self.expected_exception = expected_exception  # exception type that counts as a failure
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    
    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute a call guarded by the breaker"""
        if self.state == CircuitState.OPEN:
            # Check whether we should move to the half-open state
            if time.time() - self.last_failure_time >= self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                # Still open: fail fast without calling the function
                raise Exception("Circuit breaker is OPEN")
        
        try:
            if self.state == CircuitState.HALF_OPEN:
                # In the half-open state a single trial call is allowed
                result = func(*args, **kwargs)
                # The trial succeeded: reset the breaker
                self._reset()
                return result
            else:
                # Normal call
                result = func(*args, **kwargs)
                # Success: reset the failure count
                if self.failure_count > 0:
                    self.failure_count = 0
                return result
        
        except self.expected_exception as e:
            self._record_failure()
            raise e
    
    def _record_failure(self):
        """Record a failure"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
    
    def _reset(self):
        """Reset the breaker"""
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
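The CLOSED → OPEN → HALF_OPEN transitions are easiest to verify with a controllable clock. Below is a compact, test-friendly restatement of the breaker logic above with the time source injected (so the OPEN timeout can be exercised without sleeping); it is a sketch with the same semantics, not a replacement for the class above:

```python
import time
from enum import Enum
from typing import Any, Callable

class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class SimpleBreaker:
    def __init__(self, failure_threshold: int = 3, timeout: float = 30.0,
                 clock: Callable[[], float] = time.time):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.clock = clock  # injectable for deterministic tests
        self.failures = 0
        self.opened_at: float = 0.0
        self.state = State.CLOSED

    def call(self, func: Callable[[], Any]) -> Any:
        if self.state is State.OPEN:
            if self.clock() - self.opened_at >= self.timeout:
                self.state = State.HALF_OPEN  # allow one probe call
            else:
                raise RuntimeError("circuit open")
        try:
            result = func()
        except Exception:
            self.failures += 1
            # A failed probe, or hitting the threshold, (re)opens the circuit
            if self.state is State.HALF_OPEN or self.failures >= self.failure_threshold:
                self.state = State.OPEN
                self.opened_at = self.clock()
            raise
        # Success closes the circuit and resets the failure count
        self.failures = 0
        self.state = State.CLOSED
        return result
```

With a fake clock, two consecutive failures trip the breaker, calls are rejected while it is open, and a successful probe after the timeout closes it again.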

# Rate limiter implementation
import time

class TokenBucket:
    """Token-bucket rate limiter"""
    
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity          # bucket capacity
        self.refill_rate = refill_rate    # tokens added per second
        self.tokens = capacity            # current token count
        self.last_refill = time.time()    # last refill timestamp
    
    def consume(self, tokens: int = 1) -> bool:
        """Consume tokens"""
        # Refill the bucket
        now = time.time()
        tokens_to_add = (now - self.last_refill) * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + tokens_to_add)
        self.last_refill = now
        
        # Check whether enough tokens are available
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

class RateLimiter:
    """Token-bucket-based rate limiter"""
    
    def __init__(self, max_requests: int, window_seconds: int):
        self.bucket = TokenBucket(max_requests, max_requests / window_seconds)
    
    def is_allowed(self) -> bool:
        """Check whether a request is allowed"""
        return self.bucket.consume(1)
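The refill arithmetic in `TokenBucket` is easiest to verify with a controllable clock. This is the same logic with the time source injected, offered as a testing sketch rather than a change to the class above:

```python
import time
from typing import Callable

class ClockedTokenBucket:
    """Same refill logic as TokenBucket above, with an injectable
    time source so the behavior can be tested deterministically."""

    def __init__(self, capacity: int, refill_rate: float,
                 clock: Callable[[], float] = time.time):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens added per second
        self.clock = clock
        self.tokens = float(capacity)
        self.last_refill = clock()

    def consume(self, tokens: int = 1) -> bool:
        # Refill in proportion to elapsed time, capped at capacity
        now = self.clock()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last_refill) * self.refill_rate)
        self.last_refill = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False
```

With capacity 2 and a refill rate of 1 token/second, the bucket allows two immediate requests, rejects the third, and allows exactly one more after 1.5 simulated seconds.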

# Retry mechanism
import random
from typing import Type, Tuple

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_factor: float = 2.0,
    jitter: bool = True,
    allowed_exceptions: Tuple[Type[Exception], ...] = (Exception,)
):
    """Retry decorator with exponential backoff"""
    def decorator(func):
        from functools import wraps
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except allowed_exceptions as e:
                    last_exception = e
                    
                    if attempt == max_retries:  # final attempt
                        break
                    
                    # Compute the delay
                    delay = min(base_delay * (backoff_factor ** attempt), max_delay)
                    
                    # Add random jitter
                    if jitter:
                        delay *= (0.5 + random.random() * 0.5)
                    
                    time.sleep(delay)
            
            raise last_exception
        
        return wrapper
    
    return decorator

# Usage example
"""
# Circuit breaker
breaker = CircuitBreaker(failure_threshold=3, timeout=30)

def unreliable_service_call():
    # Simulates an unstable downstream service
    if random.random() < 0.7:  # fails 70% of the time
        raise Exception("Service temporarily unavailable")
    return "Success"

try:
    result = breaker.call(unreliable_service_call)
    print(f"Result: {result}")
except Exception as e:
    print(f"Call failed: {e}")

# Rate limiter
limiter = RateLimiter(max_requests=10, window_seconds=60)  # at most 10 requests per minute

if limiter.is_allowed():
    # Handle the request
    process_request()
else:
    # Over the limit: reject the request
    print("Rate limit exceeded")

# Retry mechanism
@retry_with_backoff(max_retries=3, base_delay=1.0, allowed_exceptions=(ConnectionError, TimeoutError))
def unreliable_api_call():
    # Simulates an unstable API
    if random.random() < 0.6:  # fails 60% of the time
        raise ConnectionError("API temporarily unavailable")
    return "Success"

result = unreliable_api_call()
"""

Migration Strategies

Monolith-to-Microservices Migration

"""
单体到微服务迁移策略:

1. 演进式迁移 (Evolutionary Migration)
   - 渐进式拆分
   - 逐步解耦
   - 风险可控

2. 大爆炸迁移 (Big Bang Migration)
   - 一次性迁移
   - 风险较高
   - 适用于新系统

3. 绞杀者模式 (Strangler Fig Pattern)
   - 逐步替换功能
   - 新功能以微服务实现
   - 逐步淘汰旧功能
"""

# Data migration tooling
import json
import csv
from typing import Dict, Any
from django.db import transaction
from django.core.serializers.json import DjangoJSONEncoder

class DataMigrationTool:
    """Data migration tool"""
    
    @staticmethod
    def export_model_data(model_class, output_file: str, batch_size: int = 1000):
        """Export model data to CSV"""
        with open(output_file, 'w', encoding='utf-8') as f:
            # Write the CSV header
            field_names = [field.name for field in model_class._meta.fields]
            writer = csv.DictWriter(f, fieldnames=field_names)
            writer.writeheader()
            
            # Export in batches
            offset = 0
            while True:
                batch = model_class.objects.all()[offset:offset + batch_size]
                if not batch:
                    break
                
                for obj in batch:
                    row = {}
                    for field in model_class._meta.fields:
                        value = getattr(obj, field.name)
                        # Handle special field types
                        if hasattr(value, 'isoformat'):  # date/datetime fields
                            row[field.name] = value.isoformat()
                        elif isinstance(value, (list, dict)):  # JSON fields
                            row[field.name] = json.dumps(value, cls=DjangoJSONEncoder)
                        else:
                            row[field.name] = value
                    
                    writer.writerow(row)
                
                offset += batch_size
    
    @staticmethod
    def import_model_data(model_class, input_file: str, batch_size: int = 1000):
        """Import model data from CSV"""
        with open(input_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            
            batch = []
            for row in reader:
                # Convert field values back to their Python types
                for field in model_class._meta.fields:
                    if field.name in row and row[field.name]:
                        if field.get_internal_type() in ['DateTimeField', 'DateField']:
                            # Parse according to the actual export format
                            pass
                        elif field.get_internal_type() == 'JSONField':
                            row[field.name] = json.loads(row[field.name])
                
                obj = model_class(**row)
                batch.append(obj)
                
                if len(batch) >= batch_size:
                    with transaction.atomic():
                        model_class.objects.bulk_create(batch)
                        batch = []
            
            # Flush the remaining rows
            if batch:
                with transaction.atomic():
                    model_class.objects.bulk_create(batch)
    
    @staticmethod
    def validate_data_consistency(source_model, target_model, compare_fields: list):
        """Validate data consistency between source and target"""
        source_count = source_model.objects.count()
        target_count = target_model.objects.count()
        
        if source_count != target_count:
            print(f"Row counts differ: source {source_count}, target {target_count}")
            return False
        
        # Compare the key fields
        for field_name in compare_fields:
            source_values = set(source_model.objects.values_list(field_name, flat=True))
            target_values = set(target_model.objects.values_list(field_name, flat=True))
            
            if source_values != target_values:
                diff = source_values.symmetric_difference(target_values)
                print(f"Field {field_name} is inconsistent, difference: {diff}")
                return False
        
        print("Data consistency check passed")
        return True

# Service extraction tooling
class ServiceExtractor:
    """Service extraction tool"""
    
    @staticmethod
    def analyze_dependencies(app_name: str) -> Dict[str, Any]:
        """Analyze an app's dependency graph"""
        from django.apps import apps
        
        app_config = apps.get_app_config(app_name)
        dependencies = {
            'models': [],
            'views': [],
            'urls': [],
            'external_deps': [],
            'internal_deps': []
        }
        
        # Analyze model dependencies
        for model in app_config.get_models():
            model_deps = []
            for field in model._meta.get_fields():
                if hasattr(field, 'related_model') and field.related_model:
                    related_app = field.related_model._meta.app_label
                    if related_app != app_name:
                        model_deps.append(f"{related_app}.{field.related_model.__name__}")
            
            dependencies['models'].append({
                'model': model.__name__,
                'deps': model_deps
            })
        
        return dependencies
    
    @staticmethod
    def generate_migration_plan(boundary_contexts: list) -> Dict[str, Any]:
        """Generate a migration plan"""
        plan = {
            'services': [],
            'migration_order': [],
            'data_flow': [],
            'api_contracts': []
        }
        
        for context in boundary_contexts:
            service_info = {
                'name': context['name'],
                'models': context['models'],
                'apis': context['apis'],
                'dependencies': context['dependencies'],
                'migration_priority': context.get('priority', 'medium')
            }
            plan['services'].append(service_info)
        
        # Determine the migration order from the dependency graph
        plan['migration_order'] = ServiceExtractor._calculate_migration_order(plan['services'])
        
        return plan
    
    @staticmethod
    def _calculate_migration_order(services: list) -> list:
        """Compute the migration order"""
        # Simplified topological sort
        ordered_services = []
        remaining_services = services.copy()
        
        while remaining_services:
            for i, service in enumerate(remaining_services):
                # Check that all dependencies have been handled
                deps_handled = True
                for dep in service.get('dependencies', []):
                    dep_service_name = dep.split('.')[0]  # extract the service name
                    if not any(s['name'] == dep_service_name for s in ordered_services):
                        deps_handled = False
                        break
                
                if deps_handled:
                    ordered_services.append(service)
                    remaining_services.pop(i)
                    break
            else:
                # No service could be scheduled: there is probably a dependency cycle
                print("Warning: possible circular dependency")
                break
        
        return [s['name'] for s in ordered_services]

# Strangler fig pattern implementation
class StranglerPattern:
    """Strangler fig pattern implementation"""
    
    def __init__(self, legacy_router, microservice_registry):
        self.legacy_router = legacy_router
        self.microservice_registry = microservice_registry
        self.migration_rules = {}  # URL pattern -> routing rule
    
    def add_migration_rule(self, url_pattern: str, microservice_name: str, traffic_ratio: float = 0.0):
        """Register a migration rule"""
        self.migration_rules[url_pattern] = {
            'microservice': microservice_name,
            'traffic_ratio': traffic_ratio  # 0.0-1.0: share of traffic sent to the microservice
        }
    
    def route_request(self, request):
        """Route a request"""
        import random
        
        for pattern, rule in self.migration_rules.items():
            if self._matches_pattern(request.path, pattern):
                # Split the traffic according to the configured ratio
                if random.random() < rule['traffic_ratio']:
                    # Route to the microservice
                    return self._route_to_microservice(request, rule['microservice'])
                else:
                    # Route to the legacy system
                    return self.legacy_router(request)
        
        # Default: route to the legacy system
        return self.legacy_router(request)
    
    def _matches_pattern(self, path: str, pattern: str) -> bool:
        """Check whether a path matches a pattern"""
        import re
        # Simple glob-style matching; real deployments likely need richer routing
        return re.match(pattern.replace('*', '.*'), path) is not None
    
    def _route_to_microservice(self, request, microservice_name: str):
        """Route to a microservice"""
        # Look up a service instance
        service_instance = self.microservice_registry.get_instance(microservice_name)
        if not service_instance:
            # Microservice unavailable: fall back to the legacy system
            return self.legacy_router(request)
        
        # Forward the request to the microservice
        return self._forward_to_service(request, service_instance)
    
    def _forward_to_service(self, request, service_instance):
        """Forward the request to the service"""
        # Implement the actual forwarding here,
        # e.g. with an HTTP client or a gRPC client
        pass
    
    def gradually_increase_traffic(self, url_pattern: str, increment: float = 0.1):
        """Gradually increase the traffic share"""
        if url_pattern in self.migration_rules:
            current_ratio = self.migration_rules[url_pattern]['traffic_ratio']
            new_ratio = min(1.0, current_ratio + increment)
            self.migration_rules[url_pattern]['traffic_ratio'] = new_ratio
            print(f"Traffic ratio for pattern {url_pattern} adjusted to {new_ratio}")

# Usage example
"""
# Data migration
DataMigrationTool.export_model_data(User, 'users_export.csv')
DataMigrationTool.import_model_data(NewUserModel, 'users_export.csv')

# Service extraction analysis
dependencies = ServiceExtractor.analyze_dependencies('ecommerce')
migration_plan = ServiceExtractor.generate_migration_plan([
    {
        'name': 'user_service',
        'models': ['User', 'UserProfile'],
        'apis': ['/api/users/', '/api/profiles/'],
        'dependencies': [],
        'priority': 'high'
    },
    {
        'name': 'product_service',
        'models': ['Product', 'Category'],
        'apis': ['/api/products/', '/api/categories/'],
        'dependencies': ['user_service.User'],
        'priority': 'medium'
    }
])

# Strangler fig pattern
strangler = StranglerPattern(legacy_router, microservice_registry)

# Start the migration: initially send only 10% of traffic to the new service
strangler.add_migration_rule('/api/users/*', 'user_service', traffic_ratio=0.1)

# Later, increase the traffic share
strangler.gradually_increase_traffic('/api/users/*', increment=0.2)
"""

Chapter Summary

In this chapter we took a deep dive into Django microservice architecture:

  1. Microservices overview: definition, characteristics, benefits, and challenges
  2. Service decomposition: DDD-based approaches to splitting services
  3. API gateway design: the gateway's core responsibilities and implementation
  4. Service communication: synchronous and asynchronous mechanisms
  5. Distributed data management: data consistency and transaction handling
  6. Service discovery and registration: implementing a service registry
  7. Distributed transactions: the main transaction patterns and their trade-offs
  8. Monitoring and governance: observability and governance mechanisms
  9. Migration strategies: moving from a monolith to microservices

Key Takeaways

"""
本章核心要点:

1. 微服务架构通过服务拆分实现系统解耦
2. API网关提供统一入口和集中管理
3. 异步通信提高系统吞吐量和解耦
4. 分布式事务需要权衡一致性和可用性
5. 服务发现是微服务架构的基础
6. 监控和治理确保系统稳定性
7. 渐进式迁移降低转型风险
8. 选择合适的拆分粒度很重要
"""

💡 Key point: a microservice architecture is a complex distributed system; service decomposition, communication, data management, and monitoring and governance all have to be considered together. Choosing the right tools and patterns and building a solid operations practice are what make a microservice system succeed.

🔗 Related Tutorials

🏷️ Tags: Django microservices · distributed architecture · service decomposition · API gateway · distributed transactions · service governance · service communication · monitoring · migration strategy