综合实战项目

课程目标

  • 综合运用前面章节的技术知识
  • 完成一个完整的App爬虫项目
  • 掌握从环境搭建到数据入库的全流程
  • 学会处理实际项目中的各种挑战

1. 项目概述:某电商App商品数据爬取

我们将以一个典型的电商App为案例,实现完整的爬虫项目,包括环境搭建、反反爬虫、数据提取和存储。

1.1 项目需求分析

"""
电商App爬虫项目需求分析

目标App:某知名电商App
主要功能:
1. 商品列表页爬取(分类、筛选)
2. 商品详情页爬取(价格、评价、规格)
3. 评论数据爬取(用户评价、晒单图片)
4. 价格历史追踪(价格变动监控)

技术挑战:
1. SSL Pinning绕过
2. 请求签名验证
3. 滑块验证码
4. 频率限制
5. 数据去重和清洗
"""

class ECommerceCrawlerRequirements:
    """Catalogue of functional and technical requirements for the crawler.

    ``self.requirements`` has two top-level groups:

    * ``'functional'`` - one entry per crawl target, each holding a
      ``'description'``, the crawl ``'methods'`` and the ``'data_fields'``
      to collect.
    * ``'technical'`` - anti-anti-spider, performance and data-quality
      concerns, each mapping an identifier to a short description.
    """

    @staticmethod
    def _functional_area(description, methods, data_fields):
        """Build one functional-requirement entry."""
        return {
            'description': description,
            'methods': methods,
            'data_fields': data_fields,
        }

    def __init__(self):
        area = self._functional_area
        functional = {
            'product_list': area(
                '商品列表页爬取',
                ['分类爬取', '关键词搜索', '筛选条件'],
                ['商品ID', '标题', '价格', '销量', '评分', '缩略图'],
            ),
            'product_detail': area(
                '商品详情页爬取',
                ['基本信息', '规格参数', '评价列表'],
                ['商品ID', '详情图片', '规格信息', '库存', '品牌信息'],
            ),
            'reviews': area(
                '评论数据爬取',
                ['用户评价', '晒单图片', '追评'],
                ['评论ID', '用户ID', '评分', '内容', '图片', '时间'],
            ),
            'pricing': area(
                '价格监控',
                ['历史价格', '促销信息', '优惠券'],
                ['商品ID', '价格', '促销价', '时间戳'],
            ),
        }
        technical = {
            'anti_anti_spider': dict(
                ssl_bypass='SSL Pinning绕过',
                sign_verification='请求签名验证',
                captcha_handling='验证码处理',
                rate_limiting='频率限制应对',
            ),
            'performance': dict(
                concurrency='并发控制',
                retry_mechanism='重试机制',
                error_handling='异常处理',
            ),
            'data_quality': dict(
                deduplication='数据去重',
                validation='数据校验',
                storage='数据存储',
            ),
        }
        self.requirements = {'functional': functional, 'technical': technical}

    def get_requirements_summary(self):
        """Return a high-level summary of the requirement catalogue.

        ``total_technical_challenges`` counts the technical *categories*
        (anti-anti-spider, performance, data quality), not individual items.
        """
        skills = ['App逆向工程', 'Frida使用', '网络协议分析', '数据库设计', '并发编程']
        return dict(
            total_functional_areas=len(self.requirements['functional']),
            total_technical_challenges=len(self.requirements['technical']),
            estimated_complexity='High',
            estimated_timeline='2-3 weeks',
            required_skills=skills,
        )

# 风险评估
class RiskAssessment:
    """Project risk register with a simple probability x impact score.

    ``self.risks`` maps a category name to ``{risk_name: info}``. Each
    ``info`` dict has ``'probability'`` and ``'impact'`` set to one of
    ``'Low'`` / ``'Medium'`` / ``'High'``, plus a ``'mitigation'`` note.
    """

    # Numeric weight of each qualitative level; shared by probability and impact.
    _LEVEL_WEIGHTS = {'Low': 1, 'Medium': 2, 'High': 3}

    def __init__(self):
        self.risks = {
            'legal_risks': {
                'terms_of_service_violation': {
                    'probability': 'High',
                    'impact': 'High',
                    'mitigation': '遵守robots.txt,限制爬取频率'
                },
                'copyright_infringement': {
                    'probability': 'Medium',
                    'impact': 'High',
                    'mitigation': '仅用于学习目的,不商业使用'
                }
            },
            'technical_risks': {
                'app_updates': {
                    'probability': 'High',
                    'impact': 'High',
                    'mitigation': '定期更新爬虫逻辑'
                },
                'anti_spider_enhancement': {
                    'probability': 'High',
                    'impact': 'High',
                    'mitigation': '持续优化反反爬虫策略'
                },
                'data_accuracy': {
                    'probability': 'Medium',
                    'impact': 'Medium',
                    'mitigation': '多重验证,数据清洗'
                }
            },
            'operational_risks': {
                'ip_banning': {
                    'probability': 'High',
                    'impact': 'Medium',
                    'mitigation': '使用代理池,控制请求频率'
                },
                'account_banning': {
                    'probability': 'Medium',
                    'impact': 'High',
                    'mitigation': '多账号轮换,模拟正常用户行为'
                }
            }
        }

    def get_risk_score(self):
        """Return the sum of probability x impact weights over every risk.

        Each risk contributes between 1 (Low x Low) and 9 (High x High);
        an empty register scores 0.

        Raises:
            KeyError: if a risk uses a level not present in ``_LEVEL_WEIGHTS``.
        """
        # One lookup table for the whole computation. Previously two identical
        # maps were rebuilt on every inner-loop iteration.
        weights = self._LEVEL_WEIGHTS
        return sum(
            weights[info['probability']] * weights[info['impact']]
            for category_risks in self.risks.values()
            for info in category_risks.values()
        )

# 项目计划
class ProjectPlan:
    """Phased delivery plan for the crawler project.

    ``self.phases`` is an ordered list of dicts with the keys ``'phase'``,
    ``'duration'``, ``'tasks'`` and ``'deliverables'``.
    """

    def __init__(self):
        # (phase title, duration, tasks, deliverables), in execution order.
        rows = [
            ('Phase 1: 环境搭建与分析', '3 days',
             ['App下载与安装', '网络流量分析', '安全机制识别', '反编译分析'],
             ['分析报告', '技术方案']),
            ('Phase 2: 反反爬虫实现', '5 days',
             ['SSL Pinning绕过', '签名算法破解', '请求头伪造', '设备指纹绕过'],
             ['绕过工具', '签名算法实现']),
            ('Phase 3: 数据爬取实现', '7 days',
             ['商品列表爬取', '商品详情爬取', '评论数据爬取', '价格监控实现'],
             ['爬虫代码', '数据样本']),
            ('Phase 4: 数据处理与存储', '4 days',
             ['数据清洗', '去重处理', '数据库设计', '存储实现'],
             ['数据库', '清洗后数据']),
            ('Phase 5: 监控与维护', 'Ongoing',
             ['性能监控', '错误处理', '定期更新', '报告生成'],
             ['监控系统', '维护文档']),
        ]
        self.phases = [
            {'phase': title, 'duration': duration, 'tasks': tasks, 'deliverables': deliverables}
            for title, duration, tasks, deliverables in rows
        ]

def print_project_overview():
    """Print a console overview: requirements, technical challenges, risk score and plan.

    The risk score is printed against its real maximum. Each risk scores at
    most 3 (High probability) x 3 (High impact) = 9, so the ceiling is
    9 x the number of registered risks, not a fixed constant.
    """
    print("=== 电商App爬虫项目概览 ===\n")

    # Requirements analysis
    reqs = ECommerceCrawlerRequirements()
    summary = reqs.get_requirements_summary()

    print("需求摘要:")
    for key, value in summary.items():
        print(f"  {key}: {value}")

    print("\n功能需求:")
    for area, details in reqs.requirements['functional'].items():
        print(f"  {area}: {details['description']}")

    print("\n技术挑战:")
    for challenge, desc in reqs.requirements['technical']['anti_anti_spider'].items():
        print(f"  {challenge}: {desc}")

    # Risk assessment. A fixed "/27" denominator is smaller than the score the
    # register actually produces, so derive the maximum from the register.
    risk_assessment = RiskAssessment()
    risk_count = sum(len(risks) for risks in risk_assessment.risks.values())
    max_risk_score = 9 * risk_count
    print("\n风险评估:")
    print(f"  总体风险分数: {risk_assessment.get_risk_score()}/{max_risk_score}")

    # Project plan
    plan = ProjectPlan()
    print("\n项目计划:")
    for phase in plan.phases:
        print(f"  {phase['phase']} ({phase['duration']}):")
        for task in phase['tasks']:
            print(f"    - {task}")

if __name__ == "__main__":
    print_project_overview()

1.2 技术架构设计

"""
电商App爬虫技术架构

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   控制中心      │────│   任务调度器     │────│   爬虫工作节点   │
│  (Controller)   │    │  (Scheduler)     │    │   (Workers)     │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         │                        │                        │
         ▼                        ▼                        ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│  配置管理       │    │  监控告警系统    │    │  数据处理管道   │
│ (Config Mgmt)   │    │  (Monitoring)    │    │  (Pipeline)     │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         │                        │                        │
         ▼                        ▼                        ▼
┌─────────────────────────────────────────────────────────────────┐
│                    数据存储层 (Storage Layer)                   │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌──────────┐  │
│  │  MySQL      │ │ Redis       │ │ MongoDB     │ │ 日志系统  │  │
│  │ (结构化数据) │ │ (缓存/队列) │ │ (非结构化)  │ │ (Logging) │  │
│  └─────────────┘ └─────────────┘ └─────────────┘ └──────────┘  │
└─────────────────────────────────────────────────────────────────┘
"""

class SystemArchitecture:
    """Static description of the crawler's components and technology choices.

    ``self.components`` maps a component id to a dict with a display
    ``'name'``, a list of ``'responsibilities'`` and a ``'technology'`` string.
    """

    def __init__(self):
        def component(name, responsibilities, technology):
            # Uniform record shape shared by every component entry.
            return {
                'name': name,
                'responsibilities': responsibilities,
                'technology': technology,
            }

        self.components = {
            'controller': component(
                '控制中心',
                ['全局配置管理', '任务分发', '状态监控', '异常处理'],
                'Python Flask/FastAPI',
            ),
            'scheduler': component(
                '任务调度器',
                ['任务队列管理', '优先级调度', '负载均衡', '重试机制'],
                'Celery + Redis',
            ),
            'workers': component(
                '爬虫工作节点',
                ['App自动化操作', '数据提取', '反反爬虫', '数据验证'],
                'uiautomator2 + Frida',
            ),
            'pipeline': component(
                '数据处理管道',
                ['数据清洗', '格式转换', '去重处理', '质量检验'],
                'Python + Pandas',
            ),
            'storage': component(
                '数据存储层',
                ['数据持久化', '索引优化', '备份恢复', '性能优化'],
                'MySQL + Redis + MongoDB',
            ),
            'monitoring': component(
                '监控告警系统',
                ['性能监控', '错误追踪', '资源使用', '业务指标'],
                'Prometheus + Grafana',
            ),
        }

    def get_component_interaction(self):
        """Return the component data-flow edges as human-readable strings."""
        return [
            "Controller → Scheduler: 任务分发",
            "Scheduler → Workers: 任务分配",
            "Workers → Pipeline: 原始数据",
            "Pipeline → Storage: 清洗后数据",
            "All Components → Monitoring: 监控数据",
            "Monitoring → Controller: 告警通知",
        ]

    def get_technology_stack(self):
        """Return the technology choices grouped by architectural layer."""
        return dict(
            backend=['Python 3.8+', 'FastAPI/Flask', 'Celery'],
            automation=['uiautomator2', 'Frida', 'Appium'],
            database=['MySQL 8.0+', 'Redis 6.0+', 'MongoDB 4.4+'],
            monitoring=['Prometheus', 'Grafana', 'ELK Stack'],
            infrastructure=['Docker', 'Kubernetes', 'Nginx'],
        )

class ComponentDesign:
    """Design notes for the controller, scheduler and worker components.

    ``self.design_patterns`` maps a component id to its design pattern(s),
    guiding principles and planned module files.
    """

    # Worker processing stages from top to bottom: (title, implementation hint).
    _WORKER_STAGES = (
        ('App Controller', '(uiautomator2/Frida)'),
        ('Data Extractor', '(Parser/Processor)'),
        ('Anti-Anti-Spider', '(Bypass/Mitigation)'),
        ('Validator', '(Quality Check)'),
        ('Data Pipeline', '(Clean/Transform)'),
    )

    def __init__(self):
        self.design_patterns = {
            'controller': {
                'pattern': 'MVC + Factory',
                'design_principles': ['单一职责', '开闭原则', '依赖倒置'],
                'modules': [
                    'config_manager.py',
                    'task_dispatcher.py',
                    'status_monitor.py',
                    'exception_handler.py'
                ]
            },
            'scheduler': {
                'pattern': 'Producer-Consumer',
                'design_principles': ['松耦合', '可扩展性', '容错性'],
                'modules': [
                    'task_queue.py',
                    'priority_scheduler.py',
                    'load_balancer.py',
                    'retry_mechanism.py'
                ]
            },
            'workers': {
                'pattern': 'Strategy + Observer',
                'design_principles': ['封装变化', '关注点分离'],
                'modules': [
                    'app_automator.py',
                    'data_extractor.py',
                    'anti_anti_spider.py',
                    'validator.py'
                ]
            }
        }

    def get_worker_architecture(self):
        """Return an ASCII box diagram of the worker node's processing stages.

        The diagram is generated from ``_WORKER_STAGES``, so every row of the
        outer box has the same width and the stage boxes line up. The
        hand-drawn version mixed 39- and 40-character rows, which broke the
        box edges. The string starts with a newline, and each line carries an
        8-space indent, keeping the layout of the original triple-quoted
        literal.
        """
        inner = 33                 # text width inside each stage box
        outer = inner + 6          # 2-space margin + 2 stage borders + 2-space margin
        margin = ' ' * 2
        indent = ' ' * 8

        def framed(content):
            # One row of the outer box; `content` is exactly `outer` characters.
            return '│' + content + '│'

        def connector(symbol):
            # Arrow row centred under the stage boxes.
            left = (outer - 1) // 2
            return framed(' ' * left + symbol + ' ' * (outer - 1 - left))

        lines = [
            'Worker Node Architecture:',
            '',
            '┌' + '─' * outer + '┐',
            framed('Worker Process'.center(outer)),
            '├' + '─' * outer + '┤',
            framed(' ' * outer),
        ]
        for position, (title, hint) in enumerate(self._WORKER_STAGES):
            if position:
                lines += [connector('│'), connector('▼')]
            lines += [
                framed(margin + '┌' + '─' * inner + '┐' + margin),
                framed(margin + '│' + title.center(inner) + '│' + margin),
                framed(margin + '│' + hint.center(inner) + '│' + margin),
                framed(margin + '└' + '─' * inner + '┘' + margin),
            ]
        lines += [framed(' ' * outer), '└' + '─' * outer + '┘']

        return '\n' + '\n'.join(indent + line for line in lines) + '\n' + indent

# 部署架构
class DeploymentArchitecture:
    """Deployment profiles per environment, plus the scaling strategy."""

    def __init__(self):
        # (key, env label, node count, cpu cores, memory, component layout, monitoring tier)
        profiles = (
            ('development', 'Development', 1, 2, '4GB', 'all_in_one', 'basic'),
            ('testing', 'Testing', 3, 4, '8GB', 'distributed', 'standard'),
            ('production', 'Production', 10, 8, '16GB', 'high_availability', 'enterprise'),
        )
        self.deployment_options = {
            key: {
                'env': env,
                'nodes': nodes,
                'resources': {'cpu': cpu, 'memory': memory},
                'components': [layout],
                'monitoring': monitoring,
            }
            for key, env, nodes, cpu, memory, layout, monitoring in profiles
        }

    def get_scaling_strategy(self):
        """Return horizontal and vertical scaling approaches, keyed by target."""
        horizontal = {
            'worker_nodes': '根据任务队列长度动态扩展',
            'database': '读写分离,分库分表',
            'cache': '集群模式,自动分片',
        }
        vertical = {
            'single_node': '提升单节点资源配置',
            'optimization': '代码优化,算法改进',
        }
        return {'horizontal': horizontal, 'vertical': vertical}

def print_architecture_design():
    """Print the component catalogue, the data-flow edges and the technology stack."""
    print("=== 电商App爬虫系统架构设计 ===\n")

    architecture = SystemArchitecture()

    print("系统组件:")
    for component_id, info in architecture.components.items():
        duties = ', '.join(info['responsibilities'])
        print(f"  {info['name']} ({component_id}):")
        print(f"    职责: {duties}")
        print(f"    技术: {info['technology']}")
        print()

    print("组件交互:")
    for edge in architecture.get_component_interaction():
        print(f"  {edge}")

    print("\n技术栈:")
    for layer, names in architecture.get_technology_stack().items():
        joined = ', '.join(names)
        print(f"  {layer}: {joined}")

if __name__ == "__main__":
    print_architecture_design()

2. 环境搭建与配置

2.1 开发环境准备

import os
import subprocess
import sys
from pathlib import Path
import json
import yaml

class DevelopmentEnvironment:
    """Scaffold and sanity-check a local development environment for the crawler.

    Everything is created under ``project_root``: ``config/``, ``logs/``,
    ``data/``, ``temp/``, a ``src/`` tree, ``tests/`` and ``docs/``, plus
    ``requirements.txt``, ``Dockerfile`` and ``docker-compose.yml`` at the root.
    """

    def __init__(self, project_root: str = "."):
        self.project_root = Path(project_root)
        self.config_dir = self.project_root / "config"
        self.logs_dir = self.project_root / "logs"
        self.data_dir = self.project_root / "data"
        self.temp_dir = self.project_root / "temp"

    def create_project_structure(self):
        """Create the project directory tree; existing directories are left as is."""
        directories = [
            self.config_dir,
            self.logs_dir,
            self.data_dir,
            self.temp_dir,
            self.project_root / "src",
            self.project_root / "src" / "controllers",
            self.project_root / "src" / "models",
            self.project_root / "src" / "utils",
            self.project_root / "src" / "workers",
            self.project_root / "tests",
            self.project_root / "docs"
        ]

        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)
            print(f"创建目录: {directory}")

    def create_config_files(self):
        """Write ``config/main_config.json`` and ``config/environment.json``.

        Both files are overwritten if they already exist. ``config_dir`` must
        exist (see ``create_project_structure``).
        """
        # Main configuration file
        main_config = {
            "app_settings": {
                "target_app_package": "com.example.ecommerce",
                "device_serial": "",
                "headless_mode": False,
                "auto_restart": True
            },
            "crawler_settings": {
                "max_concurrent_tasks": 5,
                "request_delay": 2,
                "retry_attempts": 3,
                "timeout": 30
            },
            "database_settings": {
                "mysql": {
                    "host": "localhost",
                    "port": 3306,
                    "username": "crawler_user",
                    "password": "crawler_password",
                    "database": "ecommerce_data"
                },
                "redis": {
                    "host": "localhost",
                    "port": 6379,
                    "db": 0
                }
            },
            "monitoring_settings": {
                "enable_logging": True,
                "log_level": "INFO",
                "metrics_collection": True
            }
        }

        config_path = self.config_dir / "main_config.json"
        with open(config_path, 'w', encoding='utf-8') as f:
            json.dump(main_config, f, indent=2, ensure_ascii=False)

        print(f"创建主配置文件: {config_path}")

        # Environment-variable configuration
        env_config = {
            "FRIDA_SERVER_PATH": "/data/local/tmp/frida-server",
            "ADB_PATH": "adb",
            "PYTHONPATH": str(self.project_root / "src"),
            "LOG_DIR": str(self.logs_dir),
            "DATA_DIR": str(self.data_dir)
        }

        env_path = self.config_dir / "environment.json"
        with open(env_path, 'w', encoding='utf-8') as f:
            json.dump(env_config, f, indent=2, ensure_ascii=False)

        print(f"创建环境配置文件: {env_path}")

    def create_requirements_file(self):
        """Write ``requirements.txt`` listing the project's pip dependencies."""
        requirements = [
            "frida>=16.0.0",
            "frida-tools>=12.0.0",
            "uiautomator2>=2.16.23",
            "appium-python-client>=2.11.1",
            "selenium>=4.15.0",
            "requests>=2.31.0",
            "beautifulsoup4>=4.12.2",
            "lxml>=4.9.3",
            "pandas>=2.1.0",
            "numpy>=1.24.0",
            "sqlalchemy>=2.0.0",
            "pymysql>=1.1.0",
            "redis>=5.0.0",
            "celery>=5.3.0",
            "fastapi>=0.104.0",
            "uvicorn>=0.24.0",
            "pyyaml>=6.0.1",
            "loguru>=0.7.0",
            "fake-useragent>=1.4.0",
            "undetected-chromedriver>=3.5.0"
        ]

        req_path = self.project_root / "requirements.txt"
        with open(req_path, 'w', encoding='utf-8') as f:
            f.write("# 电商App爬虫项目依赖\n\n")
            for req in requirements:
                f.write(f"{req}\n")

        print(f"创建依赖文件: {req_path}")

    def create_docker_files(self):
        """Write ``Dockerfile`` and ``docker-compose.yml`` into ``project_root``."""
        # Dockerfile. This must be a raw string: in a normal literal Python
        # treats each trailing backslash + newline as a line continuation and
        # drops both, which would collapse the multi-line RUN into one line.
        dockerfile_content = r'''# 基础镜像
FROM python:3.9-slim

# 设置工作目录
WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    adb \
    android-tools-fastboot \
    build-essential \
    libffi-dev \
    libssl-dev \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制项目文件
COPY . .

# 创建必要目录
RUN mkdir -p logs data temp config

# 设置环境变量
ENV PYTHONPATH=/app/src
ENV LOG_DIR=/app/logs
ENV DATA_DIR=/app/data

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["python", "-m", "src.main"]
'''

        dockerfile_path = self.project_root / "Dockerfile"
        with open(dockerfile_path, 'w', encoding='utf-8') as f:
            f.write(dockerfile_content)

        print(f"创建Dockerfile: {dockerfile_path}")

        # docker-compose.yml
        compose_content = '''
version: '3.8'

services:
  crawler-controller:
    build: .
    container_name: ecommerce-crawler-controller
    ports:
      - "8000:8000"
    volumes:
      - ./logs:/app/logs
      - ./data:/app/data
      - /dev/bus/usb:/dev/bus/usb  # USB设备访问
    environment:
      - PYTHONPATH=/app/src
      - TARGET_DEVICE=auto
    depends_on:
      - mysql
      - redis
      - mongodb

  mysql:
    image: mysql:8.0
    container_name: ecommerce-mysql
    environment:
      MYSQL_ROOT_PASSWORD: rootpassword
      MYSQL_DATABASE: ecommerce_data
      MYSQL_USER: crawler_user
      MYSQL_PASSWORD: crawler_password
    ports:
      - "3306:3306"
    volumes:
      - mysql_data:/var/lib/mysql

  redis:
    image: redis:7-alpine
    container_name: ecommerce-redis
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  mongodb:
    image: mongo:6.0
    container_name: ecommerce-mongodb
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: adminpassword
    volumes:
      - mongo_data:/data/db

volumes:
  mysql_data:
  redis_data:
  mongo_data:
'''

        compose_path = self.project_root / "docker-compose.yml"
        with open(compose_path, 'w', encoding='utf-8') as f:
            f.write(compose_content)

        print(f"创建docker-compose.yml: {compose_path}")

    def setup_virtual_environment(self):
        """Create ``<project_root>/venv`` and pip-install ``requirements.txt`` into it.

        Returns:
            True on success; False if creating the venv or installing failed.
        """
        venv_path = self.project_root / "venv"
        # Resolve against project_root rather than the current working
        # directory, so this works when the project lives elsewhere.
        requirements_path = self.project_root / "requirements.txt"

        try:
            # Create the virtual environment
            subprocess.run([sys.executable, "-m", "venv", str(venv_path)], check=True)
            print(f"虚拟环境创建成功: {venv_path}")

            # Install dependencies with the venv's own pip
            if sys.platform == "win32":
                pip_path = venv_path / "Scripts" / "pip.exe"
            else:
                pip_path = venv_path / "bin" / "pip"
            subprocess.run([str(pip_path), "install", "-r", str(requirements_path)], check=True)
            print("依赖安装完成")

            return True
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError also covers a pip executable missing from a broken venv.
            print(f"虚拟环境设置失败: {e}")
            return False

    def verify_environment(self):
        """Run each environment check, print a ✓/✗ line per check, return True only if all pass."""
        checks = {
            'python_version': self._check_python_version,
            'adb_available': self._check_adb,
            'frida_available': self._check_frida,
            'device_connected': self._check_device_connection
        }

        results = {}
        for check_name, check_func in checks.items():
            results[check_name] = check_func()
            status = "✓" if results[check_name] else "✗"
            print(f"{status} {check_name}: {results[check_name]}")

        return all(results.values())

    def _check_python_version(self):
        """Return True when the running interpreter is Python 3.8 or newer."""
        return sys.version_info[:2] >= (3, 8)

    def _check_adb(self):
        """Return True if ``adb version`` runs and exits with status 0."""
        try:
            result = subprocess.run(['adb', 'version'], capture_output=True, text=True, timeout=5)
        except (OSError, subprocess.SubprocessError):
            # adb not installed / not on PATH, or it did not answer in time.
            return False
        return result.returncode == 0

    def _check_frida(self):
        """Return True if the ``frida`` Python package can be imported."""
        try:
            import frida
            return True
        except ImportError:
            return False

    @staticmethod
    def _parse_adb_devices(output):
        """Return serials that ``adb devices`` output reports in the ready ``device`` state.

        Each device line is ``<serial>`` followed by whitespace and ``<state>``.
        The header line, adb daemon start-up messages and devices in any
        other state (``unauthorized``, ``offline``, ``no permissions`` ...)
        are ignored, because none of them can be driven by the crawler.
        """
        serials = []
        for line in output.splitlines():
            fields = line.split()
            if len(fields) >= 2 and fields[1] == 'device':
                serials.append(fields[0])
        return serials

    def _check_device_connection(self):
        """Return True if adb reports at least one device ready for use."""
        try:
            result = subprocess.run(['adb', 'devices'], capture_output=True, text=True, timeout=10)
        except (OSError, subprocess.SubprocessError):
            return False
        if result.returncode != 0:
            return False
        return len(self._parse_adb_devices(result.stdout)) > 0

    def initialize_project(self):
        """Create the scaffold (dirs, configs, requirements, Docker files) and verify the environment.

        Returns:
            The result of ``verify_environment()``.
        """
        print("开始初始化电商App爬虫项目...\n")

        # 1. Project directory structure
        print("1. 创建项目目录结构...")
        self.create_project_structure()

        # 2. Configuration files
        print("\n2. 创建配置文件...")
        self.create_config_files()

        # 3. Requirements file
        print("\n3. 创建依赖文件...")
        self.create_requirements_file()

        # 4. Docker files
        print("\n4. 创建Docker相关文件...")
        self.create_docker_files()

        # 5. Environment verification
        print("\n5. 验证开发环境...")
        if self.verify_environment():
            print("\n✓ 项目初始化完成!")
            return True
        else:
            print("\n✗ 环境验证失败,请检查配置")
            return False

# 配置管理器
class ConfigManager:
    """Load, query, modify and persist a JSON configuration file.

    Values are addressed with dot-separated key paths, e.g.
    ``"database_settings.mysql.port"``. The root of the file is assumed to
    be a JSON object.
    """

    def __init__(self, config_path: str = "config/main_config.json"):
        self.config_path = Path(config_path)
        self.config = self.load_config()

    def load_config(self):
        """Read and return the parsed JSON config.

        Raises:
            FileNotFoundError: if ``config_path`` does not exist.
            json.JSONDecodeError: if the file does not contain valid JSON.
        """
        if not self.config_path.exists():
            raise FileNotFoundError(f"配置文件不存在: {self.config_path}")

        with open(self.config_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def get(self, key_path: str, default=None):
        """Return the value at dotted ``key_path``, or ``default`` if any segment is missing.

        A segment that lands on a non-dict value is treated as missing.
        """
        value = self.config
        for key in key_path.split('.'):
            if not isinstance(value, dict) or key not in value:
                return default
            value = value[key]
        return value

    def set(self, key_path: str, value):
        """Set ``value`` at dotted ``key_path``, creating missing intermediate dicts.

        Changes are in memory only; call ``save()`` to persist them.

        Raises:
            TypeError: if an intermediate segment already holds a non-dict
                value. It is reported rather than silently overwritten.
        """
        *parents, leaf = key_path.split('.')
        node = self.config
        for depth, key in enumerate(parents):
            node = node.setdefault(key, {})
            if not isinstance(node, dict):
                where = '.'.join(parents[:depth + 1])
                raise TypeError(
                    f"无法设置 {key_path}: {where} 不是字典 (实际类型: {type(node).__name__})"
                )
        node[leaf] = value

    def save(self):
        """Write the current config back to ``config_path`` as indented UTF-8 JSON.

        The config is serialised *before* the file is opened. A value that
        cannot be JSON-encoded therefore raises ``TypeError`` and leaves the
        existing file intact, instead of truncating it to a half-written
        document.
        """
        payload = json.dumps(self.config, indent=2, ensure_ascii=False)
        with open(self.config_path, 'w', encoding='utf-8') as f:
            f.write(payload)

    def reload(self):
        """Discard in-memory changes and re-read the config from disk."""
        self.config = self.load_config()

def setup_development_environment():
    """Initialise the project scaffold in the current directory and print the next steps."""
    environment = DevelopmentEnvironment()

    if not environment.initialize_project():
        print("\n环境设置失败,请检查错误信息")
        return

    print("\n开发环境设置完成!")
    print("接下来的步骤:")
    next_steps = (
        "1. 检查并修改 config/main_config.json 中的配置",
        "2. 确保Android设备已连接并开启USB调试",
        "3. 启动数据库服务",
        "4. 运行爬虫服务",
    )
    for step in next_steps:
        print(step)

if __name__ == "__main__":
    setup_development_environment()

2.2 设备环境配置

import subprocess
import time
import os
from pathlib import Path
import json

class DeviceEnvironmentSetup:
    """Prepare a connected Android device for App crawling via the ``adb`` CLI.

    Commands are issued as plain ``adb ...`` without ``-s``. When several
    devices are attached, adb selects the one named by the ``ANDROID_SERIAL``
    environment variable, which ``DeviceManager.select_device`` sets.
    """

    # Shell script generated by install_frida_server() for on-device installation.
    # The literal must begin with the shebang itself: a leading newline would put
    # "#!" on line 2, where it is just a comment.
    # NOTE(review): stock Android images usually ship neither curl nor xz; the
    # script presumably assumes a busybox/Termux-like environment -- confirm.
    _FRIDA_INSTALL_SCRIPT = '''#!/system/bin/sh
# 自动安装frida-server的shell脚本
FRIDA_VERSION="16.0.10"
ARCH=$(getprop ro.product.cpu.abi)

case $ARCH in
    "armeabi-v7a")
        URL="https://github.com/frida/frida/releases/download/$FRIDA_VERSION/frida-server-$FRIDA_VERSION-android-arm.xz"
        ;;
    "arm64-v8a")
        URL="https://github.com/frida/frida/releases/download/$FRIDA_VERSION/frida-server-$FRIDA_VERSION-android-arm64.xz"
        ;;
    "x86")
        URL="https://github.com/frida/frida/releases/download/$FRIDA_VERSION/frida-server-$FRIDA_VERSION-android-x86.xz"
        ;;
    "x86_64")
        URL="https://github.com/frida/frida/releases/download/$FRIDA_VERSION/frida-server-$FRIDA_VERSION-android-x86_64.xz"
        ;;
    *)
        echo "不支持的架构: $ARCH"
        exit 1
        ;;
esac

echo "正在下载 $URL"
cd /data/local/tmp
curl -L -o frida-server.xz "$URL"
xz -d frida-server.xz
chmod 755 frida-server
echo "Frida服务器安装完成"
'''

    def __init__(self):
        # Helper binaries for the device: download location, install path on
        # the device, and the shell command that prepares/starts them.
        self.required_apps = {
            'frida_server': {
                'download_url': 'https://github.com/frida/frida/releases',
                'installation_path': '/data/local/tmp/frida-server',
                'start_command': 'chmod 755 /data/local/tmp/frida-server && /data/local/tmp/frida-server &'
            },
            'busybox': {
                'download_url': 'https://github.com/MasterDevX/Binarys/raw/main/busybox',
                'installation_path': '/data/local/tmp/busybox',
                'start_command': 'chmod 755 /data/local/tmp/busybox'
            }
        }

    @staticmethod
    def _parse_online_devices(adb_output: str) -> list:
        """Return the serials whose ``adb devices`` state is exactly ``device``.

        Header lines, daemon start-up messages and devices in other states
        (``offline``, ``unauthorized``, ``no permissions; see ...``) are
        skipped. A substring test for ``'device'`` would wrongly accept the
        "no permissions" line, whose help URL contains that word.
        """
        serials = []
        for line in adb_output.splitlines():
            parts = line.strip().split('\t')
            if len(parts) >= 2 and parts[1].strip() == 'device':
                serials.append(parts[0])
        return serials

    @staticmethod
    def _package_in_list(pm_output: str, package_name: str) -> bool:
        """Return True if ``pm list packages`` output lists exactly ``package_name``.

        ``pm list packages <filter>`` performs a substring match, so the
        output has to be compared line by line against ``package:<name>``.
        """
        wanted = f"package:{package_name}"
        return any(line.strip() == wanted for line in pm_output.splitlines())

    def _frida_server_running(self) -> bool:
        """Return True if a process named ``frida-server`` exists on the device.

        Uses ``pidof``. Plain ``ps`` on Android 8+ only lists the shell's own
        processes, and ``ps | grep frida-server`` can match the grep itself.
        """
        result = subprocess.run(['adb', 'shell', 'pidof', 'frida-server'],
                                capture_output=True, text=True, timeout=10)
        return bool(result.stdout.strip())

    def check_adb_connection(self):
        """Return True if at least one device is attached in the ``device`` state."""
        try:
            result = subprocess.run(['adb', 'devices'], capture_output=True, text=True, timeout=10)
        except (OSError, subprocess.SubprocessError) as e:
            print(f"ADB连接检查失败: {e}")
            return False

        devices = self._parse_online_devices(result.stdout)
        print(f"找到 {len(devices)} 个连接的设备:")
        for device in devices:
            print(f"  {device}")
        return len(devices) > 0

    def enable_developer_options(self):
        """Print the manual steps for enabling developer options / USB debugging."""
        print("请在Android设备上手动完成以下设置:")
        print("1. 进入 设置 → 关于手机")
        print("2. 连续点击 '版本号' 7次以启用开发者选项")
        print("3. 返回设置,进入 开发者选项")
        print("4. 启用 'USB调试'")
        print("5. 启用 'USB安装'")
        print("6. 启用 'USB调试(安全设置)' (如果存在)")
        print("7. 在弹出的授权对话框中选择 '始终允许'")

    def check_root_access(self):
        """Return True if ``su -c id`` on the device reports ``uid=0``."""
        try:
            result = subprocess.run(['adb', 'shell', 'su', '-c', 'id'],
                                    capture_output=True, text=True, timeout=10)
        except (OSError, subprocess.SubprocessError):
            print("⚠ ROOT权限检查失败")
            return False

        if 'uid=0' in result.stdout:
            print("✓ 设备已ROOT")
            return True
        print("⚠ 设备未ROOT,部分功能可能受限")
        return False

    def install_frida_server(self):
        """Check for frida-server on the device; if absent, emit install instructions.

        Returns True only when the binary is already present. Otherwise it
        prints manual steps, writes ``install_frida.sh`` to the current
        directory and returns False, because installation is a manual step.
        """
        print("安装Frida服务器...")
        server_path = self.required_apps['frida_server']['installation_path']

        # Check stdout instead of the return code: older adb/device combinations
        # (no shell protocol v2) always exit 0 from `adb shell`.
        check_result = subprocess.run(['adb', 'shell', f'[ -f {server_path} ] && echo present'],
                                      capture_output=True, text=True, timeout=10)
        if 'present' in check_result.stdout:
            print("Frida服务器已安装")
            return True

        # Download instructions (the right binary depends on the device ABI).
        print("请手动下载对应架构的frida-server:")
        print("1. 访问 https://github.com/frida/frida/releases")
        print("2. 下载适合设备架构的frida-server")
        print("3. 重命名为frida-server")
        print("4. 执行: adb push frida-server /data/local/tmp/")
        print("5. 执行: adb shell chmod 755 /data/local/tmp/frida-server")

        script_path = Path("install_frida.sh")
        # newline='\n' keeps LF line endings on Windows hosts; sh on the device
        # does not accept CRLF scripts.
        with open(script_path, 'w', encoding='utf-8', newline='\n') as f:
            f.write(self._FRIDA_INSTALL_SCRIPT)

        print(f"自动安装脚本已生成: {script_path}")
        print("请将此脚本推送到设备并执行,或手动安装frida-server")

        return False

    def start_frida_server(self, as_root: bool = True):
        """Start frida-server on the device in the background if it is not running.

        Args:
            as_root: launch through ``su -c``. frida-server needs root to
                attach to other apps' processes.

        Returns:
            True if frida-server is running afterwards.
        """
        server_path = self.required_apps['frida_server']['installation_path']
        try:
            if self._frida_server_running():
                print("Frida服务器已在运行")
                return True

            subprocess.run(['adb', 'shell', 'chmod', '755', server_path],
                           check=True, capture_output=True, text=True, timeout=10)
            # Background and redirect inside the device shell so the adb session
            # can end. `adb shell <server> &` kept adb attached to the child, or
            # the child died when the session closed.
            launch = f'nohup {server_path} >/dev/null 2>&1 </dev/null &'
            if as_root:
                launch = f'su -c "{launch}"'
            subprocess.run(['adb', 'shell', launch],
                           check=True, capture_output=True, text=True, timeout=15)
            time.sleep(2)  # give the server time to start

            # Verify via the process list. `frida-ls-devices` succeeds even when
            # no server is running.
            if self._frida_server_running():
                print("✓ Frida服务器启动成功")
                return True
            print("✗ Frida服务器启动失败")
            return False
        except (OSError, subprocess.SubprocessError) as e:
            print(f"启动Frida服务器失败: {e}")
            return False

    def setup_uiautomator2(self):
        """Connect uiautomator2 to the default device and report its status."""
        try:
            import uiautomator2 as u2
        except ImportError:
            print("✗ uiautomator2未安装,请运行: pip install uiautomator2")
            return False
        print("✓ uiautomator2已安装")

        try:
            print("初始化uiautomator2...")
            d = u2.connect()  # connect to the default device

            if not d.info:
                print("✗ 设备连接失败")
                return False

            print(f"✓ 设备连接成功: {d.device_info}")
            # Informational only. NOTE(review): the helper APK may be absent
            # depending on the uiautomator2 version, which should not fail the step.
            try:
                print(f"  应用版本: {d.app_info('com.github.uiautomator')}")
            except Exception as e:
                print(f"  应用版本: 未知 ({e})")

            # healthcheck() (starts atx-agent) is not present in every
            # uiautomator2 release, so call it only when available.
            healthcheck = getattr(d, 'healthcheck', None)
            if callable(healthcheck):
                healthcheck()

            return True
        except Exception as e:
            print(f"uiautomator2设置失败: {e}")
            return False

    def setup_target_app(self, package_name: str):
        """Verify that ``package_name`` is installed, report debuggability, and launch it."""
        print(f"设置目标App: {package_name}")

        result = subprocess.run(['adb', 'shell', 'pm', 'list', 'packages', package_name],
                                capture_output=True, text=True, timeout=30)
        if self._package_in_list(result.stdout, package_name):
            print(f"✓ {package_name} 已安装")
        else:
            print(f"✗ {package_name} 未安装,请先安装目标App")
            return False

        dumpsys_result = subprocess.run(['adb', 'shell', 'dumpsys', 'package', package_name],
                                        capture_output=True, text=True, timeout=30)
        # dumpsys lists DEBUGGABLE among the package flags when android:debuggable is set.
        if 'DEBUGGABLE' in dumpsys_result.stdout:
            print("✓ App支持调试模式")
        else:
            print("⚠ App不支持调试模式,可能需要额外处理")

        # Launch through monkey: a single LAUNCHER-category event for the package.
        try:
            subprocess.run(['adb', 'shell', 'monkey', '-p', package_name, '-c',
                            'android.intent.category.LAUNCHER', '1'],
                           check=True, capture_output=True, text=True, timeout=30)
        except (OSError, subprocess.SubprocessError):
            print(f"✗ {package_name} 启动失败")
            return False
        print(f"✓ {package_name} 启动成功")
        time.sleep(3)  # let the App finish starting
        return True

    def optimize_device_settings(self):
        """Apply best-effort device tweaks for unattended crawling.

        Each tweak that fails is reported as possibly unsupported and skipped.
        The step itself always returns True, so unsupported tweaks do not mark
        the whole environment setup as failed.
        """
        optimizations = [
            ('关闭动画', 'settings put global window_animation_scale 0'),
            ('关闭过渡动画', 'settings put global transition_animation_scale 0'),
            ('关闭动画持续时间', 'settings put global animator_duration_scale 0'),
            ('关闭自动亮度', 'settings put system screen_brightness_mode 0'),
            ('设置最大亮度', 'settings put system screen_brightness 255'),
            ('关闭休眠', 'settings put system screen_off_timeout 2147483647'),  # Integer.MAX_VALUE ms
            # Disable Doze. The previous `cmd battery unplug` only simulated
            # unplugging the charger, which makes power saving *more* likely.
            ('关闭电池优化', 'dumpsys deviceidle disable'),
        ]

        print("优化设备设置...")
        for desc, cmd in optimizations:
            try:
                subprocess.run(['adb', 'shell', cmd], check=True,
                               capture_output=True, text=True, timeout=15)
                print(f"✓ {desc}")
            except (OSError, subprocess.SubprocessError):
                print(f"✗ {desc} (可能不支持)")
        return True

    def create_performance_profile(self):
        """Write ``device_performance_config.json`` to the current directory; return True."""
        profile_config = {
            "device_optimizations": {
                "disable_animations": True,
                "max_brightness": True,
                "prevent_sleep": True,
                "disable_battery_optimization": True
            },
            "app_settings": {
                "disable_update_prompts": True,
                "disable_permission_prompts": True,
                "enable_background_processing": True
            },
            "network_settings": {
                "keep_wifi_awake": True,
                "disable_power_save": True
            }
        }

        config_path = Path("device_performance_config.json")
        with open(config_path, 'w', encoding='utf-8') as f:
            json.dump(profile_config, f, indent=2, ensure_ascii=False)

        print(f"性能配置文件已创建: {config_path}")
        return True

    def setup_complete_environment(self, target_package: str):
        """Run every setup step in order and return True only if all succeeded.

        A step counts as failed when it returns a falsy value or raises. Later
        steps still run after a failure.
        """
        print("=== 开始设置完整设备环境 ===\n")

        steps = [
            ("检查ADB连接", self.check_adb_connection),
            ("检查ROOT权限", self.check_root_access),
            ("安装Frida服务器", self.install_frida_server),
            ("启动Frida服务器", self.start_frida_server),
            ("设置uiautomator2", self.setup_uiautomator2),
            ("设置目标App", lambda: self.setup_target_app(target_package)),
            ("优化设备设置", self.optimize_device_settings),
            ("创建性能配置", self.create_performance_profile)
        ]

        results = {}
        for step_name, step_func in steps:
            print(f"\n{step_name}...")
            try:
                result = step_func()
                results[step_name] = result
                status = "✓" if result else "✗"
                print(f"{status} {step_name} {'成功' if result else '失败'}")
            except Exception as e:
                print(f"✗ {step_name} 执行异常: {e}")
                results[step_name] = False

        print("\n=== 环境设置完成 ===")
        successful_steps = sum(1 for result in results.values() if result)
        total_steps = len(results)
        print(f"成功率: {successful_steps}/{total_steps}")

        if successful_steps == total_steps:
            print("✓ 所有环境设置成功!")
            return True
        print("⚠ 部分设置失败,请检查错误并重试")
        return False

class DeviceManager:
    """Enumerate devices attached to adb and select one for the session."""

    def __init__(self):
        # List of {'serial': ..., 'status': 'device'} dicts, captured once at construction.
        self.devices = self._get_connected_devices()

    @staticmethod
    def _parse_devices(adb_output: str) -> list:
        """Parse ``adb devices`` output into ``{'serial', 'status'}`` dicts.

        Only entries whose state is exactly ``device`` are kept. A substring
        test would also accept ``no permissions; see ...device.html`` lines.
        Header and daemon messages have no tab and are ignored.
        """
        devices = []
        for line in adb_output.splitlines():
            parts = line.strip().split('\t')
            if len(parts) >= 2 and parts[1].strip() == 'device':
                devices.append({
                    'serial': parts[0],
                    'status': parts[1].strip()
                })
        return devices

    def _get_connected_devices(self):
        """Return online devices, or an empty list if adb is missing or times out."""
        try:
            result = subprocess.run(['adb', 'devices'], capture_output=True, text=True, timeout=10)
        except (OSError, subprocess.SubprocessError):
            return []
        return self._parse_devices(result.stdout)

    def select_device(self):
        """Pick a device (prompting when more than one is attached).

        Also exports its serial as ``ANDROID_SERIAL`` so that later plain
        ``adb`` calls target it.

        Returns:
            The selected serial, or None when no device is attached.
        """
        if not self.devices:
            print("没有找到连接的设备")
            return None

        print("连接的设备:")
        for i, device in enumerate(self.devices):
            print(f"{i+1}. {device['serial']} ({device['status']})")

        if len(self.devices) == 1:
            selected = 0
        else:
            try:
                choice = int(input("请选择设备编号: ")) - 1
                if 0 <= choice < len(self.devices):
                    selected = choice
                else:
                    print("无效选择,使用第一个设备")
                    selected = 0
            except (ValueError, EOFError):
                # EOFError: stdin closed (e.g. non-interactive run) -- fall back too.
                print("输入无效,使用第一个设备")
                selected = 0

        selected_device = self.devices[selected]
        print(f"已选择设备: {selected_device['serial']}")

        # adb reads ANDROID_SERIAL to choose the target device.
        os.environ['ANDROID_SERIAL'] = selected_device['serial']
        return selected_device['serial']

    def get_device_info(self, serial: str = None):
        """Return model/brand/Android version/API level read via ``getprop``.

        Args:
            serial: device serial passed to ``adb -s``; None uses adb's default.

        Returns:
            A dict of the four properties, or ``{}`` if adb is missing or times out.
        """
        cmd_prefix = ['adb', '-s', serial] if serial else ['adb']
        properties = {
            'model': 'ro.product.model',
            'brand': 'ro.product.brand',
            'android_version': 'ro.build.version.release',
            'api_level': 'ro.build.version.sdk',
        }

        info = {}
        try:
            for field, prop in properties.items():
                result = subprocess.run(cmd_prefix + ['shell', 'getprop', prop],
                                        capture_output=True, text=True, timeout=10)
                info[field] = result.stdout.strip()
        except (OSError, subprocess.SubprocessError):
            return {}
        return info

def setup_device_environment():
    """Interactively choose a device, show its properties and run the full setup.

    Returns:
        bool: False if no device was selected or no package name was given;
        otherwise the result of ``DeviceEnvironmentSetup.setup_complete_environment``.
    """
    manager = DeviceManager()
    serial = manager.select_device()
    if not serial:
        print("未选择设备,无法继续")
        return False

    print(f"设备信息: {manager.get_device_info(serial)}")

    env_setup = DeviceEnvironmentSetup()
    package = input("请输入目标App包名 (如com.example.ecommerce): ").strip()
    if not package:
        print("未提供目标App包名")
        return False

    outcome = env_setup.setup_complete_environment(package)
    if not outcome:
        print("\n设备环境设置失败,请检查错误信息")
        return outcome

    print("\n设备环境设置完成!")
    print("现在可以开始爬虫开发了")
    return outcome

# Script entry point: exit with status 0 on success and 1 on failure, so
# shell scripts or CI jobs can detect a failed device setup.
if __name__ == "__main__":
    raise SystemExit(0 if setup_device_environment() else 1)

3. 反反爬虫策略实现

3.1 SSL Pinning绕过

import frida
import time
import json
from typing import Dict, List, Optional

class SSLPinningBypass:
    """Attach to an Android app with Frida and load SSL-pinning hook scripts.

    ``self.session`` and ``self.script`` hold the most recent attach / loaded
    script. Call ``detach()`` to release them.

    NOTE(review): ``device.attach(package_name)`` attaches by *process name*.
    Depending on the Frida version this may need to match the app's process
    name rather than its package id -- confirm against ``frida-ps -Uai``.
    """

    # Hooks for the common pinning implementations (OkHttp, Conscrypt
    # TrustManagerImpl, Apache HttpClient, NetworkSecurityConfig, SSLContext,
    # HttpsURLConnection, WebView). All Java.use() calls run inside
    # Java.perform(), which attaches the JS thread to the Java VM.
    _COMMON_BYPASS_JS = """
        console.log("[.] SSL Pinning Bypass/Hooking started...");

        // Java.use() requires a thread attached to the Java VM, so all hooks
        // are installed from inside Java.perform().
        Java.perform(function() {

        // 1. OkHttp (squareup) - 最常见的HTTP客户端
        var okhttp3Certificates = [
            'okhttp3.CertificatePinner',
            'okhttp3.CertificatePinner$Builder',
            'okhttp3.Handshake'
        ];

        okhttp3Certificates.forEach(function(cert_class) {
            try {
                var certificatePinner = Java.use(cert_class);

                if (cert_class === 'okhttp3.CertificatePinner') {
                    // 绕过 check 方法
                    certificatePinner.check.overload('java.lang.String', 'java.util.List').implementation = function(hostname, peerCertificates) {
                        console.log('[+] OkHTTP 3.x CertificatePinner check called for: ' + hostname);
                        // 不调用原始方法 - 有效绕过检查
                    };

                    // 绕过 check$okhttp 方法(某些版本)
                    if (certificatePinner['check$okhttp']) {
                        certificatePinner['check$okhttp'].implementation = function(realCall, hostname, peerCertificates) {
                            console.log('[+] OkHTTP 3.x CertificatePinner check$okhttp called');
                            // 绕过
                        };
                    }

                    // 绕过 equals 方法(防止证书比较)
                    certificatePinner.equals.implementation = function(other) {
                        console.log('[+] OkHTTP 3.x CertificatePinner equals called');
                        return true; // 总是返回相等
                    };
                }
            } catch (err) {
                console.log('[-] OkHTTP 3.x pinner not found: ' + err);
            }
        });

        // 2. TrustManagerImpl (Android 7+) - Android系统级证书验证
        try {
            var TrustManagerImpl = Java.use('com.android.org.conscrypt.TrustManagerImpl');

            // Android 7+
            TrustManagerImpl.verifyChain.implementation = function(untrustedChain, trustAnchorChain, host, clientAuth, ocspData, tlsSctData) {
                console.log('[+] Bypassing TrustManagerImpl->verifyChain() for host: ' + host);
                return untrustedChain; // 返回证书作为可信的
            };

            // 绕过 checkTrustedRecursive 方法
            if (TrustManagerImpl['checkTrustedRecursive']) {
                TrustManagerImpl['checkTrustedRecursive'].implementation = function(chain, host, clientAuth, untrustedChain, trustAnchorChain) {
                    console.log('[+] Bypassing TrustManagerImpl->checkTrustedRecursive() for host: ' + host);
                    // The method returns java.util.List; a JS [] is not converted to one.
                    return Java.use('java.util.ArrayList').$new();
                };
            }
        } catch (err) {
            console.log('[-] TrustManagerImpl not found: ' + err);
        }

        // 3. Apache HTTP Client 绕过
        try {
            var AbstractVerifier = Java.use("ch.boye.httpclientandroidlib.conn.ssl.AbstractVerifier");
            AbstractVerifier.verify.overload('java.lang.String', 'javax.net.ssl.SSLSession').implementation = function(host, session) {
                console.log('[+] Bypassing AbstractVerifier->verify() for host: ' + host);
                // 不执行任何操作,允许连接
            };

            AbstractVerifier.verify.overload('java.lang.String', 'java.security.cert.X509Certificate').implementation = function(host, cert) {
                console.log('[+] Bypassing AbstractVerifier->verify() for host: ' + host);
                // 不执行任何操作,允许连接
            };

            AbstractVerifier.verify.overload('java.lang.String', 'java.lang.String', 'java.security.cert.X509Certificate').implementation = function(host, criteria, cert) {
                console.log('[+] Bypassing AbstractVerifier->verify() for host: ' + host);
                // 不执行任何操作,允许连接
            };
        } catch (err) {
            console.log('[-] Apache HTTP Client verifier not found: ' + err);
        }

        // 4. NetworkSecurityConfig (Android 7+) - 应用级网络安全配置
        try {
            var NetworkSecurityConfig = Java.use("android.security.net.config.NetworkSecurityConfig");

            if (NetworkSecurityConfig.isPinningEnforced) {
                NetworkSecurityConfig.isPinningEnforced.implementation = function() {
                    console.log('[+] Bypassing NetworkSecurityConfig->isPinningEnforced()');
                    return false; // 禁用证书固定
                };
            }

            if (NetworkSecurityConfig.getCertificateSource) {
                NetworkSecurityConfig.getCertificateSource.implementation = function() {
                    console.log('[+] Bypassing NetworkSecurityConfig->getCertificateSource()');
                    // 返回空的证书源
                    return null;
                };
            }
        } catch (err) {
            console.log('[-] NetworkSecurityConfig not found: ' + err);
        }

        // 5. X509TrustManager 绕过 - 通用信任管理器
        try {
            var X509TrustManager = Java.use('javax.net.ssl.X509TrustManager');
            var SSLContext = Java.use('javax.net.ssl.SSLContext');

            // 创建空的信任管理器
            var TrustAllCerts = Java.registerClass({
                name: 'org.wooyun.TrustAllCerts',
                implements: [X509TrustManager],
                methods: {
                    checkClientTrusted: function(chain, authType) {},
                    checkServerTrusted: function(chain, authType) {},
                    getAcceptedIssuers: function() { return []; }
                }
            });

            // 绕过SSLContext初始化
            SSLContext.init.overload('[Ljavax.net.ssl.KeyManager;', '[Ljavax.net.ssl.TrustManager;', 'java.security.SecureRandom').implementation = function(keyManager, trustManager, secureRandom) {
                console.log('[+] Bypassing SSLContext.init() with custom trust manager');
                var ModifiedTrustManager = Java.cast(TrustAllCerts.$new(), X509TrustManager);
                this.init(keyManager, [ModifiedTrustManager], secureRandom);
            };
        } catch (err) {
            console.log('[-] X509TrustManager bypass failed: ' + err);
        }

        // 6. HttpURLConnection 绕过
        try {
            var HttpsURLConnection = Java.use('javax.net.ssl.HttpsURLConnection');

            if (HttpsURLConnection.setSSLSocketFactory) {
                HttpsURLConnection.setSSLSocketFactory.implementation = function(sslSocketFactory) {
                    console.log('[+] Bypassing HttpsURLConnection->setSSLSocketFactory()');
                    // 使用空的SocketFactory或自定义的允许所有证书的Factory
                };
            }

            if (HttpsURLConnection.setHostnameVerifier) {
                HttpsURLConnection.setHostnameVerifier.implementation = function(verifier) {
                    console.log('[+] Bypassing HttpsURLConnection->setHostnameVerifier()');
                    // 不设置主机名验证器,或者设置允许所有主机的验证器
                };
            }
        } catch (err) {
            console.log('[-] HttpsURLConnection bypass failed: ' + err);
        }

        // 7. WebView SSL 绕过
        try {
            var WebViewClient = Java.use('android.webkit.WebViewClient');

            WebViewClient.onReceivedSslError.implementation = function(view, handler, error) {
                console.log('[+] Bypassing WebViewClient->onReceivedSslError() for URL: ' + error.getUrl());
                handler.proceed(); // 忽略SSL错误,继续加载
            };

            if (WebViewClient.onReceivedError) {
                WebViewClient.onReceivedError.overload('android.webkit.WebView', 'int', 'java.lang.String', 'java.lang.String').implementation = function(view, errorCode, description, failingUrl) {
                    console.log('[+] WebViewClient->onReceivedError() ignored for: ' + failingUrl);
                    // 不调用原始方法,忽略错误
                };
            }
        } catch (err) {
            console.log('[-] WebView SSL bypass failed: ' + err);
        }

        // 8. Apache HttpClient 4.x 绕过
        try {
            var SSLSocketFactory = Java.use('org.apache.http.conn.ssl.SSLSocketFactory');
            var ALLOW_ALL_HOSTNAME_VERIFIER = SSLSocketFactory['ALLOW_ALL_HOSTNAME_VERIFIER'].value;

            SSLSocketFactory.setHostnameVerifier.implementation = function(hostnameVerifier) {
                console.log('[+] Bypassing SSLSocketFactory->setHostnameVerifier()');
                // 设置允许所有主机名的验证器
                this.setHostnameVerifier(ALLOW_ALL_HOSTNAME_VERIFIER);
            };
        } catch (err) {
            console.log('[-] Apache HttpClient 4.x bypass failed: ' + err);
        }

        // 9. 自定义证书固定实现绕过
        // 通常出现在应用自己的类中
        try {
            // 这里可以根据具体应用定制绕过
            // 例如,如果知道应用使用了自定义的证书固定类
            // 可以 Hook 相应的验证方法
        } catch (err) {
            console.log('[-] Custom certificate pinning bypass failed: ' + err);
        }

        console.log("[+] SSL Pinning Bypass hooks installed successfully.");
        });
        """

    # Enumerates loaded classes whose names mention pinning/certificate/trust/ssl
    # and wraps their verify/check/validate/pin methods with logging hooks that
    # forward to the original implementation.
    _DYNAMIC_BYPASS_JS = """
        console.log("[.] Dynamic SSL Pinning Detection and Bypass started...");

        Java.perform(function() {

        // 动态类加载监控
        var loadedClasses = {};

        // Hook every overload of className.methodName, log each call and
        // forward it to the original. The helper gives each hook its own
        // className/methodName binding (a loop-level var was shared by all hooks).
        // Iterating .overloads avoids the error raised when assigning
        // .implementation on an overloaded method. The old guard
        // `method.implementation` is null for unhooked methods, so it never hooked anything.
        function hookForLogging(targetClass, className, methodName) {
            try {
                var method = targetClass[methodName];
                if (!method || !method.overloads) {
                    return;
                }
                method.overloads.forEach(function(overload) {
                    overload.implementation = function() {
                        console.log('[+] Bypassed ' + className + '.' + methodName + ' called');
                        var result = overload.apply(this, arguments);
                        console.log('[+] Method returned: ' + result);
                        return result;
                    };
                });
            } catch (hookErr) {
                console.log('[-] Failed to hook ' + className + '.' + methodName + ': ' + hookErr);
            }
        }

        // 监控类加载
        Java.enumerateLoadedClasses({
            onMatch: function(className) {
                loadedClasses[className] = true;

                // 检查是否是证书固定相关的类
                if (className.toLowerCase().indexOf('pinning') !== -1 ||
                    className.toLowerCase().indexOf('certificate') !== -1 ||
                    className.toLowerCase().indexOf('trust') !== -1 ||
                    className.toLowerCase().indexOf('ssl') !== -1) {

                    console.log('[+] Potential SSL-related class detected: ' + className);

                    try {
                        var targetClass = Java.use(className);
                        // getDeclaredMethods() lists each overload separately; hook a name once.
                        var hookedNames = {};

                        // 尝试Hook常见方法
                        var methods = targetClass.class.getDeclaredMethods();
                        for (var i = 0; i < methods.length; i++) {
                            var methodName = methods[i].getName();
                            if (hookedNames[methodName]) {
                                continue;
                            }

                            if (methodName.toLowerCase().indexOf('verify') !== -1 ||
                                methodName.toLowerCase().indexOf('check') !== -1 ||
                                methodName.toLowerCase().indexOf('validate') !== -1 ||
                                methodName.toLowerCase().indexOf('pin') !== -1) {

                                hookedNames[methodName] = true;
                                console.log('[+] Hooking potential SSL verification method: ' + className + '.' + methodName);
                                hookForLogging(targetClass, className, methodName);
                            }
                        }
                    } catch (classErr) {
                        console.log('[-] Failed to process class ' + className + ': ' + classErr);
                    }
                }
            },
            onComplete: function() {
                console.log('[+] Finished enumerating classes, found ' + Object.keys(loadedClasses).length + ' classes');
            }
        });

        // 监控网络连接
        var URLConnection = Java.use('java.net.URLConnection');
        var HttpsURLConnection = Java.use('javax.net.ssl.HttpsURLConnection');
        var Socket = Java.use('java.net.Socket');
        var SSLSocket = Java.use('javax.net.ssl.SSLSocket');

        // Hook HTTPS连接 (guarded: hooking may fail, e.g. on abstract methods,
        // and must not abort the rest of the script)
        try {
            if (HttpsURLConnection.connect) {
                HttpsURLConnection.connect.implementation = function() {
                    console.log('[+] HttpsURLConnection.connect() called to: ' + this.getURL());
                    try {
                        this.connect.apply(this, arguments);
                    } catch (e) {
                        console.log('[-] Connect failed (may be SSL issue): ' + e);
                        // 继续执行
                    }
                };
            }
        } catch (err) {
            console.log('[-] HttpsURLConnection.connect hook failed: ' + err);
        }

        // Hook SSL Socket连接
        try {
            if (SSLSocket.startHandshake) {
                SSLSocket.startHandshake.implementation = function() {
                    console.log('[+] SSLSocket.startHandshake() called');
                    try {
                        this.startHandshake.apply(this, arguments);
                    } catch (e) {
                        console.log('[-] SSL Handshake failed: ' + e);
                        // 继续执行
                    }
                };
            }
        } catch (err) {
            console.log('[-] SSLSocket.startHandshake hook failed: ' + err);
        }

        // 实时监控新加载的类
        // NOTE: nothing calls monitorClassLoad() yet. A method that does not
        // override a superclass/interface method needs explicit
        // returnType/argumentTypes, and a registration failure must not abort the script.
        try {
            var classLoadMonitor = Java.registerClass({
                name: 'SSLBypass.ClassLoadMonitor',
                superClass: Java.use('java.lang.Object'),
                methods: {
                    monitorClassLoad: {
                        returnType: 'void',
                        argumentTypes: ['java.lang.String'],
                        implementation: function(className) {
                            if (loadedClasses[className] === undefined) {
                                console.log('[+] New class loaded: ' + className);
                                loadedClasses[className] = true;

                                // 检查新加载的类是否需要绕过
                                if (className.toLowerCase().indexOf('pinning') !== -1 ||
                                    className.toLowerCase().indexOf('certificate') !== -1) {
                                    console.log('[+] New SSL-related class detected: ' + className);
                                    // 可以在这里添加特定的绕过逻辑
                                }
                            }
                        }
                    }
                }
            });
        } catch (err) {
            console.log('[-] ClassLoadMonitor registration failed: ' + err);
        }

        console.log("[+] Dynamic SSL Pinning Detection and Bypass hooks installed.");
        });
        """

    def __init__(self, device_id: str = None):
        self.device = None   # Frida device, set by connect_device()
        self.session = None  # session from the most recent attach
        self.script = None   # most recently created/loaded script
        self.connect_device(device_id)

    def connect_device(self, device_id: str = None):
        """Connect to a Frida device.

        Args:
            device_id: Frida device id (e.g. the adb serial). When given, that
                device is used; otherwise the first USB device. If neither can
                be reached, falls back to ``frida.get_remote_device()``.

        Raises:
            Whatever the remote fallback raises when no device is reachable.
        """
        try:
            if device_id:
                # device_id used to be ignored; honour it now.
                self.device = frida.get_device(device_id, timeout=2)
            else:
                self.device = frida.get_usb_device(timeout=2)
            print(f"✓ 连接到设备: {self.device.name}")
        except Exception:
            try:
                self.device = frida.get_remote_device()
                print(f"✓ 连接到远程设备: {self.device.name}")
            except Exception:
                print("✗ 无法连接到设备")
                raise

    def _load_script(self, source: str, success_msg: str, failure_prefix: str) -> bool:
        """Create and load ``source`` in the current session, printing the outcome."""
        try:
            self.script = self.session.create_script(source)
            self.script.load()
        except Exception as e:
            print(f"{failure_prefix}: {e}")
            return False
        print(success_msg)
        return True

    def bypass_common_ssl_pinning(self, package_name: str):
        """Attach to ``package_name`` and load the common pinning hooks.

        Returns:
            True if the attach and the script load both succeeded.
        """
        try:
            self.session = self.device.attach(package_name)
            print(f"✓ 附加到应用: {package_name}")
        except Exception as e:
            print(f"✗ 附加失败: {e}")
            return False

        return self._load_script(self._COMMON_BYPASS_JS,
                                 "✓ SSL Pinning绕过脚本加载成功",
                                 "✗ SSL Pinning绕过脚本加载失败")

    def bypass_dynamic_ssl_pinning(self, package_name: str):
        """Attach to ``package_name`` and load the class-scanning logging hooks.

        Returns:
            True if the attach and the script load both succeeded.
        """
        try:
            self.session = self.device.attach(package_name)
            print(f"✓ 动态附加到应用: {package_name}")
        except Exception as e:
            print(f"✗ 动态附加失败: {e}")
            return False

        return self._load_script(self._DYNAMIC_BYPASS_JS,
                                 "✓ 动态SSL Pinning绕过脚本加载成功",
                                 "✗ 动态SSL Pinning绕过脚本加载失败")

    def test_ssl_bypass(self):
        """Run a short probe script in the attached process, then unload it.

        NOTE: the probe only opens (never connects) a URL to a self-signed
        host and logs the connection class. A True result means "the probe
        ran", not that pinning is defeated.

        Returns:
            False when not attached or when the probe fails to load.
        """
        if self.session is None:
            print("✗ SSL绕过测试失败: 尚未附加到应用")
            return False

        test_script = """
        Java.perform(function() {
            try {
                var URL = Java.use('java.net.URL');
                var connection = Java.use('java.net.HttpURLConnection');
                
                console.log('[+] SSL Bypass Test Started');
                
                // 尝试创建HTTPS连接来测试绕过效果
                var testUrl = URL.$new('https://self-signed.badssl.com/');
                var conn = testUrl.openConnection();
                
                if (conn.getClass().getSimpleName().indexOf('Https') !== -1) {
                    console.log('[+] Successfully created HTTPS connection (bypass likely working)');
                } else {
                    console.log('[+] Created connection but not HTTPS (bypass may not be complete)');
                }
                
                console.log('[+] SSL Bypass Test Completed');
                
            } catch (e) {
                console.log('[-] SSL Bypass Test Error: ' + e);
            }
        });
        """

        try:
            test_script_obj = self.session.create_script(test_script)
            test_script_obj.load()
            time.sleep(2)  # let the probe run and log
            test_script_obj.unload()
            return True
        except Exception as e:
            print(f"✗ SSL绕过测试失败: {e}")
            return False

    def detach(self):
        """Unload the current script and detach the current session.

        Cleanup is best effort: failures are printed, not raised. Both
        attributes are reset to None afterwards.
        """
        if self.script is not None:
            try:
                self.script.unload()
            except Exception as e:
                print(f"⚠ 卸载脚本失败: {e}")
        if self.session is not None:
            try:
                self.session.detach()
            except Exception as e:
                print(f"⚠ 断开会话失败: {e}")
        self.script = None
        self.session = None

class AdvancedSSLBypass(SSLPinningBypass):
    """Advanced SSL bypass that adds a certificate-spoofing strategy.

    Inherits device handling and ``test_ssl_bypass`` from ``SSLPinningBypass``
    (defined earlier in the file). ``self.device`` is presumably set by the
    base ``__init__`` — verify against the base class.
    """
    
    def __init__(self, device_id: str = None):
        # Only forwards to the base class; kept for an explicit signature.
        super().__init__(device_id)
    
    def bypass_with_certificate_spoofing(self, package_name: str):
        """Attach to *package_name* and load the certificate-spoofing Frida script.

        Args:
            package_name: package (process) name passed to ``device.attach``.

        Returns:
            True if the script was created and loaded; False if attaching or
            creating/loading the script raised (the error is printed).
        """
        try:
            self.session = self.device.attach(package_name)
            print(f"✓ 附加到应用: {package_name}")
        except Exception as e:
            print(f"✗ 附加失败: {e}")
            return False
        
        # Frida script: registers a fake X509Certificate and X509TrustManager,
        # replaces the trust managers passed to SSLContext.init, and replaces
        # okhttp3.CertificatePinner.check and Conscrypt's
        # TrustManagerImpl.verifyChain.
        # NOTE(review): `checkValidity.overload('java.util.Date'): function` and
        # `verify.overload(...): function` are not valid JavaScript object-literal
        # keys, so compiling this script is expected to fail and this method to
        # return False via the except branch below — TODO confirm and fix.
        # NOTE(review): Java.use / Java.registerClass are called at top level,
        # not inside Java.perform(...) — verify against the Frida Java API docs.
        # NOTE(review): X509TrustManager.checkServerTrusted is void in Java, so the
        # array returned by the fake implementation is presumably ignored; the
        # CertificatePinner replacement builds a list that is never used.
        spoof_bypass_script = """
        console.log("[.] Certificate Spoofing SSL Bypass started...");

        // 创建一个虚假的有效证书
        var FakeCertificate = Java.registerClass({
            name: 'com.example.FakeCertificate',
            superClass: Java.use('java.security.cert.X509Certificate'),
            methods: {
                getSubjectDN: function() {
                    return Java.use('javax.security.auth.x500.X500Principal').$new('CN=*.google.com');
                },
                getIssuerDN: function() {
                    return Java.use('javax.security.auth.x500.X500Principal').$new('CN=Fake CA');
                },
                getSerialNumber: function() {
                    return Java.use('java.math.BigInteger').$new('123456789');
                },
                getBasicConstraints: function() {
                    return -1;
                },
                getNotAfter: function() {
                    return Java.use('java.util.Date').$new(new Date().getTime() + 365 * 24 * 60 * 60 * 1000);
                },
                getNotBefore: function() {
                    return Java.use('java.util.Date').$new(new Date().getTime() - 24 * 60 * 60 * 1000);
                },
                checkValidity: function() {
                    // 总是有效
                },
                checkValidity.overload('java.util.Date'): function(date) {
                    return true;
                },
                getPublicKey: function() {
                    // 返回一个假的公钥
                    return null;
                },
                verify: function(publicKey) {
                    return true;
                },
                verify.overload('java.security.PublicKey', 'java.lang.String'): function(publicKey, sigProvider) {
                    return true;
                },
                getSigAlgName: function() {
                    return 'SHA256withRSA';
                },
                getSignature: function() {
                    return [1, 2, 3, 4]; // 假签名
                },
                getTBSCertificate: function() {
                    return [1, 2, 3, 4];
                },
                getEncoded: function() {
                    return [1, 2, 3, 4];
                },
                toString: function() {
                    return 'Fake Valid Certificate';
                }
            }
        });

        // 绕过证书验证
        var X509TrustManager = Java.use('javax.net.ssl.X509TrustManager');
        var SSLContext = Java.use('javax.net.ssl.SSLContext');

        var FakeTrustManager = Java.registerClass({
            name: 'com.example.FakeTrustManager',
            implements: [X509TrustManager],
            methods: {
                checkClientTrusted: function(chain, authType) {
                    console.log('[+] FakeTrustManager.checkClientTrusted called');
                },
                checkServerTrusted: function(chain, authType) {
                    console.log('[+] FakeTrustManager.checkServerTrusted called, returning fake certificate');
                    // 返回虚假证书而不是实际的证书链
                    var fakeCert = FakeCertificate.$new();
                    return [fakeCert];
                },
                getAcceptedIssuers: function() {
                    console.log('[+] FakeTrustManager.getAcceptedIssuers called');
                    return [];
                }
            }
        });

        // Hook SSLContext初始化
        SSLContext.init.overload('[Ljavax.net.ssl.KeyManager;', '[Ljavax.net.ssl.TrustManager;', 'java.security.SecureRandom').implementation = function(keyManager, trustManager, secureRandom) {
            console.log('[+] SSLContext.init() called, replacing trust managers');
            
            // 使用我们的虚假信任管理器
            var fakeTrustManager = FakeTrustManager.$new();
            this.init(keyManager, [fakeTrustManager], secureRandom);
        };

        // 绕过 OkHttp 的证书固定
        try {
            var CertificatePinner = Java.use('okhttp3.CertificatePinner');
            CertificatePinner.check.overload('java.lang.String', 'java.util.List').implementation = function(hostname, peerCertificates) {
                console.log('[+] OkHttp CertificatePinner.check called for: ' + hostname);
                
                // 创建一个虚假的证书列表
                var ArrayList = Java.use('java.util.ArrayList');
                var fakeList = ArrayList.$new();
                var fakeCert = FakeCertificate.$new();
                fakeList.add(fakeCert);
                
                console.log('[+] Certificate pinning bypassed with fake certificate');
            };
        } catch (e) {
            console.log('[-] OkHttp CertificatePinner hook failed: ' + e);
        }

        // 绕过 Android 系统的证书验证
        try {
            var TrustManagerImpl = Java.use('com.android.org.conscrypt.TrustManagerImpl');
            
            TrustManagerImpl.verifyChain.implementation = function(untrustedChain, trustAnchorChain, host, clientAuth, ocspData, tlsSctData) {
                console.log('[+] TrustManagerImpl.verifyChain called for: ' + host);
                
                // 返回虚假的受信任证书链
                var ArrayList = Java.use('java.util.ArrayList');
                var trustedChain = ArrayList.$new();
                var fakeCert = FakeCertificate.$new();
                trustedChain.add(fakeCert);
                
                console.log('[+] Certificate chain validation bypassed');
                return trustedChain;
            };
        } catch (e) {
            console.log('[-] TrustManagerImpl hook failed: ' + e);
        }

        console.log("[+] Certificate Spoofing SSL Bypass hooks installed.");
        """
        
        try:
            # Keep a reference on self so the script stays loaded after return.
            self.script = self.session.create_script(spoof_bypass_script)
            self.script.load()
            print("✓ 证书欺骗SSL绕过脚本加载成功")
            return True
        except Exception as e:
            print(f"✗ 证书欺骗SSL绕过脚本加载失败: {e}")
            return False

def ssl_bypass_main():
    """Interactive entry point for the SSL-pinning bypass tool.

    Prompts for a package name and a bypass method, runs the chosen
    method(s) and, if at least one succeeded, runs ``test_ssl_bypass`` on an
    instance whose session is actually attached.
    """
    print("=== SSL Pinning 绕过工具 ===\n")
    
    package_name = input("请输入目标App包名: ").strip()
    if not package_name:
        print("未提供包名")
        return
    
    print("\n选择绕过方法:")
    print("1. 标准SSL Pinning绕过")
    print("2. 动态SSL Pinning绕过")
    print("3. 证书欺骗绕过")
    print("4. 全部方法")
    
    choice = input("请选择 (1-4): ").strip()
    
    bypass = SSLPinningBypass()
    # Instance used for the post-bypass test. It must be one that attached
    # successfully: with choice 3 only `advanced_bypass` ever attaches, so
    # testing `bypass` there would always fail.
    tester = bypass
    
    success = False
    if choice == '1':
        success = bypass.bypass_common_ssl_pinning(package_name)
    elif choice == '2':
        success = bypass.bypass_dynamic_ssl_pinning(package_name)
    elif choice == '3':
        advanced_bypass = AdvancedSSLBypass()
        success = advanced_bypass.bypass_with_certificate_spoofing(package_name)
        tester = advanced_bypass
    elif choice == '4':
        print("\n执行标准绕过...")
        success1 = bypass.bypass_common_ssl_pinning(package_name)
        
        print("\n执行动态绕过...")
        success2 = bypass.bypass_dynamic_ssl_pinning(package_name)
        
        print("\n执行证书欺骗绕过...")
        advanced_bypass = AdvancedSSLBypass()
        success3 = advanced_bypass.bypass_with_certificate_spoofing(package_name)
        
        success = success1 or success2 or success3
        if not (success1 or success2):
            # Only the spoofing instance (if any) holds a usable session.
            tester = advanced_bypass
    else:
        print("无效选择")
        return
    
    if success:
        print("\n✓ SSL Pinning绕过设置完成")
        print("可以开始进行HTTPS流量抓取")
        
        # Verify the bypass using the instance that actually succeeded.
        print("\n测试SSL绕过效果...")
        tester.test_ssl_bypass()
    else:
        print("\n✗ SSL Pinning绕过设置失败")

if __name__ == "__main__":
    # Script entry point: run the interactive SSL-pinning menu only when executed directly.
    ssl_bypass_main()

3.2 请求签名绕过

import hashlib
import hmac
import time
import random
import string
import json
from urllib.parse import urlencode, urlparse, parse_qs
from typing import Dict, List, Any, Optional
import frida

class RequestSignatureBypass:
    """Heuristic helpers for analysing and reproducing request signatures.

    Requests are plain dicts that may contain ``'url'`` (str), ``'headers'``
    (dict), ``'params'`` (dict) and ``'body'`` (str) keys.
    """

    # Characters of a hexadecimal digest (either case).
    _HEX_CHARS = frozenset('0123456789abcdefABCDEF')
    # Characters of standard Base64, including '=' padding.
    _BASE64_CHARS = frozenset(
        'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=')
    # Hex-string length -> digest name.
    _HEX_DIGEST_BY_LENGTH = {32: 'MD5', 40: 'SHA1', 64: 'SHA256'}
    # Decoded Base64 byte length -> digest name.
    _B64_DIGEST_BY_BYTES = {32: 'SHA256_BASE64', 20: 'SHA1_BASE64', 16: 'MD5_BASE64'}

    def __init__(self):
        self.signature_algorithms = {}
        self.parameters = {}
        self.headers = {}
    
    def analyze_signature_patterns(self, traffic_data: List[Dict]) -> Dict:
        """Collect signature-looking parameter and header names from traffic.

        Args:
            traffic_data: captured requests (see class docstring for keys).

        Returns:
            Dict with ``common_params`` (URL-query and JSON-body names, in
            encounter order, duplicates kept), ``header_patterns`` (header
            names) and ``most_common_params`` (top 10 of ``common_params`` with
            counts). ``signature_methods``, ``url_patterns`` and
            ``suspected_algorithms`` are returned empty.
        """
        from collections import Counter

        analysis = {
            'common_params': [],
            'signature_methods': [],
            'header_patterns': [],
            'url_patterns': [],
            'suspected_algorithms': []
        }
        url_keywords = ('sign', 'signature', 'token', 'auth', 'key', 'checksum', 'digest')
        header_keywords = ('sign', 'authorization', 'auth', 'token', 'checksum')
        body_keywords = ('sign', 'signature', 'token', 'auth', 'key')
        
        for request in traffic_data:
            # URL query parameters
            if 'url' in request:
                query = parse_qs(urlparse(request['url']).query)
                analysis['common_params'].extend(
                    name for name in query
                    if any(kw in name.lower() for kw in url_keywords))
            
            # Request headers
            if 'headers' in request:
                analysis['header_patterns'].extend(
                    name for name in request['headers']
                    if any(kw in name.lower() for kw in header_keywords))
            
            # JSON request body
            if 'body' in request and isinstance(request['body'], str):
                try:
                    body_json = json.loads(request['body'])
                except ValueError:  # not JSON; JSONDecodeError subclasses ValueError
                    body_json = None
                # Only JSON objects carry parameter names; arrays/scalars are skipped.
                if isinstance(body_json, dict):
                    analysis['common_params'].extend(
                        key for key in body_json
                        if any(kw in key.lower() for kw in body_keywords))
        
        analysis['most_common_params'] = Counter(analysis['common_params']).most_common(10)
        return analysis
    
    def identify_signature_algorithm(self, param_name: str, param_value: str) -> str:
        """Guess the digest algorithm behind a signature value.

        Checks, in order: hex length (MD5/SHA1/SHA256), Base64-encoded digest
        length (``*_BASE64``), then substring markers in the value or the
        parameter name. ``'hmac'`` is checked before ``'sha'``/``'md'``
        because names such as ``hmac_sha256`` contain both.

        Returns:
            One of 'MD5', 'SHA1', 'SHA256', 'SHA256_BASE64', 'SHA1_BASE64',
            'MD5_BASE64', 'HMAC_FAMILY', 'SHA_FAMILY', 'MD_FAMILY', 'UNKNOWN'.
        """
        value = str(param_value)
        
        if all(c in self._HEX_CHARS for c in value) and len(value) in self._HEX_DIGEST_BY_LENGTH:
            return self._HEX_DIGEST_BY_LENGTH[len(value)]
        
        # Base64 length is always a multiple of 4.
        if value and len(value) % 4 == 0 and all(c in self._BASE64_CHARS for c in value):
            import base64
            try:
                decoded = base64.b64decode(value)
            except ValueError:  # binascii.Error (e.g. bad padding) subclasses ValueError
                decoded = b''
            if len(decoded) in self._B64_DIGEST_BY_BYTES:
                return self._B64_DIGEST_BY_BYTES[len(decoded)]
        
        lower_value = value.lower()
        lower_name = param_name.lower()
        for marker, family in (('hmac', 'HMAC_FAMILY'), ('sha', 'SHA_FAMILY'), ('md', 'MD_FAMILY')):
            if marker in lower_value or marker in lower_name:
                return family
        
        return 'UNKNOWN'
    
    def generate_signature(self, algorithm: str, data: str, secret: Optional[str] = None) -> str:
        """Return the hex digest of *data* using *algorithm*.

        Supported: 'MD5', 'SHA1', 'SHA256', and with *secret* 'HMAC_SHA256' /
        'HMAC_MD5'. Any other combination falls back to HMAC-SHA256 when a
        secret is given, else plain MD5.
        """
        encoded = data.encode()
        if algorithm == 'MD5':
            return hashlib.md5(encoded).hexdigest()
        elif algorithm == 'SHA1':
            return hashlib.sha1(encoded).hexdigest()
        elif algorithm == 'SHA256':
            return hashlib.sha256(encoded).hexdigest()
        elif algorithm == 'HMAC_SHA256' and secret:
            return hmac.new(secret.encode(), encoded, hashlib.sha256).hexdigest()
        elif algorithm == 'HMAC_MD5' and secret:
            return hmac.new(secret.encode(), encoded, hashlib.md5).hexdigest()
        # Generic fallback
        if secret:
            return hmac.new(secret.encode(), encoded, hashlib.sha256).hexdigest()
        return hashlib.md5(encoded).hexdigest()
    
    def reconstruct_signature_logic(self, requests: List[Dict]) -> Dict:
        """Infer the signature parameter, algorithm and signed params.

        Only the first request with a ``sign``-like key in ``params`` or
        ``headers`` is analysed. The signature parameter itself is looked up
        in ``params`` only.

        Returns:
            ``{'error': ...}`` if no signed request is found, otherwise a dict
            with ``algorithm``, ``signed_params``, ``signature_param``,
            ``secret_location`` and ``format``.
        """
        def has_sign_key(mapping: Dict) -> bool:
            # 'signature' contains 'sign', so one substring test covers both.
            return any('sign' in k.lower() for k in mapping.keys())

        signed_requests = [req for req in requests
                           if has_sign_key(req.get('params', {})) or has_sign_key(req.get('headers', {}))]
        
        if not signed_requests:
            return {'error': 'No signed requests found'}
        
        first_signed = signed_requests[0]
        
        signature_info = {
            'algorithm': 'UNKNOWN',
            'signed_params': [],
            'signature_param': None,
            'secret_location': 'UNKNOWN',
            'format': 'UNKNOWN'
        }
        
        all_params = first_signed.get('params', {})
        for param_name, param_value in all_params.items():
            if 'sign' in param_name.lower():
                signature_info['signature_param'] = param_name
                signature_info['algorithm'] = self.identify_signature_algorithm(param_name, param_value)
                break
        
        # Every other parameter is assumed to take part in the signature.
        if signature_info['signature_param']:
            signature_info['signed_params'] = [k for k in all_params
                                               if k != signature_info['signature_param']]
        
        return signature_info

class AdvancedSignatureBypass(RequestSignatureBypass):
    """Signature analysis extended with Frida hooks, brute force and replay helpers."""

    # Frida script that logs MessageDigest / Mac usage (algorithm, key, result)
    # and calls to app-specific signing helpers.
    _SIGNATURE_HOOK_SCRIPT = """
    console.log("[.] Signature Generation Hook started...");

    // Format a Java byte[] as lowercase hex. Java bytes are signed (-128..127),
    // so mask with 0xff before toString(16) to avoid "-xx" fragments.
    function toHex(bytes) {
        var hex = '';
        for (var i = 0; i < bytes.length; i++) {
            hex += ('0' + (bytes[i] & 0xff).toString(16)).slice(-2);
        }
        return hex;
    }

    // Java.use() and hook installation must run on a VM-attached thread.
    Java.perform(function() {
        var CryptoUtils = {
            // Hook MessageDigest.getInstance
            hookMessageDigest: function() {
                try {
                    var MessageDigest = Java.use('java.security.MessageDigest');

                    MessageDigest.getInstance.overload('java.lang.String').implementation = function(algorithm) {
                        console.log('[+] MessageDigest.getInstance called with algorithm: ' + algorithm);

                        var instance = this.getInstance.apply(this, arguments);

                        // Hook digest()
                        var originalDigest = instance.digest.overload();
                        instance.digest.overload().implementation = function() {
                            var result = originalDigest.apply(this, arguments);
                            console.log('[+] Digest calculated, result length: ' + result.length);
                            console.log('[+] Digest result (hex): ' + toHex(result));
                            return result;
                        };

                        return instance;
                    };
                } catch (e) {
                    console.log('[-] MessageDigest hook failed: ' + e);
                }
            },

            // Hook Mac.getInstance (HMAC)
            hookMac: function() {
                try {
                    var Mac = Java.use('javax.crypto.Mac');

                    Mac.getInstance.overload('java.lang.String').implementation = function(algorithm) {
                        console.log('[+] Mac.getInstance called with algorithm: ' + algorithm);

                        var instance = this.getInstance.apply(this, arguments);

                        // Hook init(Key)
                        var originalInit = instance.init.overload('java.security.Key');
                        instance.init.overload('java.security.Key').implementation = function(key) {
                            console.log('[+] Mac.init called with key');
                            try {
                                console.log('[+] HMAC Key (hex): ' + toHex(key.getEncoded()));
                            } catch (e) {
                                console.log('[-] Could not extract key: ' + e);
                            }
                            return originalInit.apply(this, arguments);
                        };

                        // Hook doFinal()
                        var originalDoFinal = instance.doFinal.overload();
                        instance.doFinal.overload().implementation = function() {
                            console.log('[+] Mac.doFinal called');
                            var result = originalDoFinal.apply(this, arguments);
                            console.log('[+] HMAC result (hex): ' + toHex(result));
                            return result;
                        };

                        return instance;
                    };
                } catch (e) {
                    console.log('[-] Mac hook failed: ' + e);
                }
            },

            // Hook app-specific signing helpers (class names are examples; adapt per app),
            // e.g. com.example.utils.SignUtil.sign()
            hookCustomSignMethods: function() {
                try {
                    var signClasses = [
                        'com.example.utils.SignUtil',
                        'com.example.security.SecurityUtil',
                        'com.example.api.ApiHelper'
                    ];

                    signClasses.forEach(function(className) {
                        try {
                            var SignClass = Java.use(className);

                            for (var methodName in SignClass) {
                                if (typeof SignClass[methodName] === 'function' &&
                                    (methodName.toLowerCase().includes('sign') ||
                                     methodName.toLowerCase().includes('hash') ||
                                     methodName.toLowerCase().includes('auth'))) {

                                    // The IIFE binds the current name; with a plain `var`
                                    // loop variable every hook would see the last methodName.
                                    (function(name) {
                                        console.log('[+] Hooking custom sign method: ' + className + '.' + name);

                                        SignClass[name].implementation = function() {
                                            console.log('[+] Custom sign method called: ' + className + '.' + name);

                                            for (var i = 0; i < arguments.length; i++) {
                                                try {
                                                    console.log('[+] Arg ' + i + ': ' + arguments[i]);
                                                } catch (e) {
                                                    console.log('[+] Arg ' + i + ': [Could not convert]');
                                                }
                                            }

                                            var result = this[name].apply(this, arguments);

                                            try {
                                                console.log('[+] Custom sign result: ' + result);
                                            } catch (e) {
                                                console.log('[+] Custom sign result: [Could not convert]');
                                            }

                                            return result;
                                        };
                                    })(methodName);
                                }
                            }
                        } catch (e) {
                            // Class not present in this app; skip it.
                        }
                    });
                } catch (e) {
                    console.log('[-] Custom sign methods hook failed: ' + e);
                }
            }
        };

        // Install all hooks
        CryptoUtils.hookMessageDigest();
        CryptoUtils.hookMac();
        CryptoUtils.hookCustomSignMethods();

        console.log("[+] Signature generation hooks installed successfully.");
    });
    """

    def __init__(self):
        super().__init__()
        self.known_signatures = {}  # cache of signatures already seen
        self.signature_templates = {}  # signature templates
    
    def hook_signature_generation(self, package_name: str, device_id: Optional[str] = None):
        """Attach to *package_name*, load the signature hooks and wait for Enter.

        Args:
            package_name: process/package to attach to.
            device_id: Frida device id; when omitted the USB device is used
                (2-second lookup timeout, as before).

        The script is unloaded and the session detached on every exit path,
        including errors and Ctrl+C while waiting for input.
        """
        try:
            if device_id:
                # Previously device_id was ignored and the USB device always used.
                device = frida.get_device(device_id, timeout=2)
            else:
                device = frida.get_usb_device(2)
            session = device.attach(package_name)
        except Exception as e:
            print(f"✗ 签名Hook失败: {e}")
            return

        loaded_script = None
        try:
            script = session.create_script(self._SIGNATURE_HOOK_SCRIPT)
            script.load()
            loaded_script = script
            
            print("✓ 签名生成Hook脚本加载成功")
            print("监听签名生成过程,注意查看Frida控制台输出...")
            
            # Keep the session alive while the user watches the hook output.
            input("按Enter键停止监听...")
        except Exception as e:
            print(f"✗ 签名Hook失败: {e}")
        finally:
            self._cleanup_frida(loaded_script, session)

    @staticmethod
    def _cleanup_frida(script, session):
        """Unload *script* (if loaded) and detach *session*; report, don't raise, errors."""
        actions = ([script.unload] if script is not None else []) + [session.detach]
        for action in actions:
            try:
                action()
            except Exception as e:  # best-effort cleanup must not mask the real outcome
                print(f"✗ 清理Frida会话失败: {e}")
    
    def brute_force_signature(self, base_params: Dict, target_signature: str) -> Dict:
        """Search parameter orders, separators and digests for *target_signature* (use with care).

        Tries every ordering of *base_params* joined as ``k=v`` with each of
        several separators, hashed with MD5/SHA1/SHA256, comparing
        case-insensitively. The search is O(n!) in the number of parameters.

        Returns:
            ``{'found': True, 'algorithm', 'separator', 'params_order',
            'sign_string'}`` on a match, otherwise ``{'found': False}``.
        """
        import itertools
        
        print("警告:暴力破解签名可能违反法律,请确保您有权访问目标系统")

        if not target_signature:
            return {'found': False}
        target = str(target_signature).lower()
        
        algorithms = ['MD5', 'SHA1', 'SHA256']
        separators = ['', '&', '|', '#', '_', '-', '+']
        
        # Permutations are generated lazily; the original materialised all n!
        # orderings in a list before starting.
        for perm in itertools.permutations(base_params):
            params = {k: base_params[k] for k in perm}
            for algo in algorithms:
                for sep in separators:
                    try:
                        sign_string = sep.join(f"{k}={v}" for k, v in params.items())
                        calculated_sig = self.generate_signature(algo, sign_string)
                    except Exception:  # e.g. unencodable values; unlike bare except, keeps Ctrl+C working
                        continue
                    if calculated_sig.lower() == target:
                        return {
                            'found': True,
                            'algorithm': algo,
                            'separator': sep,
                            'params_order': list(params.keys()),
                            'sign_string': sign_string
                        }
        
        return {'found': False}
    
    def replay_attack_simulation(self, original_request: Dict, new_params: Dict) -> Dict:
        """Build a copy of *original_request* with *new_params* merged and signatures removed.

        Neither *original_request* nor *new_params* is modified (the previous
        shallow copy mutated the caller's ``params`` dict). Any parameter
        whose name contains ``sign`` is dropped; re-signing requires knowing
        the signature logic.
        """
        modified_request = dict(original_request)
        params = dict(original_request['params']) if 'params' in original_request else {}
        params.update(new_params)
        # 'signature' contains 'sign', so this drops both kinds of key.
        modified_request['params'] = {k: v for k, v in params.items() if 'sign' not in k.lower()}
        return modified_request

def signature_bypass_main():
    """Interactive menu for the request-signature tools.

    Options 1 and 4 run on built-in mock data. Option 2 attaches Frida hooks
    to a user-supplied package. Option 3 brute-forces a user-supplied
    signature against fixed demo parameters.
    """
    print("=== 请求签名绕过工具 ===\n")
    
    bypass = AdvancedSignatureBypass()
    
    print("选择操作:")
    print("1. 分析签名模式")
    print("2. Hook签名生成")
    print("3. 暴力破解签名(谨慎使用)")
    print("4. 签名逻辑重构")
    
    choice = input("请选择 (1-4): ").strip()
    
    if choice == '1':
        # Mock captured traffic
        mock_traffic = [
            {
                'url': 'https://api.example.com/product/list?category=electronics&timestamp=1234567890&sign=abc123def456',
                'headers': {'Authorization': 'Bearer token123'},
                'params': {'category': 'electronics', 'timestamp': '1234567890', 'sign': 'abc123def456'}
            }
        ]
        analysis = bypass.analyze_signature_patterns(mock_traffic)
        print(f"签名分析结果: {analysis}")
    
    elif choice == '2':
        package_name = input("请输入目标App包名: ").strip()
        # Same guard as ssl_bypass_main: don't try to attach to an empty name.
        if not package_name:
            print("未提供包名")
            return
        bypass.hook_signature_generation(package_name)
    
    elif choice == '3':
        print("注意:暴力破解仅用于授权测试!")
        base_params = {'param1': 'value1', 'param2': 'value2'}
        target_sig = input("请输入目标签名: ").strip()
        # An empty target can never match a digest; skip the pointless search.
        if not target_sig:
            print("未提供目标签名")
            return
        result = bypass.brute_force_signature(base_params, target_sig)
        print(f"暴力破解结果: {result}")
    
    elif choice == '4':
        mock_requests = [
            {
                'params': {
                    'appid': '12345',
                    'timestamp': '1234567890',
                    'nonce': 'abc123',
                    'data': 'somedata',
                    'sign': 'generated_signature_here'
                }
            }
        ]
        logic = bypass.reconstruct_signature_logic(mock_requests)
        print(f"签名逻辑重构结果: {logic}")
    
    else:
        print("无效选择")

if __name__ == "__main__":
    # Script entry point: run the interactive signature-tool menu only when executed directly.
    signature_bypass_main()

### 3.3 验证码处理与频率限制应对

```python
import time
import random
from typing import Dict, List, Optional
import cv2
import numpy as np
from PIL import Image
import pytesseract
import requests

class CaptchaHandler:
    """Captcha helpers: heuristic type detection, OCR and slider dragging."""
    
    def __init__(self):
        self.captcha_models = {}
        # RateLimiter is defined later in this module; it is resolved at call time.
        self.rate_limiter = RateLimiter()
    
    def detect_captcha_type(self, captcha_image: Image.Image) -> str:
        """Guess the captcha category from simple pixel statistics.

        Returns:
            One of 'simple_text', 'slider', 'complex_image', 'color_based' or
            'unknown'. Thresholds are heuristics.
        """
        width, height = captcha_image.size
        # A zero-area image has no statistics (and would divide by zero below).
        if width == 0 or height == 0:
            return 'unknown'
        pixels = list(captcha_image.getdata())
        
        # Colour complexity: distinct pixel values per pixel.
        unique_colors = len(set(pixels))
        color_density = unique_colors / (width * height)
        
        noise_ratio = self._calculate_noise(pixels, width, height)
        shape_complexity = self._detect_shapes(captcha_image)
        
        if color_density < 0.1 and noise_ratio < 0.1:
            return 'simple_text'  # simple text captcha
        elif shape_complexity > 0.3:
            return 'slider'  # slider captcha
        elif noise_ratio > 0.3 and unique_colors > 100:
            return 'complex_image'  # complex image captcha
        elif color_density > 0.5:
            return 'color_based'  # colour-based captcha
        else:
            return 'unknown'
    
    def _calculate_noise(self, pixels, width, height):
        """Return the fraction of pixels whose value differs from all 8 neighbours.

        *pixels* is the row-major sequence from ``Image.getdata()``; border
        pixels are not examined but still count in the denominator.
        """
        if width * height == 0:
            return 0.0
        noise_count = 0
        for i in range(1, height - 1):
            for j in range(1, width - 1):
                center = pixels[i * width + j]
                neighbors = [
                    pixels[(i-1) * width + (j-1)],
                    pixels[(i-1) * width + j],
                    pixels[(i-1) * width + (j+1)],
                    pixels[i * width + (j-1)],
                    pixels[i * width + (j+1)],
                    pixels[(i+1) * width + (j-1)],
                    pixels[(i+1) * width + j],
                    pixels[(i+1) * width + (j+1)]
                ]
                if center not in neighbors:
                    noise_count += 1
        return noise_count / (width * height)
    
    def _detect_shapes(self, image: Image.Image) -> float:
        """Return the number of external Canny-edge contours per pixel."""
        area = image.width * image.height
        if area == 0:
            return 0.0
        # convert('RGB') so grayscale ('L') and palette ('P') images also reach
        # cvtColor as a 3-channel array instead of raising.
        gray = cv2.cvtColor(np.array(image.convert('RGB')), cv2.COLOR_RGB2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        return len(contours) / area
    
    def solve_simple_text_captcha(self, image: Image.Image) -> str:
        """OCR a simple text captcha; returns '' if Tesseract fails."""
        processed_img = self._preprocess_captcha(image)
        try:
            # --psm 8: treat the image as a single word.
            text = pytesseract.image_to_string(processed_img, config='--psm 8')
            return text.strip()
        except Exception:  # best-effort OCR; unlike bare except, keeps Ctrl+C working
            return ""
    
    def _preprocess_captcha(self, image: Image.Image) -> Image.Image:
        """Grayscale, Otsu-binarise and morphologically close the image."""
        # convert('RGB') so non-RGB modes don't break cvtColor.
        gray = cv2.cvtColor(np.array(image.convert('RGB')), cv2.COLOR_RGB2GRAY)
        _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        kernel = np.ones((2, 2), np.uint8)
        cleaned = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
        return Image.fromarray(cleaned)
    
    def handle_slider_captcha(self, device, slider_element) -> bool:
        """Drag *slider_element* from its centre; True unless an exception occurs.

        Assumes ``slider_element.bounds`` exposes left/top/width/height and
        *device* has a ``touch`` API with down/move/up — confirm against the
        automation framework in use.
        """
        try:
            slider_bounds = slider_element.bounds
            start_x = slider_bounds.left + slider_bounds.width // 2
            start_y = slider_bounds.top + slider_bounds.height // 2
            self._simulate_human_drag(device, start_x, start_y)
            return True
        except Exception as e:
            print(f"滑块验证码处理失败: {e}")
            return False
    
    def _simulate_human_drag(self, device, start_x, start_y):
        """Press, move along a generated trajectory with small random delays, release."""
        target_distance = 200  # placeholder distance in pixels (not measured from the image)
        trajectory = self._generate_trajectory(start_x, start_y, target_distance)
        
        device.touch.down(start_x, start_y)
        time.sleep(random.uniform(0.1, 0.3))
        
        for point in trajectory:
            device.touch.move(point[0], point[1])
            time.sleep(random.uniform(0.01, 0.03))
        
        device.touch.up()
    
    def _generate_trajectory(self, start_x, start_y, distance) -> List[tuple]:
        """Return (x, y) points moving right by *distance* in 5-15 px steps with ±2 px y jitter.

        The occasional sleep below happens while the list is being built,
        i.e. before any touch events are sent.
        """
        trajectory = [(start_x, start_y)]
        current_x = start_x
        current_y = start_y
        remaining = distance
        
        while remaining > 0:
            step = min(random.randint(5, 15), remaining)
            current_x += step
            current_y += random.randint(-2, 2)
            trajectory.append((current_x, current_y))
            remaining -= step
            
            if random.random() < 0.1:
                time.sleep(random.uniform(0.05, 0.2))
        
        return trajectory

class RateLimiter:
    """Sliding-window request limiter with a penalty lockout.

    Allows at most ``max(requests_per_minute, burst_size)`` requests in any
    rolling 60-second window. After the server signals throttling (see
    ``handle_rate_limit_response``), all requests are refused until
    ``lockout_end_time``.

    NOTE(review): ``burst_size`` only matters when it exceeds
    ``requests_per_minute``; no separate short-term burst window is enforced.
    """

    WINDOW_SECONDS = 60

    def __init__(self, requests_per_minute: int = 10, burst_size: int = 3):
        self.requests_per_minute = requests_per_minute
        self.burst_size = burst_size
        # time.time() stamps of recorded requests, pruned lazily.
        self.requests = []
        # Epoch seconds until which every request is refused (0 = no lockout).
        self.lockout_end_time = 0

    def _capacity(self) -> int:
        """Maximum number of requests allowed inside one window."""
        return max(self.burst_size, self.requests_per_minute)

    def can_make_request(self) -> bool:
        """Return True if a request may be sent now.

        Also drops recorded requests that have left the 60 s window.
        """
        now = time.time()
        if now < self.lockout_end_time:
            return False
        self.requests = [t for t in self.requests if now - t < self.WINDOW_SECONDS]
        return len(self.requests) < self._capacity()

    def record_request(self):
        """Record that a request was sent just now."""
        self.requests.append(time.time())

    def apply_rate_limit(self):
        """Block until a request is allowed. The request itself is not recorded.

        During a lockout this sleeps until ``lockout_end_time``. Otherwise it
        sleeps until the oldest recorded request leaves the window. The check
        repeats after every sleep.

        Raises:
            ValueError: if the configured capacity is zero or negative, since
                no request could ever be allowed.
        """
        if self._capacity() <= 0:
            raise ValueError("requests_per_minute / burst_size must allow at least one request")
        while not self.can_make_request():
            now = time.time()
            if now < self.lockout_end_time:
                wait_time = self.lockout_end_time - now
            else:
                # Window is full: wait for the oldest request to expire.
                oldest = min(self.requests, default=now)
                wait_time = self.WINDOW_SECONDS - (now - oldest)
            if wait_time > 0:
                time.sleep(wait_time)

    def handle_rate_limit_response(self, response_code: int):
        """Start a random 5-30 minute lockout on HTTP 429 or 403 responses."""
        if response_code in [429, 403]:  # Too Many Requests or Forbidden
            self.lockout_end_time = time.time() + random.randint(300, 1800)  # 5-30 minutes
            print(f"触发频率限制,将在 {self.lockout_end_time - time.time():.0f} 秒后恢复")

def anti_anti_spider_main():
    """Print a readiness summary for the captcha-handling and rate-limiting demos.

    A ``CaptchaHandler`` and a 5-requests-per-minute ``RateLimiter`` are
    constructed (in that order) before their respective summaries are printed.
    Neither object is used afterwards.
    """
    print("=== 反反爬虫策略实现 ===\n")

    _captcha = CaptchaHandler()
    for line in (
        "验证码处理功能已准备就绪",
        "- 支持多种验证码类型检测",
        "- 包含OCR识别功能",
        "- 提供滑块验证码处理",
    ):
        print(line)

    _limiter = RateLimiter(requests_per_minute=5)
    for line in (
        "\n频率限制功能已准备就绪",
        "- 支持突发请求处理",
        "- 提供智能等待机制",
        "- 包含封禁期管理",
    ):
        print(line)

if __name__ == "__main__":
    # Script entry point: print the anti-anti-spider readiness summary.
    anti_anti_spider_main()

4. 数据爬取实现

4.1 商品列表页爬取

import asyncio
import hashlib
import random
import re
from typing import List, Dict, Optional

import aiohttp
import uiautomator2 as u2

class ProductListCrawler:
    """Crawl product cards from the App's product-list screen via uiautomator2.

    For each page the crawler swipes up to load more cards, reads the visible
    product cards, then taps the "next page" control. It stops when a page
    yields no valid products, no "next page" control exists, or ``max_pages``
    pages have been read.

    Element fields are read from the uiautomator2 ``UiObject.info`` dict (keys
    ``text``, ``contentDescription``, ``resourceName``, ``packageName``).
    Child lookups go through ``UiObject.child(...)``.
    """

    # Price like "¥8999.00" with a half-width (U+00A5) or full-width (U+FFE5)
    # yen sign; group 1 is the number. Applied to text read from the device.
    _PRICE_RE = re.compile(r"[\u00a5\uffe5]\s*(\d+(?:\.\d+)?)")
    # Same shape, as a uiautomator ``textMatches`` pattern (full match, on-device).
    _PRICE_TEXT_PATTERN = r"[\u00a5\uffe5]\s*\d+(\.\d+)?"
    _DIGITS_RE = re.compile(r"\d+")
    _NUMBER_RE = re.compile(r"\d+(?:\.\d+)?")
    # Sales figures such as "1.2万" or "800"; a trailing "+" is not kept.
    _SALES_RE = re.compile(r"\d+(?:\.\d+)?[万千百]*")
    # Selectors tried, in order, to locate the "next page" control.
    _NEXT_PAGE_SELECTORS = (
        {"text": "下一页"},
        {"resourceIdMatches": r".*next.*"},
        {"descriptionMatches": r".*next.*"},
    )

    def __init__(self, device_serial: str = None):
        self.device = u2.connect(device_serial) if device_serial else u2.connect()
        # Not used by the UI-driven crawl; kept for callers that rely on it.
        # Call ``close()`` when finished so the session is not leaked.
        self.session = aiohttp.ClientSession()
        self.products = []

    async def close(self) -> None:
        """Close the aiohttp session created in ``__init__`` (safe to repeat)."""
        if not self.session.closed:
            await self.session.close()

    async def crawl_category_products(self, category_id: str, max_pages: int = 10) -> List[Dict]:
        """Crawl up to ``max_pages`` pages of the product list currently on screen.

        Args:
            category_id: category to crawl. NOTE(review): not used yet; the
                caller must already have navigated to the category's list.
            max_pages: upper bound on the number of pages read.

        Returns:
            Every valid product dict collected, in page order.
        """
        products: List[Dict] = []
        for page in range(1, max_pages + 1):
            print(f"正在爬取第 {page} 页商品...")

            await self._scroll_to_load_more()
            page_products = await self._extract_product_info()
            products.extend(page_products)

            # Stop on an empty page or the last allowed page, or when there is
            # no "next page" control. This avoids a pointless click and sleep
            # after the final page.
            if not page_products or page == max_pages or not await self._has_next_page():
                break

            await self._click_next_page()
            # Random pause between pages to look less like a bot.
            await asyncio.sleep(random.uniform(2, 5))

        return products

    async def _scroll_to_load_more(self):
        """Swipe up by about 80% of the screen height, then wait for content."""
        info = self.device.info
        screen_height = info['displayHeight']
        # Swipe along the horizontal centre; x=500 if the width is unreported.
        x = info.get('displayWidth', 1000) // 2
        scroll_distance = int(screen_height * 0.8)
        self.device.swipe(x, screen_height - 100, x, screen_height - 100 - scroll_distance)
        await asyncio.sleep(1)  # let the newly revealed cards render

    async def _extract_product_info(self) -> List[Dict]:
        """Return the valid products among the cards currently on screen.

        A card that fails to parse is reported and skipped. A failure to find
        any cards is reported and yields an empty list.
        """
        products = []
        try:
            product_elements = self.device(resourceIdMatches=r".*product.*", clickable=True)
            for element in product_elements:
                if not element.exists:
                    continue
                try:
                    product = {
                        'id': self._extract_product_id(element),
                        'title': self._extract_product_title(element),
                        'price': self._extract_product_price(element),
                        'sales': self._extract_product_sales(element),
                        'rating': self._extract_product_rating(element),
                        'thumbnail': self._extract_product_thumbnail(element),
                        'discount': self._extract_product_discount(element)
                    }
                except Exception as e:
                    print(f"提取商品信息失败: {e}")
                    continue
                if not product['id']:
                    product['id'] = self._fallback_product_id(product)
                if self._is_valid_product(product):
                    products.append(product)
        except Exception as e:
            print(f"查找商品元素失败: {e}")

        return products

    @staticmethod
    def _info(element) -> Dict:
        """Return ``element.info`` as a dict, or ``{}`` if it cannot be read."""
        try:
            info = element.info
        except Exception:
            return {}
        return info if isinstance(info, dict) else {}

    @staticmethod
    def _first_existing(*selectors):
        """Return the first selector whose ``exists`` check is true, else None."""
        for selector in selectors:
            if selector.exists:
                return selector
        return None

    @staticmethod
    def _fallback_product_id(product: Dict) -> str:
        """Derive a stable 16-hex-digit ID from title, price and sales.

        The digest is only a deterministic fingerprint, not a security measure.
        Built-in ``hash()`` is salted per process, so it cannot produce IDs
        that match across runs.
        """
        key = "|".join(str(product.get(k, "")) for k in ("title", "price", "sales"))
        return hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]

    def _extract_product_id(self, element) -> str:
        """Return the first digit run from an element field that mentions "id".

        The fields checked are text, description, resource name and package
        name. Returns "" if none applies; the caller then falls back to
        ``_fallback_product_id``.
        """
        info = self._info(element)
        for key in ('text', 'contentDescription', 'resourceName', 'packageName'):
            value = info.get(key)
            if value and 'id' in str(value).lower():
                match = self._DIGITS_RE.search(str(value))
                if match:
                    return match.group(0)
        return ""

    def _extract_product_title(self, element) -> str:
        """Return the element's text (or description), if at least 2 characters.

        The result is truncated to 100 characters.
        """
        info = self._info(element)
        for key in ('text', 'contentDescription'):
            value = info.get(key)
            # At least 2 characters, matching DataCleaner's minimum title length.
            if value and len(str(value)) >= 2:
                return str(value)[:100]
        return ""

    def _extract_product_price(self, element) -> str:
        """Return the card's price text, or "" if none is found.

        Looks for a child whose resource ID contains "price" or whose text is a
        yen price. Otherwise it parses the card's own text/description and
        returns ``"¥<number>"``.
        """
        try:
            price_element = self._first_existing(
                element.child(resourceIdMatches=r".*price.*"),
                element.child(textMatches=self._PRICE_TEXT_PATTERN),
            )
            if price_element is not None:
                return price_element.get_text() or ""

            info = self._info(element)
            text = info.get('text') or info.get('contentDescription') or ""
            match = self._PRICE_RE.search(str(text))
            return f"\u00a5{match.group(1)}" if match else ""
        except Exception:
            return ""

    def _extract_product_sales(self, element) -> str:
        """Return the sales figure (e.g. "1.2万", "800") from a child, or ""."""
        try:
            sales_element = self._first_existing(
                element.child(textMatches=r".*[万千百]*销量.*\d+.*"),
                element.child(textMatches=r"\d+(\.\d+)?[万千百]*\+?"),
            )
            if sales_element is None:
                return ""
            match = self._SALES_RE.search(sales_element.get_text() or "")
            return match.group(0) if match else ""
        except Exception:
            return ""

    def _extract_product_rating(self, element) -> float:
        """Return the numeric rating shown on the card, or 0.0."""
        try:
            rating_element = self._first_existing(
                element.child(resourceIdMatches=r".*rating.*"),
                element.child(textMatches=r"\d+(\.\d+)?分"),
            )
            if rating_element is None:
                return 0.0
            match = self._NUMBER_RE.search(rating_element.get_text() or "")
            return float(match.group(0)) if match else 0.0
        except Exception:
            return 0.0

    def _extract_product_thumbnail(self, element) -> str:
        """Return a placeholder ``"image_bounds_<bounds>"`` for the card's image.

        A real crawler would crop a screenshot of this region or read the image
        URL. Returns "" when the card has no ImageView child.
        """
        try:
            image = element.child(className="android.widget.ImageView")
            if not image.exists:
                return ""
            return f"image_bounds_{image.bounds()}"
        except Exception:
            return ""

    def _extract_product_discount(self, element) -> str:
        """Return the text of a child mentioning a discount ("折" or "%"), or ""."""
        try:
            discount_element = element.child(textMatches=r".*[折%].*")
            if discount_element.exists:
                return discount_element.get_text() or ""
            return ""
        except Exception:
            return ""

    def _is_valid_product(self, product: Dict) -> bool:
        """True when the product has a non-empty id, title and price."""
        return bool(product.get('id') and product.get('title') and product.get('price'))

    def _find_next_button(self):
        """Return the first existing "next page" selector, or None."""
        return self._first_existing(
            *(self.device(**selector) for selector in self._NEXT_PAGE_SELECTORS)
        )

    async def _has_next_page(self) -> bool:
        """Return True if a "next page" control is on screen."""
        try:
            return self._find_next_button() is not None
        except Exception:
            return False

    async def _click_next_page(self):
        """Tap the "next page" control, if present, and wait for the page to load."""
        try:
            next_button = self._find_next_button()
            if next_button is not None:
                next_button.click()
                await asyncio.sleep(2)  # wait for the next page to load
        except Exception as e:
            print(f"点击下一页失败: {e}")

class ProductDetailCrawler:
    """Collect product-detail records for a list of product IDs.

    ``_get_product_detail`` currently returns simulated data built from random
    values, because no real App is driven here. The device connection is
    opened but not used yet.
    """

    def __init__(self, device_serial: str = None):
        self.device = u2.connect(device_serial) if device_serial else u2.connect()
        # Unused by the simulated crawl; release it with ``close()``.
        self.session = aiohttp.ClientSession()

    async def close(self) -> None:
        """Close the aiohttp session created in ``__init__`` (safe to repeat)."""
        if not self.session.closed:
            await self.session.close()

    async def crawl_product_details(self, product_ids: List[str]) -> List[Dict]:
        """Fetch details for each ID in order, pausing 1-3 s between items.

        IDs whose lookup fails (returns None) are skipped.
        """
        details = []
        for product_id in product_ids:
            print(f"正在爬取商品 {product_id} 的详情...")
            detail = await self._get_product_detail(product_id)
            if detail:
                details.append(detail)
            await asyncio.sleep(random.uniform(1, 3))  # random delay between items
        return details

    async def _get_product_detail(self, product_id: str) -> Optional[Dict]:
        """Return a simulated detail record for ``product_id``, or None on error.

        Prices are strings such as ``"¥123.00"``. ``original_price`` is never
        below ``price``.
        """
        try:
            # Placeholder: a real implementation would open the detail page
            # for product_id on the device and read these fields.
            price = random.randint(10, 999)
            # The list (original) price must not be below the selling price.
            original_price = price + random.randint(0, 1000)
            return {
                'id': product_id,
                'title': f'Product {product_id} Title',
                'price': f'\u00a5{price}.00',
                'original_price': f'\u00a5{original_price}.00',
                'description': f'Detailed description for product {product_id}',
                'specifications': self._get_random_specs(),
                'images': [f'https://example.com/image/{product_id}_{i}.jpg' for i in range(1, 4)],
                'seller_info': self._get_seller_info(),
                'stock': random.randint(0, 1000),
                'brand': f'Brand {random.choice(["A", "B", "C"])}',
                'category_path': f'Electronics > Phones > {str(product_id)[:2]}',
                'delivery_info': self._get_delivery_info(),
                'service_guarantees': ['7-day return', 'Free shipping', 'Warranty']
            }
        except Exception as e:
            print(f"获取商品详情失败 {product_id}: {e}")
            return None

    def _get_random_specs(self) -> Dict:
        """Return simulated specification fields (colour, size, weight, ...)."""
        return {
            'color': random.choice(['Black', 'White', 'Red', 'Blue']),
            'size': random.choice(['S', 'M', 'L', 'XL']),
            'weight': f'{random.uniform(0.1, 5.0):.2f}kg',
            'dimensions': f'{random.randint(10, 30)} x {random.randint(5, 20)} x {random.randint(1, 5)} cm',
            'material': random.choice(['Plastic', 'Metal', 'Wood', 'Fabric'])
        }

    def _get_seller_info(self) -> Dict:
        """Return one of three simulated seller profiles."""
        sellers = [
            {'name': 'Official Store', 'rating': 4.8, 'sales': '100K+'},
            {'name': 'Authorized Dealer', 'rating': 4.6, 'sales': '50K+'},
            {'name': 'Premium Seller', 'rating': 4.9, 'sales': '200K+'}
        ]
        return random.choice(sellers)

    def _get_delivery_info(self) -> Dict:
        """Return simulated delivery info.

        ``shipping_cost`` is ``'Free'`` exactly when ``free_shipping`` is True.
        Previously the two values were drawn independently and could
        contradict each other.
        """
        free_shipping = random.choice([True, False])
        return {
            'free_shipping': free_shipping,
            'shipping_cost': 'Free' if free_shipping else f'\u00a5{random.uniform(0, 20):.2f}',
            'delivery_time': f'{random.randint(1, 7)} day(s)',
            # A single warehouse name; the key name is kept for compatibility.
            'warehouses': random.choice(['Beijing', 'Shanghai', 'Guangzhou'])
        }

class ReviewCrawler:
    """Collect product reviews page by page (currently simulated data)."""

    def __init__(self, device_serial: str = None):
        self.device = u2.connect(device_serial) if device_serial else u2.connect()
        # Unused by the simulated crawl; release it with ``close()``.
        self.session = aiohttp.ClientSession()

    async def close(self) -> None:
        """Close the aiohttp session created in ``__init__`` (safe to repeat)."""
        if not self.session.closed:
            await self.session.close()

    async def crawl_reviews(self, product_id: str, max_pages: int = 5) -> List[Dict]:
        """Fetch up to ``max_pages`` pages of reviews for ``product_id``.

        Stops early at the first empty page. Pauses 2-4 s between pages, but
        not after the last one.
        """
        reviews = []
        for page in range(1, max_pages + 1):
            # "第" separates the product ID from the page number in the message.
            print(f"正在爬取商品 {product_id}{page} 页评论...")

            page_reviews = await self._get_review_page(product_id, page)
            if not page_reviews:
                break
            reviews.extend(page_reviews)

            if page < max_pages:
                await asyncio.sleep(random.uniform(2, 4))

        return reviews

    async def _get_review_page(self, product_id: str, page: int) -> List[Dict]:
        """Return 5-15 simulated reviews for one page.

        IDs are formatted as ``"<product_id>_review_<page>_<i>"``.
        """
        reviews = []
        for i in range(random.randint(5, 15)):
            review = {
                'id': f'{product_id}_review_{page}_{i}',
                'user_id': f'user_{random.randint(1000, 9999)}',
                'username': f'User{random.randint(100, 999)}',
                'rating': random.randint(1, 5),
                'content': self._generate_review_content(),
                'date': f'2024-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}',
                'useful_count': random.randint(0, 100),
                'images': [f'https://example.com/review_img/{random.randint(1, 1000)}.jpg'] if random.choice([True, False]) else [],
                # About 1 in 4 reviews carries a video.
                'video': f'https://example.com/review_video/{random.randint(1, 100)}.mp4' if random.choice([True, False, False, False]) else None,
                'helpful': random.choice([True, False]),
                'verified_purchase': random.choice([True, False])
            }
            reviews.append(review)
        return reviews

    def _generate_review_content(self) -> str:
        """Return one of a fixed set of sample review texts."""
        templates = [
            "很好用,性价比很高,推荐购买!",
            "质量不错,物流很快,服务态度也好。",
            "符合预期,包装完好,会回购的。",
            "使用效果还可以,不过有些小瑕疵。",
            "不太满意,感觉和描述有些差距。",
            "超出预期的好,非常满意这次购物。",
            "中规中矩吧,没什么特别的。",
            "值得购买,物有所值,好评!"
        ]
        return random.choice(templates)

async def main_crawling_process():
    """Run the list -> detail -> review crawl and return everything collected.

    Product details are fetched for the first 5 listed products, and reviews
    for the first 2 of those.

    Each crawler opens an aiohttp session in its constructor. All of them are
    closed in ``finally``, even if a stage raises, so no sessions leak.

    Returns:
        dict with keys ``'products'``, ``'details'`` and ``'reviews'``.
    """
    print("=== 开始电商App数据爬取 ===\n")

    crawlers = []
    try:
        # 1. Product list
        print("1. 开始商品列表爬取...")
        list_crawler = ProductListCrawler()
        crawlers.append(list_crawler)
        products = await list_crawler.crawl_category_products("electronics", max_pages=3)
        print(f"已爬取 {len(products)} 个商品\n")

        # 2. Details for the first 5 products only
        print("2. 开始商品详情爬取...")
        detail_crawler = ProductDetailCrawler()
        crawlers.append(detail_crawler)
        sample_product_ids = [p['id'] for p in products[:5]]
        details = await detail_crawler.crawl_product_details(sample_product_ids)
        print(f"已爬取 {len(details)} 个商品详情\n")

        # 3. Reviews for the first 2 products only
        print("3. 开始评论爬取...")
        review_crawler = ReviewCrawler()
        crawlers.append(review_crawler)
        all_reviews = []
        for product_id in sample_product_ids[:2]:
            reviews = await review_crawler.crawl_reviews(product_id, max_pages=2)
            all_reviews.extend(reviews)
        print(f"已爬取 {len(all_reviews)} 条评论\n")
    finally:
        for crawler in crawlers:
            session = getattr(crawler, 'session', None)
            if session is not None and not session.closed:
                await session.close()

    # 4. Summary
    print("=== 爬取结果汇总 ===")
    print(f"商品列表: {len(products)} 个")
    print(f"商品详情: {len(details)} 个")
    print(f"用户评论: {len(all_reviews)} 条")

    # 5. Persisting to a database is left to the caller.
    print("\n数据已准备好保存到数据库...")
    return {
        'products': products,
        'details': details,
        'reviews': all_reviews
    }

if __name__ == "__main__":
    # Demo only: a real run needs a connected device with the target App open,
    # so main_crawling_process() is intentionally not started here.
    print(
        "爬取器已初始化完成,准备开始数据爬取...",
        "注意:实际运行需要连接真实设备并启动目标App",
        sep="\n",
    )

5. 数据处理与存储

5.1 数据清洗与验证

import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional
import re
from datetime import datetime

class DataCleaner:
    """Validate, clean and de-duplicate crawled product records.

    Typical pipeline: ``validate_product_data`` -> ``clean_product_data`` ->
    ``remove_duplicates``. ``stats`` accumulates counters across calls.
    Callables registered with ``add_validation_rule`` /
    ``add_cleaning_function`` run after the built-in checks and cleaning.
    """

    # Currency symbols stripped from prices: half-width yen (U+00A5),
    # full-width yen (U+FFE5) and dollar.
    _CURRENCY_SYMBOLS = ('\u00a5', '\uffe5', '$')
    _ID_RE = re.compile(r'[a-zA-Z0-9_-]+')

    def __init__(self):
        # field -> callable(value) -> bool; a falsy result rejects the record.
        self.validation_rules = {}
        # field -> callable(value) -> cleaned value.
        self.cleaning_functions = {}
        self.stats = {
            # NOTE(review): 'total_records' is not incremented by this class.
            'total_records': 0,
            'cleaned_records': 0,
            'discarded_records': 0,
            'duplicate_records': 0
        }

    def add_validation_rule(self, field: str, rule_func):
        """Register ``rule_func(value) -> bool`` for ``field``.

        The rule is applied during validation when the field is present. It
        replaces any earlier rule for the same field.
        """
        self.validation_rules[field] = rule_func

    def add_cleaning_function(self, field: str, clean_func):
        """Register ``clean_func(value) -> value`` for ``field``.

        It is applied after the built-in cleaning whenever the field is present.
        """
        self.cleaning_functions[field] = clean_func

    def validate_product_data(self, products: List[Dict]) -> List[Dict]:
        """Return the products that pass validation; count the rest as discarded."""
        validated_products = []
        for product in products:
            if self._validate_single_product(product):
                validated_products.append(product)
            else:
                self.stats['discarded_records'] += 1
        return validated_products

    def _validate_single_product(self, product: Dict) -> bool:
        """Check required fields, ID/price/title format, optional sales/rating,
        then any registered per-field rules."""
        for field in ('id', 'title', 'price'):
            if not product.get(field):
                return False

        if not self._validate_product_id(product['id']):
            return False

        if not self._validate_price(product['price']):
            return False

        # Measure the title as it will look after cleaning.
        title_length = len(self._clean_text(product['title']))
        if title_length < 2 or title_length > 200:
            return False

        if product.get('sales') and not self._validate_sales(product['sales']):
            return False

        if product.get('rating') and not self._validate_rating(product['rating']):
            return False

        for field, rule in self.validation_rules.items():
            if field in product and not rule(product[field]):
                return False

        return True

    @classmethod
    def _strip_currency(cls, value: Any) -> str:
        """Return ``str(value)`` without currency symbols and outer whitespace."""
        text = str(value)
        for symbol in cls._CURRENCY_SYMBOLS:
            text = text.replace(symbol, '')
        return text.strip()

    def _validate_product_id(self, product_id: str) -> bool:
        """True if the ID, after trimming, is only letters, digits, '_' or '-'.

        ``fullmatch`` is used because ``re.match`` with ``$`` also accepts a
        trailing newline.
        """
        return bool(self._ID_RE.fullmatch(str(product_id).strip()))

    def _validate_price(self, price: str) -> bool:
        """True if the price (currency symbol optional) is a finite number >= 0."""
        try:
            value = float(self._strip_currency(price))
        except ValueError:
            return False
        # float() accepts "nan", "inf" and negatives; none is a valid price.
        # NaN fails both comparisons.
        return 0 <= value < float('inf')

    def _validate_sales(self, sales: str) -> bool:
        """True for sales such as "1000+", "1.2万+" or "8000"."""
        sales_str = str(sales)
        if sales_str.endswith('+'):
            sales_str = sales_str[:-1]
        sales_str = re.sub(r'[万千百十]', '', sales_str)
        try:
            float(sales_str)
        except ValueError:
            return False
        return True

    def _validate_rating(self, rating: float) -> bool:
        """True if the rating converts to a number in [0, 5]."""
        try:
            rating_float = float(rating)
        except (TypeError, ValueError):
            return False
        return 0 <= rating_float <= 5

    def clean_product_data(self, products: List[Dict]) -> List[Dict]:
        """Return cleaned copies of ``products`` (inputs are not mutated)."""
        cleaned_products = []
        for product in products:
            cleaned_product = self._clean_single_product(product)
            if cleaned_product:
                cleaned_products.append(cleaned_product)
                self.stats['cleaned_records'] += 1
            else:
                self.stats['discarded_records'] += 1
        return cleaned_products

    def _clean_single_product(self, product: Dict) -> Optional[Dict]:
        """Return a cleaned shallow copy of ``product``."""
        cleaned = product.copy()

        if 'id' in cleaned:
            cleaned['id'] = str(cleaned['id']).strip()

        # Free-text fields: normalise whitespace, then cap the length.
        for field, limit in (('title', 200), ('description', 1000),
                             ('brand', 100), ('category_path', 500)):
            if field in cleaned:
                cleaned[field] = self._clean_text(cleaned[field])[:limit]

        if 'price' in cleaned:
            cleaned['price'] = self._clean_price(cleaned['price'])

        if 'sales' in cleaned:
            cleaned['sales'] = self._clean_sales(cleaned['sales'])

        if 'rating' in cleaned:
            cleaned['rating'] = self._clean_rating(cleaned['rating'])

        for field, clean_func in self.cleaning_functions.items():
            if field in cleaned:
                cleaned[field] = clean_func(cleaned[field])

        return cleaned

    def _clean_text(self, text: str) -> str:
        """Collapse whitespace, drop control characters, and trim the ends."""
        if not text:
            return ""
        # Turn tabs/newlines into single spaces first so they are not deleted
        # as control characters (which would glue words together).
        text = re.sub(r'\s+', ' ', str(text))
        text = ''.join(ch for ch in text if ord(ch) >= 32 and ord(ch) != 127)
        # Removing control characters can leave adjacent or edge spaces.
        return re.sub(r' {2,}', ' ', text).strip()

    def _clean_price(self, price: str) -> str:
        """Return the price without currency symbols or any whitespace."""
        return re.sub(r'\s+', '', self._strip_currency(price))

    def _clean_sales(self, sales: str) -> str:
        """Keep only digits, '.', Chinese magnitude units, '+' and '-'."""
        sales_str = str(sales).strip()
        if not sales_str:
            return ""
        return re.sub(r'[^\d\.万千百十\+\-]', '', sales_str)

    def _clean_rating(self, rating: Any) -> float:
        """Round the rating to one decimal; 0.0 if it is not numeric (incl. None)."""
        try:
            return round(float(rating), 1)
        except (TypeError, ValueError):
            return 0.0

    def remove_duplicates(self, products: List[Dict]) -> List[Dict]:
        """Keep the first product per ID.

        Later duplicates are counted in ``stats['duplicate_records']``.
        Products without an ID cannot be de-duplicated; they are dropped and
        counted as discarded, not as duplicates.
        """
        seen_ids = set()
        unique_products = []
        for product in products:
            product_id = product.get('id', '')
            if not product_id:
                self.stats['discarded_records'] += 1
            elif product_id in seen_ids:
                self.stats['duplicate_records'] += 1
            else:
                seen_ids.add(product_id)
                unique_products.append(product)
        return unique_products

class ReviewCleaner:
    """Validate, clean and de-duplicate crawled review records.

    Pipeline: ``validate_reviews`` -> ``clean_reviews`` ->
    ``remove_duplicate_reviews``. ``stats`` accumulates counters across calls.
    """

    # Accepted date formats; cleaned dates are normalised to YYYY-MM-DD.
    _DATE_FORMATS = ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d', '%Y/%m/%d %H:%M:%S', '%Y/%m/%d')
    _ID_RE = re.compile(r'[a-zA-Z0-9_-]+')

    def __init__(self):
        self.stats = {
            # NOTE(review): 'total_reviews' is not incremented by this class.
            'total_reviews': 0,
            'cleaned_reviews': 0,
            'discarded_reviews': 0,
            'duplicate_reviews': 0
        }

    def validate_reviews(self, reviews: List[Dict]) -> List[Dict]:
        """Return the reviews that pass validation; count the rest as discarded."""
        validated_reviews = []
        for review in reviews:
            if self._validate_single_review(review):
                validated_reviews.append(review)
            else:
                self.stats['discarded_reviews'] += 1
        return validated_reviews

    def _validate_single_review(self, review: Dict) -> bool:
        """Check required fields, ID format, a 1-5 rating, content length
        (1-2000 characters after cleaning) and, if present, a parseable date."""
        for field in ('id', 'user_id', 'rating', 'content'):
            if not review.get(field):
                return False

        # fullmatch: re.match with '$' would also accept a trailing newline.
        if not self._ID_RE.fullmatch(str(review['id']).strip()):
            return False

        try:
            rating = float(review['rating'])
        except (TypeError, ValueError):
            return False
        if not 1 <= rating <= 5:
            return False

        # Whitespace-only content would become empty after cleaning.
        content = self._clean_text(review['content'])
        if not 1 <= len(content) <= 2000:
            return False

        date = review.get('date')
        if date and self._parse_date(str(date).strip()) is None:
            return False

        return True

    def clean_reviews(self, reviews: List[Dict]) -> List[Dict]:
        """Return cleaned copies of ``reviews`` (inputs are not mutated)."""
        cleaned_reviews = []
        for review in reviews:
            cleaned_review = self._clean_single_review(review)
            if cleaned_review:
                cleaned_reviews.append(cleaned_review)
                self.stats['cleaned_reviews'] += 1
            else:
                self.stats['discarded_reviews'] += 1
        return cleaned_reviews

    def _clean_single_review(self, review: Dict) -> Optional[Dict]:
        """Return a cleaned shallow copy of ``review``."""
        cleaned = review.copy()

        for field in ('id', 'user_id'):
            if field in cleaned:
                cleaned[field] = str(cleaned[field]).strip()

        if 'username' in cleaned:
            cleaned['username'] = self._clean_text(cleaned['username'])[:100]

        if 'rating' in cleaned:
            try:
                cleaned['rating'] = round(float(cleaned['rating']), 1)
            except (TypeError, ValueError):
                cleaned['rating'] = 0.0

        if 'content' in cleaned:
            cleaned['content'] = self._clean_text(cleaned['content'])[:2000]

        # Leave missing/None dates alone instead of turning them into "None".
        if cleaned.get('date'):
            cleaned['date'] = self._clean_date(cleaned['date'])

        if 'useful_count' in cleaned:
            try:
                cleaned['useful_count'] = int(cleaned['useful_count'])
            except (TypeError, ValueError):
                cleaned['useful_count'] = 0

        if isinstance(cleaned.get('images'), list):
            cleaned['images'] = [str(img) for img in cleaned['images'] if img]

        if 'video' in cleaned:
            cleaned['video'] = str(cleaned['video']) if cleaned['video'] else None

        return cleaned

    def _clean_text(self, text: str) -> str:
        """Collapse whitespace, drop control characters, and trim the ends."""
        if not text:
            return ""
        # Tabs/newlines become spaces first so they are not deleted outright.
        text = re.sub(r'\s+', ' ', str(text))
        text = ''.join(ch for ch in text if ord(ch) >= 32 and ord(ch) != 127)
        return re.sub(r' {2,}', ' ', text).strip()

    @classmethod
    def _parse_date(cls, date_str: str) -> Optional[datetime]:
        """Parse ``date_str`` with the first matching format, or return None."""
        for fmt in cls._DATE_FORMATS:
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue
        return None

    def _clean_date(self, date_str: str) -> str:
        """Normalise a date to YYYY-MM-DD.

        The time part is dropped. The trimmed input is returned unchanged if
        it cannot be parsed.
        """
        date_str = str(date_str).strip()
        parsed = self._parse_date(date_str)
        return parsed.strftime('%Y-%m-%d') if parsed else date_str

    def remove_duplicate_reviews(self, reviews: List[Dict]) -> List[Dict]:
        """Keep the first review per ID.

        Later duplicates are counted in ``stats['duplicate_reviews']``.
        Reviews without an ID are dropped and counted as discarded.
        """
        seen_ids = set()
        unique_reviews = []
        for review in reviews:
            review_id = review.get('id', '')
            if not review_id:
                self.stats['discarded_reviews'] += 1
            elif review_id in seen_ids:
                self.stats['duplicate_reviews'] += 1
            else:
                seen_ids.add(review_id)
                unique_reviews.append(review)
        return unique_reviews

def data_cleaning_main():
    """Run the product and review cleaning pipelines on small sample data.

    Prints the record count after each stage (validate, clean, de-duplicate)
    and the product cleaner's statistics.
    """
    print("=== 数据清洗与验证 ===\n")

    # ---- Products: validate -> clean -> de-duplicate ----
    cleaner = DataCleaner()
    sample_products = [
        {
            'id': '12345',
            'title': '  iPhone 13 Pro Max  ',
            'price': '¥8999.00 ',
            'sales': '1.2万+',
            'rating': 4.8,
            'description': 'This is a great phone...'
        },
        {
            'id': '12346',
            'title': 'Samsung Galaxy S21',
            'price': '7999.00',
            'sales': '8000+',
            'rating': 4.5
        }
    ]

    print("原始数据:")
    for number, record in enumerate(sample_products, start=1):
        print(f"  产品 {number}: {record}")

    stage_validated = cleaner.validate_product_data(sample_products)
    print(f"\n验证后数据: {len(stage_validated)} 条")

    stage_cleaned = cleaner.clean_product_data(stage_validated)
    print(f"清洗后数据: {len(stage_cleaned)} 条")

    stage_unique = cleaner.remove_duplicates(stage_cleaned)
    print(f"去重后数据: {len(stage_unique)} 条")

    counters = cleaner.stats
    print(f"\n清洗统计:")
    print(f"  总记录数: {len(sample_products)}")
    print(f"  清洗记录数: {counters['cleaned_records']}")
    print(f"  丢弃记录数: {counters['discarded_records']}")
    print(f"  重复记录数: {counters['duplicate_records']}")

    # ---- Reviews: the same three stages ----
    review_cleaner = ReviewCleaner()
    sample_reviews = [
        {
            'id': 'rev_001',
            'user_id': 'user_123',
            'rating': 5.0,
            'content': '  这个产品真的很好用!  ',
            'date': '2024-01-15'
        }
    ]

    print(f"\n评论清洗示例:")
    reviews_ok = review_cleaner.validate_reviews(sample_reviews)
    reviews_clean = review_cleaner.clean_reviews(reviews_ok)
    reviews_unique = review_cleaner.remove_duplicate_reviews(reviews_clean)

    print(f"原始评论: {len(sample_reviews)}, 验证后: {len(reviews_ok)}, 清洗后: {len(reviews_clean)}, 去重后: {len(reviews_unique)}")

if __name__ == "__main__":
    # Script entry point: run the data-cleaning demo on its sample records.
    data_cleaning_main()

5.2 数据库设计与存储

from sqlalchemy import create_engine, Column, Integer, String, Float, Text, DateTime, Boolean, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime
import pymysql
import json

# Declarative base shared by the ORM models (Product, Review) defined below.
# NOTE(review): sqlalchemy.ext.declarative.declarative_base moved to
# sqlalchemy.orm.declarative_base in SQLAlchemy 1.4 and is deprecated in 2.0.
# Confirm the pinned SQLAlchemy version before switching the import.
Base = declarative_base()

class Product(Base):
    """Products table (``products``): one row per crawled product.

    Holds both list-page fields (title, price, sales, rating, thumbnail,
    discount) and detail-page fields (description, brand, stock, ...).
    Columns whose DDL comment says "JSON格式" hold serialized JSON text.
    """
    __tablename__ = 'products'
    
    # Primary key: the crawler-assigned product ID (string, up to 50 chars).
    id = Column(String(50), primary_key=True, comment='商品ID')
    title = Column(String(200), nullable=False, comment='商品标题')
    # NOTE(review): Float is binary floating point; Numeric/DECIMAL would store
    # currency amounts exactly. The cleaners produce price strings such as
    # "8999.00", presumably converted before insert. Confirm in DatabaseManager.
    price = Column(Float, nullable=False, comment='价格')
    original_price = Column(Float, comment='原价')
    # Text rather than a number because crawled sales look like "1.2万+".
    sales = Column(String(50), comment='销量')
    rating = Column(Float, comment='评分')
    thumbnail = Column(String(500), comment='缩略图URL')
    discount = Column(String(50), comment='折扣信息')
    description = Column(Text, comment='商品描述')
    brand = Column(String(100), comment='品牌')
    category_path = Column(String(500), comment='类别路径')
    stock = Column(Integer, comment='库存')
    # NOTE(review): the detail crawler yields delivery_info as a dict;
    # presumably serialized (e.g. JSON) before saving. Confirm.
    delivery_info = Column(Text, comment='配送信息')
    service_guarantees = Column(Text, comment='服务保证(JSON格式)')
    specifications = Column(Text, comment='规格参数(JSON格式)')
    images = Column(Text, comment='图片列表(JSON格式)')
    seller_info = Column(Text, comment='卖家信息(JSON格式)')
    # Naive UTC timestamps from datetime.utcnow (deprecated since Python 3.12).
    # updated_at is refreshed through onupdate when SQLAlchemy issues an UPDATE.
    created_at = Column(DateTime, default=datetime.utcnow, comment='创建时间')
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')
    # Status flag: 1 = listed, 0 = delisted.
    status = Column(Integer, default=1, comment='状态(1-正常,0-下架)')

class Review(Base):
    """Review table (``reviews``): one row per user review of a product."""
    __tablename__ = 'reviews'
    
    id = Column(String(50), primary_key=True, comment='评论ID')
    product_id = Column(String(50), ForeignKey('products.id'), nullable=False, comment='商品ID')  # references products.id
    user_id = Column(String(50), nullable=False, comment='用户ID')
    username = Column(String(100), comment='用户名')
    rating = Column(Float, nullable=False, comment='评分')
    content = Column(Text, nullable=False, comment='评论内容')
    date = Column(String(20), comment='评论日期')  # date kept as scraped text, e.g. '2024-01-15'
    useful_count = Column(Integer, default=0, comment='有用计数')
    helpful = Column(Boolean, comment='是否有帮助')
    verified_purchase = Column(Boolean, comment='是否已验证购买')
    images = Column(Text, comment='评论图片(JSON格式)')  # JSON text, e.g. a list of image URLs
    video = Column(String(500), comment='评论视频URL')
    # Naive UTC timestamps (datetime.utcnow); updated_at is refreshed on every ORM update.
    created_at = Column(DateTime, default=datetime.utcnow, comment='创建时间')
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')
    status = Column(Integer, default=1, comment='状态(1-正常,0-隐藏)')  # 1 = visible, 0 = hidden
    
    # Relationships
    product = relationship("Product", backref="reviews")  # the backref also adds Product.reviews

class DatabaseManager:
    """Persist crawled products and reviews through SQLAlchemy.

    Owns the engine and session factory, creates the tables on start-up and
    offers upsert-style ``save_*`` helpers plus a few read/statistics
    queries. Every public method opens its own session and always closes it.
    """

    # Product columns stored as JSON text; values are serialized with json.dumps.
    _PRODUCT_JSON_FIELDS = ('specifications', 'images', 'seller_info',
                            'service_guarantees', 'delivery_info')
    # Float columns whose incoming values may be display strings such as "¥8999.00".
    _PRODUCT_PRICE_FIELDS = ('price', 'original_price')
    # Currency symbols (half- and full-width yen) and thousands separators
    # (ASCII and full-width comma) removed before a price string is parsed.
    _PRICE_NOISE = ('¥', '￥', ',', '，')

    def __init__(self, db_url: str = "mysql+pymysql://crawler_user:crawler_password@localhost/ecommerce_data"):
        """Connect to ``db_url`` and make sure all mapped tables exist.

        Args:
            db_url: SQLAlchemy database URL. The default is a local demo
                account; real deployments should pass credentials from
                configuration rather than rely on this hard-coded value.
        """
        # pool_pre_ping checks pooled connections before use, replacing ones
        # the server has already closed.
        self.engine = create_engine(db_url, echo=False, pool_pre_ping=True)
        self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)
        self.create_tables()

    def create_tables(self):
        """Create every table declared on ``Base`` that does not exist yet."""
        Base.metadata.create_all(bind=self.engine)
        print("✓ 数据库表创建完成")

    def get_session(self):
        """Return a new session; the caller is responsible for closing it."""
        return self.SessionLocal()

    @staticmethod
    def _to_float(value, default: float = 0.0) -> float:
        """Best-effort float conversion: ``None``, ``''`` or unparsable input gives ``default``."""
        if value is None or value == '':
            return default
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def _product_column_value(self, key: str, value):
        """Convert one raw product field into the form stored in its column."""
        if key in self._PRODUCT_JSON_FIELDS:
            return json.dumps(value, ensure_ascii=False)
        if key in self._PRODUCT_PRICE_FIELDS:
            return self._extract_price(value)
        if key == 'rating':
            return self._to_float(value)
        return value

    def save_products(self, products: List[Dict]):
        """Insert new products and update existing ones in a single transaction.

        Args:
            products: Product dicts. Each needs ``'id'``; new rows also need
                ``'title'`` and ``'price'``. Prices are parsed to float and
                JSON-typed fields are serialized on both the insert and the
                update path. Keys that are not table columns are ignored.

        Raises:
            Exception: Any database or conversion error, re-raised after rollback.
        """
        session = self.get_session()
        try:
            column_names = set(Product.__table__.columns.keys())
            # autoflush is off, so a query would not see rows added earlier in
            # this same batch; remember them to avoid inserting an ID twice.
            batch = {}
            for product_data in products:
                product_id = product_data['id']
                product = batch.get(product_id)
                if product is None:
                    product = session.query(Product).filter(Product.id == product_id).first()
                if product is not None:
                    # Update in place, converting values exactly like inserts so
                    # that e.g. "¥8999.00" never reaches the Float price column.
                    for key, value in product_data.items():
                        if key in column_names:
                            setattr(product, key, self._product_column_value(key, value))
                    product.updated_at = datetime.utcnow()
                else:
                    product = Product(
                        id=product_id,
                        title=product_data['title'],
                        price=self._extract_price(product_data['price']),
                        original_price=self._extract_price(product_data.get('original_price', 0)),
                        sales=product_data.get('sales', ''),
                        rating=self._to_float(product_data.get('rating', 0)),
                        thumbnail=product_data.get('thumbnail', ''),
                        discount=product_data.get('discount', ''),
                        description=product_data.get('description', ''),
                        brand=product_data.get('brand', ''),
                        category_path=product_data.get('category_path', ''),
                        stock=product_data.get('stock', 0),
                        delivery_info=json.dumps(product_data.get('delivery_info', {}), ensure_ascii=False),
                        service_guarantees=json.dumps(product_data.get('service_guarantees', []), ensure_ascii=False),
                        specifications=json.dumps(product_data.get('specifications', {}), ensure_ascii=False),
                        images=json.dumps(product_data.get('images', []), ensure_ascii=False),
                        seller_info=json.dumps(product_data.get('seller_info', {}), ensure_ascii=False)
                    )
                    session.add(product)
                batch[product_id] = product

            session.commit()
            print(f"✓ 成功保存 {len(products)} 个商品数据")
        except Exception as e:
            session.rollback()
            print(f"✗ 保存商品数据失败: {e}")
            raise
        finally:
            session.close()

    def save_reviews(self, reviews: List[Dict]):
        """Insert new reviews and update existing ones in a single transaction.

        Args:
            reviews: Review dicts. New rows need ``'id'``, ``'product_id'``,
                ``'user_id'``, ``'rating'`` and ``'content'``. ``'images'`` is
                stored as JSON text; ``'rating'`` is converted to float.

        Raises:
            Exception: Any database or conversion error, re-raised after rollback.
        """
        session = self.get_session()
        try:
            column_names = set(Review.__table__.columns.keys())
            batch = {}  # same-batch duplicates, see save_products
            for review_data in reviews:
                review_id = review_data['id']
                review = batch.get(review_id)
                if review is None:
                    review = session.query(Review).filter(Review.id == review_id).first()
                if review is not None:
                    for key, value in review_data.items():
                        if key not in column_names:
                            continue
                        if key == 'images':
                            value = json.dumps(value, ensure_ascii=False)
                        elif key == 'rating':
                            value = float(value)  # same conversion as the insert path
                        setattr(review, key, value)
                    review.updated_at = datetime.utcnow()
                else:
                    review = Review(
                        id=review_id,
                        product_id=review_data['product_id'],
                        user_id=review_data['user_id'],
                        username=review_data.get('username', ''),
                        rating=float(review_data['rating']),
                        content=review_data['content'],
                        date=review_data.get('date', ''),
                        useful_count=review_data.get('useful_count', 0),
                        helpful=review_data.get('helpful'),
                        verified_purchase=review_data.get('verified_purchase')
                    )
                    if 'images' in review_data:
                        review.images = json.dumps(review_data['images'], ensure_ascii=False)
                    if review_data.get('video'):
                        review.video = review_data['video']
                    session.add(review)
                batch[review_id] = review

            session.commit()
            print(f"✓ 成功保存 {len(reviews)} 条评论数据")
        except Exception as e:
            session.rollback()
            print(f"✗ 保存评论数据失败: {e}")
            raise
        finally:
            session.close()

    def _extract_price(self, price_str) -> float:
        """Parse a display price such as ``"¥8,999.00"`` into a float.

        Numbers pass through ``float``. Falsy input (``None``, ``''``, ``0``)
        and unparsable text yield ``0.0``.
        """
        if not price_str:
            return 0.0
        text = str(price_str)
        for noise in self._PRICE_NOISE:
            text = text.replace(noise, '')
        try:
            return float(text.strip())
        except ValueError:
            return 0.0

    def query_products(self, limit: int = 100, offset: int = 0) -> "List[Product]":
        """Return one page of products, ordered by ID so offset paging is stable.

        The returned objects are detached (the session is closed). Column
        attributes are readable, but lazy relationships such as
        ``Product.reviews`` cannot be loaded from them.
        """
        session = self.get_session()
        try:
            return (session.query(Product)
                    .order_by(Product.id)
                    .offset(offset)
                    .limit(limit)
                    .all())
        finally:
            session.close()

    def query_reviews_by_product(self, product_id: str) -> "List[Review]":
        """Return all reviews of ``product_id`` as detached objects."""
        session = self.get_session()
        try:
            return session.query(Review).filter(Review.product_id == product_id).all()
        finally:
            session.close()

    def get_statistics(self) -> Dict:
        """Return row counts, the mean positive product rating and the number of reviewed products."""
        from sqlalchemy import func  # only this method needs SQL aggregates

        session = self.get_session()
        try:
            total_products = session.query(Product).count()
            total_reviews = session.query(Review).count()
            # A real AVG. The former `.with_entities(rating).scalar()` raised
            # MultipleResultsFound as soon as two products had ratings.
            avg_rating = (session.query(func.avg(Product.rating))
                          .filter(Product.rating > 0)
                          .scalar())
            # Portable COUNT(DISTINCT ...); `.distinct(col)` is PostgreSQL-only DISTINCT ON.
            products_with_reviews = (session.query(func.count(func.distinct(Product.id)))
                                     .select_from(Product)
                                     .join(Review, Review.product_id == Product.id)
                                     .scalar())
            return {
                'total_products': total_products,
                'total_reviews': total_reviews,
                'average_product_rating': float(avg_rating) if avg_rating is not None else 0.0,
                'products_with_reviews': products_with_reviews or 0,
            }
        finally:
            session.close()

class DataManager:
    """Validate, clean, de-duplicate and store raw crawl output.

    Products go through ``DataCleaner`` and reviews through ``ReviewCleaner``.
    The survivors are written with ``DatabaseManager``, and a summary of the
    database totals is printed at the end.
    """

    def __init__(self, db_url: str = "mysql+pymysql://crawler_user:crawler_password@localhost/ecommerce_data"):
        self.db_manager = DatabaseManager(db_url)
        self.data_cleaner = DataCleaner()
        self.review_cleaner = ReviewCleaner()

    def process_and_store_data(self, raw_data: Dict):
        """Process the ``'products'`` and/or ``'reviews'`` entries of ``raw_data``.

        A missing key skips that stage. Database statistics are printed in
        every case.
        """
        print("开始处理和存储数据...")
        if 'products' in raw_data:
            self._store_products(raw_data['products'])
        if 'reviews' in raw_data:
            self._store_reviews(raw_data['reviews'])
        self._print_statistics()

    def _store_products(self, raw_products):
        """Run the product records through validate -> clean -> dedupe -> save."""
        cleaner = self.data_cleaner
        print("处理商品数据...")
        valid = cleaner.validate_product_data(raw_products)
        print(f"商品验证完成: {len(valid)} 条")
        cleaned = cleaner.clean_product_data(valid)
        print(f"商品清洗完成: {len(cleaned)} 条")
        unique = cleaner.remove_duplicates(cleaned)
        print(f"商品去重完成: {len(unique)} 条")
        self.db_manager.save_products(unique)

    def _store_reviews(self, raw_reviews):
        """Run the review records through validate -> clean -> dedupe -> save."""
        cleaner = self.review_cleaner
        print("处理评论数据...")
        valid = cleaner.validate_reviews(raw_reviews)
        print(f"评论验证完成: {len(valid)} 条")
        cleaned = cleaner.clean_reviews(valid)
        print(f"评论清洗完成: {len(cleaned)} 条")
        unique = cleaner.remove_duplicate_reviews(cleaned)
        print(f"评论去重完成: {len(unique)} 条")
        self.db_manager.save_reviews(unique)

    def _print_statistics(self):
        """Print the current database totals."""
        summary = self.db_manager.get_statistics()
        print(f"\n数据存储完成,当前统计:")
        print(f"  商品总数: {summary['total_products']}")
        print(f"  评论总数: {summary['total_reviews']}")
        print(f"  平均评分: {summary['average_product_rating']:.2f}")
        print(f"  有评论的商品数: {summary['products_with_reviews']}")

def database_main():
    """Demo: create the schema, store one sample product and review, read them back.

    Uses a single DatabaseManager (the one owned by DataManager), so only one
    engine and connection pool is created, and disposes of it at the end.
    """
    print("=== 数据库设计与存储 ===\n")

    # DataManager already builds a DatabaseManager (engine + tables). Reuse it
    # instead of opening a second engine and re-running create_tables.
    data_manager = DataManager()
    db_manager = data_manager.db_manager
    print("数据库连接和表结构创建完成")

    sample_data = {
        'products': [
            {
                'id': 'prod_001',
                'title': 'iPhone 13 Pro Max',
                'price': '¥8999.00',
                'sales': '1.2万+',
                'rating': 4.8,
                'description': '最新款iPhone,性能强劲',
                'brand': 'Apple',
                'category_path': '电子产品 > 手机 > Apple',
                'stock': 100,
                'specifications': {'color': 'Graphite', 'storage': '256GB'},
                'images': ['http://example.com/iphone1.jpg'],
                'seller_info': {'name': '官方旗舰店', 'rating': 4.9},
                'delivery_info': {'free_shipping': True, 'time': '2-3天'},
                'service_guarantees': ['7天无理由退货', '正品保证']
            }
        ],
        'reviews': [
            {
                'id': 'rev_001',
                'product_id': 'prod_001',
                'user_id': 'user_123',
                'username': '张三',
                'rating': 5.0,
                'content': '手机很好用,速度很快!',
                'date': '2024-01-15',
                'useful_count': 10,
                'helpful': True,
                'verified_purchase': True,
                'images': ['http://example.com/review1.jpg']
            }
        ]
    }

    try:
        print("开始处理示例数据...")
        data_manager.process_and_store_data(sample_data)

        # Read back to verify the write.
        products = db_manager.query_products(limit=10)
        print(f"\n查询到 {len(products)} 个商品")
        if products:
            print(f"第一个商品: {products[0].title} (评分: {products[0].rating})")
    finally:
        # Close every pooled connection held by the engine.
        db_manager.engine.dispose()

if __name__ == "__main__":
    # Run the database demo only when this snippet is executed as a script.
    database_main()

6. 项目总结与优化

6.1 项目回顾

经过完整的电商App爬虫项目开发,我们实现了以下核心功能:

  1. 环境搭建:建立了完整的开发和运行环境,包括设备环境、开发工具和依赖管理

  2. 反反爬虫策略:实现了SSL Pinning绕过、请求签名绕过、验证码处理和频率限制应对

  3. 数据爬取:开发了商品列表、商品详情和评论数据的爬取功能

  4. 数据处理:实现了数据清洗、验证、去重和标准化

  5. 数据存储:设计了合理的数据库结构并实现了高效的数据存储

6.2 性能优化建议

import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
import queue
import time
from typing import Callable

class PerformanceOptimizer:
    """Hold crawler tuning settings and print advice about them.

    ``optimization_tips`` is the single source of the defaults. Each
    ``optimize_*`` method records the value it is given, or keeps the current
    one when called without arguments, and then prints guidance.
    """

    def __init__(self):
        self.optimization_tips = {
            'concurrency': {
                'max_workers': 5,
                'batch_size': 10,
                'delay_range': (1, 3)
            },
            'memory': {
                'chunk_size': 1000,
                'cleanup_interval': 300  # seconds (5 minutes)
            },
            'network': {
                'timeout': 30,
                'retries': 3,
                'backoff_factor': 0.3
            }
        }

    def optimize_concurrent_crawling(self, max_workers: "Optional[int]" = None):
        """Record the worker-thread cap and print concurrency advice.

        Args:
            max_workers: New cap. ``None`` keeps the current setting
                (5 unless changed earlier).
        """
        settings = self.optimization_tips['concurrency']
        if max_workers is not None:
            settings['max_workers'] = max_workers
        print(f"设置最大工作线程数: {settings['max_workers']}")
        print("建议根据目标服务器承载能力和网络状况调整并发数")
        print("过高并发可能导致IP封禁或服务器压力过大")

    def optimize_memory_usage(self, chunk_size: "Optional[int]" = None):
        """Record the processing chunk size and print memory advice.

        Args:
            chunk_size: Records per chunk. ``None`` keeps the current setting
                (1000 unless changed earlier).
        """
        settings = self.optimization_tips['memory']
        if chunk_size is not None:
            settings['chunk_size'] = chunk_size
        print(f"设置数据块大小: {settings['chunk_size']}")
        print("大块数据处理可以提高效率,但需注意内存占用")
        print("建议根据系统内存大小调整此参数")

    def optimize_network_requests(self, timeout: "Optional[int]" = None, retries: "Optional[int]" = None):
        """Record request timeout and retry count and print network advice.

        Args:
            timeout: Request timeout in seconds; ``None`` keeps the current value (30).
            retries: Retry attempts; ``None`` keeps the current value (3).
        """
        settings = self.optimization_tips['network']
        if timeout is not None:
            settings['timeout'] = timeout
        if retries is not None:
            settings['retries'] = retries
        print(f"设置请求超时: {settings['timeout']}秒, 重试次数: {settings['retries']}次")
        print("合理的超时和重试机制可以提高爬取成功率")

    def implement_caching(self):
        """Print recommended caching strategies."""
        cache_tips = """
        缓存优化策略:
        1. HTTP响应缓存 - 避免重复请求
        2. 数据库查询缓存 - 减少数据库压力
        3. 中间结果缓存 - 避免重复计算
        4. 设备状态缓存 - 减少设备通信
        """
        print(cache_tips)

    def suggest_monitoring_tools(self):
        """Print suggested monitoring, logging and error-tracking tools."""
        monitoring_tools = {
            'performance': ['Prometheus', 'Grafana', 'New Relic'],
            'logging': ['ELK Stack', 'Fluentd', 'Logstash'],
            'error_tracking': ['Sentry', 'Rollbar', 'Airbrake']
        }
        print("推荐的监控工具:")
        for category, tools in monitoring_tools.items():
            print(f"  {category}: {', '.join(tools)}")

def optimization_main():
    """Print every performance recommendation, with a blank line between sections."""
    advisor = PerformanceOptimizer()
    print("=== 性能优化建议 ===\n")

    sections = (
        advisor.optimize_concurrent_crawling,
        advisor.optimize_memory_usage,
        advisor.optimize_network_requests,
        advisor.implement_caching,
        advisor.suggest_monitoring_tools,
    )
    for position, show_section in enumerate(sections):
        if position:
            print()  # separator before every section except the first
        show_section()

if __name__ == "__main__":
    # Print the optimization advice only when this snippet is executed as a script.
    optimization_main()

6.3 法律合规与道德规范

在进行网络爬虫活动时,必须严格遵守相关法律法规和道德规范:

  1. 遵守Robots协议:尊重网站的robots.txt文件
  2. 控制请求频率:避免对目标服务器造成过大压力
  3. 保护个人隐私:不得爬取和传播个人敏感信息
  4. 合法使用数据:确保数据使用符合相关法律法规
  5. 尊重版权:不得侵犯他人的知识产权

6.4 项目扩展建议

本项目可进一步扩展的功能:

  1. 分布式部署:支持多设备、多节点协同工作
  2. 实时监控:提供可视化的爬取状态监控
  3. 智能调度:根据网站响应情况自动调整策略
  4. 数据可视化:提供数据分析和可视化报表
  5. API接口:提供标准化的数据访问接口

通过本项目的实施,我们建立了一个完整的App数据爬取解决方案,涵盖了从环境搭建到数据存储的全流程,并考虑了性能优化和合规性要求。这个方案可以作为类似项目的参考和基础。

6. 数据中心建设

6.1 Redis 任务队列构建

使用 Redis 构建高效的 App 爬虫任务队列是实现大规模并发抓取的关键基础设施。Redis 的高性能和丰富的数据结构使其成为任务队列的理想选择。

6.1.1 Redis 任务队列架构设计

import redis
import json
import time
import uuid
from typing import Dict, List, Optional, Any
from enum import Enum
import asyncio
import aioredis
import pickle

class TaskStatus(Enum):
    """Lifecycle states of a queued task; the string value is what task records store."""
    PENDING = "pending"      # created, waiting in a priority queue
    PROCESSING = "processing"  # claimed by a worker
    COMPLETED = "completed"   # finished successfully
    FAILED = "failed"        # failed with no retries left
    RETRYING = "retrying"     # failed, pushed back onto its queue for another attempt

class TaskPriority(Enum):
    """Task priority levels. A larger value is served first: RedisTaskQueue.get_next_task
    checks the queues in descending value order."""
    LOW = 1
    NORMAL = 2
    HIGH = 3
    URGENT = 4

class RedisTaskQueue:
    """Redis-backed priority task queue for the App crawler.

    Key layout (prefix ``app_crawler:``):

    * ``task_queue:<urgent|high|normal|low>``: one list per priority.
      Producers LPUSH and consumers pop from the right, so each list is FIFO.
    * ``task_queue:<priority>:processing``: entries claimed by a worker
      (reliable-queue pattern). ``complete_task`` and ``fail_task`` remove the
      entry again; ``cleanup_stale_tasks`` requeues entries whose worker never
      reported back.
    * ``task_status:<id>`` / ``task_results:<id>``: pickled task record and result.
    * ``active_workers``: set of worker IDs that claimed tasks. The whole set
      expires one hour after the most recent claim.

    Security: every record is (un)pickled, and ``pickle.loads`` on data an
    attacker can write executes arbitrary code. Only use this class against a
    trusted, access-controlled Redis instance.
    """

    # Private fields kept in a status record while the task is claimed, so the
    # exact ``:processing`` entry can later be removed with LREM.
    _CLAIM_QUEUE_FIELD = '_processing_queue'
    _CLAIM_ITEM_FIELD = '_processing_item'

    def __init__(self, host='localhost', port=6379, db=0, password=None):
        """Connect to Redis and define the key names (standard ``redis.Redis`` settings)."""
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=False  # binary replies: values are pickled bytes
        )
        self.task_queue_key = "app_crawler:task_queue"
        self.task_status_key = "app_crawler:task_status:"
        self.task_results_key = "app_crawler:task_results:"
        self.task_retry_key = "app_crawler:task_retry:"
        self.active_workers_key = "app_crawler:active_workers"
        self.stats_key = "app_crawler:stats"
        self.init_queues()

    def init_queues(self):
        """Map each TaskPriority value to the name of its Redis list."""
        self.priority_queues = {
            TaskPriority.URGENT.value: f"{self.task_queue_key}:urgent",
            TaskPriority.HIGH.value: f"{self.task_queue_key}:high",
            TaskPriority.NORMAL.value: f"{self.task_queue_key}:normal",
            TaskPriority.LOW.value: f"{self.task_queue_key}:low"
        }
        print("✓ Redis任务队列初始化完成")

    def create_task(self, task_type: str, payload: Dict, priority: TaskPriority = TaskPriority.NORMAL,
                   retry_count: int = 3, timeout: int = 300) -> str:
        """Create a task, store its status record and enqueue it; return the new task ID.

        Args:
            task_type: Handler name understood by the workers.
            payload: Handler arguments (must be picklable).
            priority: Queue to place the task in.
            retry_count: Number of retries allowed after a failure.
            timeout: Expected processing time in seconds. It drives the status
                TTL (2 * timeout) and stale-claim detection.
        """
        task_id = str(uuid.uuid4())
        task_data = {
            'id': task_id,
            'type': task_type,
            'payload': payload,
            'priority': priority.value,
            'created_at': time.time(),
            'retry_count': retry_count,
            'max_retries': retry_count,
            'timeout': timeout,
            'status': TaskStatus.PENDING.value
        }

        # NOTE: a task that waits longer than 2 * timeout in the queue reports
        # no status until a worker claims it and rewrites the record.
        self.redis_client.setex(
            f"{self.task_status_key}{task_id}",
            timeout * 2,
            pickle.dumps(task_data)
        )

        queue_key = self.priority_queues[priority.value]
        self.redis_client.lpush(queue_key, pickle.dumps(task_data))
        print(f"✓ 任务 {task_id} 已创建并加入队列 (优先级: {priority.name})")
        return task_id

    def get_next_task(self, worker_id: str) -> Optional[Dict]:
        """Claim the next task, highest priority first, or return ``None`` if all queues are empty.

        The pop is non-blocking. A blocking BRPOPLPUSH with a 1s timeout per
        queue stalled callers for up to 4s on every empty poll and outlasted
        short socket timeouts.
        """
        for priority_value in sorted(self.priority_queues, reverse=True):
            queue_key = self.priority_queues[priority_value]
            processing_key = f"{queue_key}:processing"
            raw_task = self.redis_client.rpoplpush(queue_key, processing_key)
            if not raw_task:
                continue

            task = pickle.loads(raw_task)
            task['status'] = TaskStatus.PROCESSING.value
            task['worker_id'] = worker_id
            task['started_at'] = time.time()

            # The status record also remembers which processing entry to release.
            record = dict(task)
            record[self._CLAIM_QUEUE_FIELD] = processing_key
            record[self._CLAIM_ITEM_FIELD] = raw_task
            self.redis_client.setex(
                f"{self.task_status_key}{task['id']}",
                task['timeout'] * 2,
                pickle.dumps(record)
            )
            self.redis_client.sadd(self.active_workers_key, worker_id)
            self.redis_client.expire(self.active_workers_key, 3600)  # 1 hour
            print(f"✓ 工作节点 {worker_id} 获取任务 {task['id']}")
            return task
        return None

    def _release_claim(self, task_id: str, record: Optional[Dict]) -> Optional[Dict]:
        """Remove the task's entry from its ``:processing`` list.

        Pops the private claim fields from ``record`` (mutating it). If no
        status record exists (it expired), the processing lists are scanned
        for an entry with ``task_id`` instead.

        Returns:
            The removed entry as it was queued (unpickled), or ``None`` if nothing was removed.
        """
        if record is not None:
            processing_key = record.pop(self._CLAIM_QUEUE_FIELD, None)
            raw_item = record.pop(self._CLAIM_ITEM_FIELD, None)
            if processing_key and raw_item is not None and \
                    self.redis_client.lrem(processing_key, 1, raw_item):
                return pickle.loads(raw_item)
            return None

        for queue_key in self.priority_queues.values():
            processing_key = f"{queue_key}:processing"
            for raw_item in self.redis_client.lrange(processing_key, 0, -1):
                queued = pickle.loads(raw_item)
                if queued.get('id') == task_id and self.redis_client.lrem(processing_key, 1, raw_item):
                    return queued
        return None

    def complete_task(self, task_id: str, result: Any = None, worker_id: str = None):
        """Mark a task COMPLETED, release its claim and keep status/result for one day."""
        status_key = f"{self.task_status_key}{task_id}"
        task_bytes = self.redis_client.get(status_key)
        task = pickle.loads(task_bytes) if task_bytes else None
        self._release_claim(task_id, task)

        if task is not None:
            task['status'] = TaskStatus.COMPLETED.value
            task['completed_at'] = time.time()
            task['result'] = result
            if worker_id:
                task['completed_by'] = worker_id
            self.redis_client.setex(status_key, 86400, pickle.dumps(task))  # keep for 1 day

        if result is not None:
            self.redis_client.setex(
                f"{self.task_results_key}{task_id}",
                86400,  # keep result for 1 day
                pickle.dumps(result)
            )
        print(f"✓ 任务 {task_id} 已完成")

    def fail_task(self, task_id: str, error: str = None, worker_id: str = None):
        """Record a failure. Requeue the task while retries remain, otherwise mark it FAILED."""
        status_key = f"{self.task_status_key}{task_id}"
        task_bytes = self.redis_client.get(status_key)
        record = pickle.loads(task_bytes) if task_bytes else None
        released = self._release_claim(task_id, record)
        # Without a status record, fall back to the queued copy that was released.
        task = record if record is not None else released
        if task is None:
            print(f"✗ 任务 {task_id} 状态记录不存在,无法重试")
            return

        task['failed_at'] = time.time()
        task['error'] = error
        if worker_id:
            task['failed_by'] = worker_id

        if task['retry_count'] > 0:
            task['status'] = TaskStatus.RETRYING.value
            task['retry_count'] -= 1
            print(f"! 任务 {task_id} 失败,剩余重试次数: {task['retry_count']}")
            # Persist RETRYING before re-queueing, so a worker that claims it
            # right away overwrites this record (previously it stayed PROCESSING).
            self.redis_client.setex(status_key, task['timeout'] * 2, pickle.dumps(task))
            retry_queue = self.priority_queues[task['priority']]
            self.redis_client.lpush(retry_queue, pickle.dumps(task))
        else:
            task['status'] = TaskStatus.FAILED.value
            self.redis_client.setex(status_key, 86400, pickle.dumps(task))
            print(f"✗ 任务 {task_id} 最终失败")

    def get_task_status(self, task_id: str) -> Optional[Dict]:
        """Return the task's status record without internal claim bookkeeping, or ``None``."""
        task_bytes = self.redis_client.get(f"{self.task_status_key}{task_id}")
        if not task_bytes:
            return None
        task = pickle.loads(task_bytes)
        task.pop(self._CLAIM_QUEUE_FIELD, None)
        task.pop(self._CLAIM_ITEM_FIELD, None)
        return task

    def get_task_result(self, task_id: str) -> Any:
        """Return the stored result of a completed task, or ``None``."""
        result_bytes = self.redis_client.get(f"{self.task_results_key}{task_id}")
        if result_bytes:
            return pickle.loads(result_bytes)
        return None

    def cleanup_stale_tasks(self) -> int:
        """Requeue claimed tasks whose worker never reported back.

        For every entry in a ``:processing`` list:

        * status COMPLETED/FAILED: leftover entry, removed.
        * status PROCESSING and ``started_at + timeout`` has passed, or no
          status record at all (it expires 2 * timeout after the claim): the
          entry is removed and the task is pushed back onto its pending queue.
        * anything else, e.g. a claim still within its timeout, is left alone.

        Requeueing does not consume a retry.

        Returns:
            Number of tasks requeued.
        """
        now = time.time()
        finished = (TaskStatus.COMPLETED.value, TaskStatus.FAILED.value)
        requeued = 0
        for queue_key in self.priority_queues.values():
            processing_key = f"{queue_key}:processing"
            for raw_item in self.redis_client.lrange(processing_key, 0, -1):
                queued = pickle.loads(raw_item)
                record_bytes = self.redis_client.get(f"{self.task_status_key}{queued['id']}")
                if record_bytes:
                    record = pickle.loads(record_bytes)
                    state = record.get('status')
                    if state in finished:
                        self.redis_client.lrem(processing_key, 1, raw_item)
                        continue
                    if state != TaskStatus.PROCESSING.value:
                        continue
                    timeout = record.get('timeout', queued.get('timeout', 300))
                    if now < record.get('started_at', now) + timeout:
                        continue
                # LREM first: only the caller that actually removed the entry
                # requeues it, so concurrent cleaners cannot push it twice.
                if self.redis_client.lrem(processing_key, 1, raw_item):
                    self.redis_client.lpush(queue_key, raw_item)
                    requeued += 1
        if requeued:
            print(f"! 已重新入队 {requeued} 个超时任务")
        return requeued

    def get_stats(self) -> Dict:
        """Return pending/processing list lengths per priority plus the active-worker count."""
        stats = {}
        for priority, queue_key in self.priority_queues.items():
            label = TaskPriority(priority).name.lower()
            stats[f"queue_{label}"] = self.redis_client.llen(queue_key)
            stats[f"processing_{label}"] = self.redis_client.llen(f"{queue_key}:processing")
        stats['active_workers'] = self.redis_client.scard(self.active_workers_key)
        return stats

class AppCrawlerWorker:
    """Async worker that claims tasks from a RedisTaskQueue and executes them.

    The crawl/analysis handlers are simulations: they wait a random delay and
    return synthetic data. A real implementation would drive a device through
    ``device_manager``.
    """

    # Multiplier for every simulated delay; 0 skips the waiting (dry runs, tests).
    simulated_delay_scale = 1.0

    def __init__(self, worker_id: str, redis_host: str = 'localhost', redis_port: int = 6379):
        """Create worker ``worker_id`` with its own queue connection to ``redis_host:redis_port``."""
        self.worker_id = worker_id
        self.task_queue = RedisTaskQueue(redis_host, redis_port)
        self.running = False
        self.device_manager = None  # set via register_device()

    def register_device(self, device_manager):
        """Attach the device manager used for real crawling."""
        self.device_manager = device_manager

    def stop(self):
        """Ask ``run`` to leave its loop after the current iteration."""
        self.running = False

    async def _simulate_work(self, seconds: float) -> None:
        """Wait ``seconds`` (times ``simulated_delay_scale``) without blocking the event loop.

        time.sleep here would freeze every other worker on the same loop;
        RedisTaskQueueManager.start_all_workers gathers them on one loop.
        """
        await asyncio.sleep(seconds * self.simulated_delay_scale)

    async def process_task(self, task: Dict) -> Any:
        """Dispatch ``task`` to the handler for ``task['type']`` and return its result.

        Raises:
            ValueError: Unknown task type.
            Exception: Whatever the handler raises, re-raised after logging.
        """
        task_type = task['type']
        payload = task['payload']
        print(f"处理任务: {task_type}, ID: {task['id']}")

        handlers = {
            'product_list_crawl': self.crawl_product_list,
            'product_detail_crawl': self.crawl_product_detail,
            'review_crawl': self.crawl_reviews,
            'app_analysis': self.analyze_app,
            'security_bypass': self.handle_security_bypass,
        }
        try:
            handler = handlers.get(task_type)
            if handler is None:
                raise ValueError(f"未知任务类型: {task_type}")
            return await handler(payload)
        except Exception as e:
            print(f"任务处理失败: {e}")
            raise

    async def crawl_product_list(self, payload: Dict) -> List[Dict]:
        """Simulate crawling one listing page of ``payload['category']`` (default 'default')."""
        import random
        await self._simulate_work(random.uniform(1, 3))

        category = payload.get('category', 'default')
        products = [
            {
                'id': f"prod_{category}_{i}",
                'title': f"Product {i} in {category}",
                'price': f"¥{random.randint(100, 1000)}.00",
                'sales': f"{random.randint(100, 10000)}+",
                'rating': round(random.uniform(3.0, 5.0), 1)
            }
            for i in range(random.randint(10, 20))
        ]
        print(f"✓ 完成商品列表爬取,获取 {len(products)} 个商品")
        return products

    async def crawl_product_detail(self, payload: Dict) -> Dict:
        """Simulate crawling the detail page of ``payload['product_id']``."""
        import random
        await self._simulate_work(random.uniform(2, 4))

        product_id = payload['product_id']
        detail = {
            'id': product_id,
            'title': f"Detailed Product {product_id}",
            'description': "This is a detailed description of the product...",
            'specifications': {
                'color': random.choice(['Red', 'Blue', 'Black', 'White']),
                'size': random.choice(['S', 'M', 'L', 'XL']),
                'weight': f"{random.uniform(0.1, 5.0):.2f}kg"
            },
            'images': [f"http://example.com/image/{product_id}_{i}.jpg" for i in range(1, 4)],
            'seller_info': {'name': 'Official Store', 'rating': 4.8},
            'stock': random.randint(0, 1000)
        }
        print(f"✓ 完成商品详情爬取: {product_id}")
        return detail

    async def crawl_reviews(self, payload: Dict) -> List[Dict]:
        """Simulate crawling reviews of ``payload['product_id']``.

        Each review carries ``product_id``, which the ``reviews`` table
        requires (NOT NULL foreign key).
        """
        import random
        await self._simulate_work(random.uniform(1, 2))

        product_id = payload['product_id']
        reviews = [
            {
                'id': f"rev_{product_id}_{i}",
                'product_id': product_id,
                'user_id': f"user_{random.randint(10000, 99999)}",
                'username': f"User{random.randint(100, 999)}",
                'rating': random.randint(1, 5),
                'content': f"This is review #{i} for product {product_id}",
                'date': f"2024-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}"
            }
            for i in range(random.randint(5, 15))
        ]
        print(f"✓ 完成评论爬取,获取 {len(reviews)} 条评论")
        return reviews

    async def analyze_app(self, payload: Dict) -> Dict:
        """Return a canned structure analysis for ``payload['package_name']``."""
        await self._simulate_work(1)

        analysis = {
            'package_name': payload['package_name'],
            'activities': ['MainActivity', 'SplashActivity', 'LoginActivity'],
            'permissions': ['INTERNET', 'ACCESS_NETWORK_STATE', 'WRITE_EXTERNAL_STORAGE'],
            'libraries': ['okhttp', 'gson', 'retrofit'],
            'security_features': ['SSL Pinning', 'Root Detection', 'Emulator Detection']
        }
        print(f"✓ 完成App分析: {payload['package_name']}")
        return analysis

    async def handle_security_bypass(self, payload: Dict) -> Dict:
        """Return a canned report claiming ``payload['features']`` were bypassed."""
        await self._simulate_work(2)

        result = {
            'success': True,
            'bypassed_features': payload.get('features', []),
            'methods_used': ['Frida Hook', 'Certificate Bypass', 'Traffic Interception'],
            'timestamp': time.time()
        }
        print(f"✓ 完成安全绕过: {payload.get('features')}")
        return result

    async def run(self):
        """Poll the queue and process tasks until ``stop()`` is called or Ctrl-C is received."""
        self.running = True
        print(f"启动工作节点: {self.worker_id}")
        print(f"连接到Redis: {self.task_queue.redis_client.connection_pool.connection_kwargs}")

        loop = asyncio.get_running_loop()
        while self.running:
            try:
                # get_next_task makes blocking Redis calls; run it in the default
                # executor so other coroutines on this loop keep running.
                task = await loop.run_in_executor(None, self.task_queue.get_next_task, self.worker_id)
                if not task:
                    await asyncio.sleep(0.1)  # idle back-off
                    continue

                print(f"获取任务: {task['id']}, 类型: {task['type']}")
                try:
                    result = await self.process_task(task)
                    self.task_queue.complete_task(task['id'], result, self.worker_id)
                    print(f"任务完成: {task['id']}")
                except Exception as e:
                    print(f"任务失败: {task['id']}, 错误: {e}")
                    self.task_queue.fail_task(task['id'], str(e), self.worker_id)
            except KeyboardInterrupt:
                print("收到终止信号")
                self.running = False
            except Exception as e:
                print(f"工作节点错误: {e}")
                await asyncio.sleep(1)  # brief pause before polling again

        print(f"工作节点 {self.worker_id} 已停止")

class RedisTaskQueueManager:
    """Coordinate several AppCrawlerWorker instances that share one task queue."""

    def __init__(self, redis_host: str = 'localhost', redis_port: int = 6379):
        """Connect the manager's own queue handle (used for submitting and stats)."""
        self.task_queue = RedisTaskQueue(redis_host, redis_port)
        self.workers = []

    def add_worker(self, worker: "AppCrawlerWorker"):
        """Register a worker to be started by ``start_all_workers``."""
        self.workers.append(worker)
        print(f"添加工作节点: {worker.worker_id}")

    async def start_all_workers(self):
        """Run every registered worker concurrently until all of them stop."""
        runs = [worker.run() for worker in self.workers]
        if runs:
            await asyncio.gather(*runs)

    def submit_batch_tasks(self, tasks_config: List[Dict]) -> List[str]:
        """Create one task per config dict and return their IDs in order.

        Each config needs ``'type'`` and ``'payload'``. ``'priority'``
        (TaskPriority), ``'retry_count'`` and ``'timeout'`` are optional.
        """
        task_ids = []
        for config in tasks_config:
            task_id = self.task_queue.create_task(
                task_type=config['type'],
                payload=config['payload'],
                priority=config.get('priority', TaskPriority.NORMAL),
                retry_count=config.get('retry_count', 3),
                timeout=config.get('timeout', 300)
            )
            task_ids.append(task_id)
        print(f"批量提交 {len(task_ids)} 个任务")
        return task_ids

    def get_overall_stats(self) -> Dict:
        """Return queue statistics plus counts of registered and currently running workers."""
        stats = self.task_queue.get_stats()
        stats['total_workers'] = len(self.workers)
        # Workers registered here whose run() loop is active in this process.
        stats['running_workers'] = sum(1 for worker in self.workers if worker.running)
        return stats

# 使用示例
async def example_redis_queue_usage():
    """Walk through submitting a mixed-priority batch to the Redis task queue.

    Creates a manager with three workers, submits four sample tasks, and
    prints the returned IDs and the queue statistics. Starting the workers is
    left commented out because it runs until interrupted.
    """
    print("=== Redis任务队列使用示例 ===\n")

    manager = RedisTaskQueueManager()

    # Register worker_1 .. worker_3.
    for number in range(1, 4):
        manager.add_worker(AppCrawlerWorker(f"worker_{number}"))

    def task_spec(task_type, payload, priority):
        """Build one entry in the shape submit_batch_tasks expects."""
        return {'type': task_type, 'payload': payload, 'priority': priority}

    submitted = manager.submit_batch_tasks([
        task_spec('product_list_crawl', {'category': 'electronics', 'page': 1}, TaskPriority.HIGH),
        task_spec('product_detail_crawl', {'product_id': 'prod_123'}, TaskPriority.NORMAL),
        task_spec('review_crawl', {'product_id': 'prod_123', 'page_count': 3}, TaskPriority.NORMAL),
        task_spec('app_analysis', {'package_name': 'com.example.app'}, TaskPriority.URGENT),
    ])
    print(f"提交的任务ID: {submitted}\n")

    initial_stats = manager.get_overall_stats()
    print(f"初始统计: {initial_stats}\n")

    print("启动工作节点处理任务...")
    # In real use the line below would keep running until interrupted:
    # await manager.start_all_workers()

if __name__ == "__main__":
    # The example run is commented out so that executing this snippet does
    # not require a reachable Redis server.
    # asyncio.run(example_redis_queue_usage())
    pass

6.1.2 Redis 高可用配置

为了确保任务队列的高可用性,可以采用 Redis 主从复制配合 Sentinel 实现自动故障转移,或者使用 Redis Cluster。下面的管理器支持单机、Sentinel 和 Cluster 三种连接模式:

import redis.sentinel
import redis.cluster
from typing import Union, List

class HighAvailabilityRedisManager:
    """Create a Redis client for a standalone, Sentinel or Cluster deployment.

    ``mode`` selects the topology: ``'sentinel'`` or ``'cluster'``. Any other
    value falls back to a single standalone server. All clients return binary
    replies (``decode_responses=False``), which the pickle-based task queue needs.
    """

    def __init__(self, mode: str = 'standalone', *,
                 sentinel_hosts: Optional[List] = None,
                 master_name: str = 'mymaster',
                 cluster_nodes: Optional[List] = None,
                 host: str = 'localhost',
                 port: int = 6379,
                 db: int = 0,
                 socket_timeout: float = 5.0):
        """
        Args:
            mode: ``'standalone'`` (default), ``'sentinel'`` or ``'cluster'``.
            sentinel_hosts: ``(host, port)`` pairs of the Sentinel processes;
                defaults to localhost:26379-26381.
            master_name: Name of the master monitored by Sentinel.
            cluster_nodes: Startup nodes, as ``{"host": ..., "port": ...}``
                dicts or ``redis.cluster.ClusterNode`` objects; defaults to
                localhost:7000-7002.
            host, port, db: Standalone server address.
            socket_timeout: Seconds before a read on the Sentinel-managed
                master times out. The former hard-coded 0.1s is easily
                exceeded by network latency or any blocking command, which
                produced spurious TimeoutErrors.
        """
        self.mode = mode
        self.sentinel_hosts = sentinel_hosts or [("localhost", 26379), ("localhost", 26380), ("localhost", 26381)]
        self.master_name = master_name
        self.cluster_nodes = cluster_nodes or [{"host": "localhost", "port": 7000},
                                               {"host": "localhost", "port": 7001},
                                               {"host": "localhost", "port": 7002}]
        self.host = host
        self.port = port
        self.db = db
        self.socket_timeout = socket_timeout
        self.client = None
        self.setup_connection()

    def setup_connection(self):
        """Build ``self.client`` for the configured mode."""
        if self.mode == 'sentinel':
            sentinel = redis.sentinel.Sentinel(self.sentinel_hosts)
            self.client = sentinel.master_for(self.master_name,
                                              socket_timeout=self.socket_timeout,
                                              decode_responses=False)
        elif self.mode == 'cluster':
            # redis-py's RedisCluster expects ClusterNode objects; plain dicts
            # (the old redis-py-cluster API) fail during node discovery.
            nodes = [
                node if isinstance(node, redis.cluster.ClusterNode)
                else redis.cluster.ClusterNode(node["host"], node["port"])
                for node in self.cluster_nodes
            ]
            self.client = redis.cluster.RedisCluster(startup_nodes=nodes, decode_responses=False)
        else:
            self.client = redis.Redis(host=self.host, port=self.port, db=self.db, decode_responses=False)

    def get_client(self) -> Union[redis.Redis, redis.cluster.RedisCluster]:
        """Return the client created for the configured mode."""
        return self.client

class RedisQueueWithHA(RedisTaskQueue):
    """Redis task queue whose client comes from HighAvailabilityRedisManager.

    NOTE(review): this does not call ``RedisTaskQueue.__init__``; it re-creates
    the client and key attributes itself and then calls ``init_queues()``. The
    attribute names must stay in sync with the parent class - confirm whenever
    the parent changes.
    """

    def __init__(self, ha_mode: str = 'standalone'):
        # Client for the requested topology (standalone / sentinel / cluster)
        self.ha_manager = HighAvailabilityRedisManager(ha_mode)
        self.redis_client = self.ha_manager.get_client()

        # Every key lives under the same "app_crawler:" namespace
        namespace = "app_crawler:"
        self.task_queue_key = namespace + "task_queue"
        self.task_status_key = namespace + "task_status:"
        self.task_results_key = namespace + "task_results:"
        self.task_retry_key = namespace + "task_retry:"
        self.active_workers_key = namespace + "active_workers"
        self.stats_key = namespace + "stats"

        self.init_queues()

6.2 代理池系统

接入代理池是实现大规模并发抓取的重要环节:它把请求分散到多个出口IP上,可以降低IP被封禁的风险并提高抓取效率。

6.2.1 代理池架构设计

import random
import time
import requests
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import threading
import queue
import asyncio
import aiohttp

class ProxyProtocol(Enum):
    """Protocol used to reach a proxy.

    The member value is the lowercase URL scheme, so it can be used directly
    to build ``<scheme>://host:port`` proxy URLs.
    """

    # Plain HTTP proxy
    HTTP = "http"
    # Proxy reached over TLS
    HTTPS = "https"
    # SOCKS version 4
    SOCKS4 = "socks4"
    # SOCKS version 5
    SOCKS5 = "socks5"

class ProxyAnonymity(Enum):
    """Anonymity level of a proxy, from least to most anonymous."""

    # Transparent proxy
    TRANSPARENT = "transparent"
    # Anonymous proxy
    ANONYMOUS = "anonymous"
    # Elite (high-anonymity) proxy
    ELITE = "elite"

@dataclass
class ProxyInfo:
    """Mutable record describing one proxy and its latest health-check result."""

    # Connection target
    host: str
    port: int
    protocol: ProxyProtocol
    anonymity: ProxyAnonymity
    # Optional location label
    region: str = ""
    # Seconds taken by the last health check
    response_time: float = 0.0
    # Timestamp (time.time()) of the last health check; 0.0 means never checked
    last_checked: float = 0.0
    # Quality score used for "best proxy" selection
    score: float = 0.0
    # Liveness flag; new entries start out as alive
    is_alive: bool = True

class ProxyPool:
    """Thread-safe proxy pool manager.

    Keeps a list of ``ProxyInfo`` entries and health-checks them by fetching
    ``check_url`` through each proxy. Live proxies are scored and handed out
    by best score, at random, or least-used first. ``self.lock`` (an ``RLock``)
    guards both ``self.proxies`` and ``self.stats``.
    """

    def __init__(self, check_url: str = "http://httpbin.org/ip", timeout: int = 5):
        """Create an empty pool.

        Args:
            check_url: URL requested through each proxy during a health check.
            timeout: Per-request timeout in seconds for health checks.
        """
        self.proxies: List[ProxyInfo] = []
        self.check_url = check_url
        self.timeout = timeout
        self.lock = threading.RLock()
        self.stats = {
            'total_checked': 0,  # health checks performed, successful or not
            'alive_count': 0,
            'dead_count': 0,
            'usage_count': {}    # "host:port" -> number of times handed out
        }
        self._initialize_default_proxies()

    def _initialize_default_proxies(self):
        """Hook for loading proxies from a provider; currently only prints a warning."""
        # Real deployments should fetch proxies from a proxy provider here.
        print("⚠️  注意:以下为示例代理,实际使用时需要替换为真实有效的代理")

    @staticmethod
    def _proxy_key(proxy: ProxyInfo) -> str:
        """Return the ``host:port`` key used in ``stats['usage_count']``."""
        return f"{proxy.host}:{proxy.port}"

    def add_proxy(self, proxy_info: ProxyInfo):
        """Append *proxy_info* to the pool (duplicates are not filtered)."""
        with self.lock:
            self.proxies.append(proxy_info)
            # Only entries flagged alive count towards alive_count.
            if proxy_info.is_alive:
                self.stats['alive_count'] += 1
        print(f"添加代理: {proxy_info.host}:{proxy_info.port} ({proxy_info.protocol.value})")

    def add_proxy_from_string(self, proxy_str: str, anonymity: ProxyAnonymity = ProxyAnonymity.ANONYMOUS):
        """Parse ``protocol://host:port`` and add the proxy to the pool.

        The protocol must be http, https, socks4 or socks5. Surrounding whitespace
        is ignored, and the whole string is lowercased.

        Returns:
            True if the proxy was added, False if the string did not match the
            format or the port is outside 1-65535.
        """
        import re
        pattern = r'^(https?|socks[45])://([^:]+):(\d+)$'
        match = re.match(pattern, proxy_str.strip().lower())
        if not match:
            return False
        protocol, host, port_text = match.groups()
        port = int(port_text)
        if not 0 < port <= 65535:
            return False
        self.add_proxy(ProxyInfo(
            host=host,
            port=port,
            protocol=ProxyProtocol(protocol),
            anonymity=anonymity
        ))
        return True

    def check_proxy(self, proxy_info: ProxyInfo) -> bool:
        """Fetch ``check_url`` through *proxy_info* and update its health fields.

        A proxy is alive only if the request returns HTTP 200 with a JSON body.
        Any exception, including one raised while parsing the body, marks it dead
        with ``response_time = inf``. ``last_checked`` and ``score`` are refreshed,
        and ``stats['total_checked']`` is incremented on every call, success or not.

        Returns:
            Whether the proxy is alive.
        """
        proxy_url = f"{proxy_info.protocol.value}://{proxy_info.host}:{proxy_info.port}"
        proxies = {'http': proxy_url, 'https': proxy_url}

        start = time.monotonic()
        try:
            response = requests.get(self.check_url, proxies=proxies, timeout=self.timeout)
            response_time = time.monotonic() - start
            is_alive = response.status_code == 200
            if is_alive:
                # Parsing the body rejects proxies that answer 200 with a non-JSON
                # page (captive portals, injected ads). An anonymity check could
                # inspect this payload as well.
                response.json()
        except Exception as e:
            print(f"代理检查失败 {proxy_info.host}:{proxy_info.port} - {e}")
            response_time = float('inf')
            is_alive = False

        proxy_info.is_alive = is_alive
        proxy_info.response_time = response_time
        proxy_info.last_checked = time.time()
        proxy_info.score = self.calculate_proxy_score(proxy_info)
        # This method runs on worker threads in check_all_proxies, so the counter needs the lock.
        with self.lock:
            self.stats['total_checked'] += 1
        return is_alive

    def calculate_proxy_score(self, proxy_info: ProxyInfo) -> float:
        """Score a proxy in [0, 10]: faster responses and higher anonymity score higher.

        Dead proxies score 0. The base score is ``10 - response_time``, floored
        at 0. Elite proxies get a +3 bonus and anonymous proxies +1, and the
        total is capped at 10.
        """
        if not proxy_info.is_alive:
            return 0.0
        base_score = max(0, 10 - proxy_info.response_time)
        anonymity_bonus = {'elite': 3, 'anonymous': 1, 'transparent': 0}[proxy_info.anonymity.value]
        return min(10, base_score + anonymity_bonus)

    def check_all_proxies(self, max_workers: int = 10):
        """Health-check every proxy concurrently on a thread pool.

        Replaces ``stats['alive_count']`` and ``stats['dead_count']`` with the
        results of this run.
        """
        from concurrent.futures import ThreadPoolExecutor, as_completed

        # Snapshot so concurrent add/remove calls cannot mutate the list mid-iteration.
        with self.lock:
            snapshot = list(self.proxies)

        alive_count = 0
        dead_count = 0
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_proxy = {executor.submit(self.check_proxy, proxy): proxy for proxy in snapshot}
            for future in as_completed(future_to_proxy):
                proxy = future_to_proxy[future]
                try:
                    if future.result():
                        alive_count += 1
                    else:
                        dead_count += 1
                except Exception as e:
                    print(f"检查代理时出错 {proxy.host}:{proxy.port} - {e}")
                    dead_count += 1

        with self.lock:
            self.stats['alive_count'] = alive_count
            self.stats['dead_count'] = dead_count
        print(f"代理检查完成: {alive_count} 个存活, {dead_count} 个失效")

    def get_best_proxy(self, protocol: ProxyProtocol = None, anonymity: ProxyAnonymity = None) -> Optional[ProxyInfo]:
        """Return the highest-scoring live proxy, optionally filtered.

        Args:
            protocol: Only consider proxies of this protocol when given.
            anonymity: Only consider proxies of this anonymity level when given.

        Returns:
            The chosen proxy (its usage is recorded), or None if nothing matches.
        """
        with self.lock:
            candidates = [
                p for p in self.proxies
                if p.is_alive
                and (protocol is None or p.protocol == protocol)
                and (anonymity is None or p.anonymity == anonymity)
            ]
            if not candidates:
                return None
            best_proxy = max(candidates, key=lambda p: p.score)
            self._record_usage(best_proxy)
            return best_proxy

    def get_random_proxy(self, protocol: ProxyProtocol = None) -> Optional[ProxyInfo]:
        """Return a uniformly random live proxy (optionally of *protocol*), or None."""
        with self.lock:
            candidates = [
                p for p in self.proxies
                if p.is_alive and (protocol is None or p.protocol == protocol)
            ]
            if not candidates:
                return None
            proxy = random.choice(candidates)
            self._record_usage(proxy)
            return proxy

    def get_proxy_with_rotation(self) -> Optional[ProxyInfo]:
        """Return the live proxy that has been handed out least often, or None.

        This spreads load across proxies.
        """
        with self.lock:
            alive_proxies = [p for p in self.proxies if p.is_alive]
            if not alive_proxies:
                return None
            usage = self.stats['usage_count']
            least_used = min(alive_proxies, key=lambda p: usage.get(self._proxy_key(p), 0))
            self._record_usage(least_used)
            return least_used

    def _record_usage(self, proxy: ProxyInfo):
        """Increment the hand-out counter for *proxy*.

        Callers hold ``self.lock``; it is re-entrant, so nesting is safe.
        """
        with self.lock:
            key = self._proxy_key(proxy)
            self.stats['usage_count'][key] = self.stats['usage_count'].get(key, 0) + 1

    def remove_dead_proxies(self):
        """Drop every proxy currently flagged dead and resync the alive/dead counters."""
        with self.lock:
            original_count = len(self.proxies)
            self.proxies = [p for p in self.proxies if p.is_alive]
            removed_count = original_count - len(self.proxies)
            self.stats['alive_count'] = len(self.proxies)
            # Everything left in the pool is alive, so the dead counter must not
            # keep reporting proxies that are no longer there.
            self.stats['dead_count'] = 0
        if removed_count > 0:
            print(f"移除 {removed_count} 个失效代理")

    def get_stats(self) -> Dict:
        """Return a snapshot of pool statistics.

        The snapshot includes the up-to-5 best elite proxies.
        """
        with self.lock:
            return {
                'total_proxies': len(self.proxies),
                'alive_proxies': self.stats['alive_count'],
                'dead_proxies': self.stats['dead_count'],
                'total_checked': self.stats['total_checked'],
                'avg_response_time': self._get_avg_response_time(),
                'top_anonymous_proxies': self._get_top_anonymous_proxies()
            }

    def _get_avg_response_time(self) -> float:
        """Return the mean response time of live proxies, or inf if there are none."""
        with self.lock:
            alive_proxies = [p for p in self.proxies if p.is_alive]
            if not alive_proxies:
                return float('inf')
            return sum(p.response_time for p in alive_proxies) / len(alive_proxies)

    def _get_top_anonymous_proxies(self) -> List[ProxyInfo]:
        """Return up to five live elite proxies, highest score first."""
        with self.lock:
            elite_proxies = [p for p in self.proxies if p.is_alive and p.anonymity == ProxyAnonymity.ELITE]
        return sorted(elite_proxies, key=lambda p: p.score, reverse=True)[:5]

class AsyncProxyPool(ProxyPool):
    """ProxyPool variant whose health checks run concurrently with asyncio/aiohttp.

    NOTE(review): aiohttp's ``proxy=`` argument targets HTTP proxies (HTTPS proxy
    support depends on the aiohttp version). SOCKS proxies need an extra
    connector library, so here they will most likely be reported dead - confirm
    for the aiohttp version in use.
    """

    async def async_check_proxy(self, proxy_info: ProxyInfo) -> bool:
        """Fetch ``check_url`` through *proxy_info* and update its health fields.

        The proxy is alive iff the response status is 200. Any exception marks it
        dead with ``response_time = inf``. ``last_checked`` and ``score`` are
        refreshed, and ``stats['total_checked']`` is incremented on every call,
        matching the synchronous checker.
        """
        proxy_url = f"{proxy_info.protocol.value}://{proxy_info.host}:{proxy_info.port}"
        timeout = aiohttp.ClientTimeout(total=self.timeout)

        start = time.monotonic()
        try:
            # The session closes this connector on exit.
            connector = aiohttp.TCPConnector(ssl=False)
            async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
                async with session.get(self.check_url, proxy=proxy_url) as response:
                    response_time = time.monotonic() - start
                    is_alive = response.status == 200
        except Exception as e:
            print(f"异步代理检查失败 {proxy_info.host}:{proxy_info.port} - {e}")
            response_time = float('inf')
            is_alive = False

        proxy_info.is_alive = is_alive
        proxy_info.response_time = response_time
        proxy_info.last_checked = time.time()
        proxy_info.score = self.calculate_proxy_score(proxy_info)
        with self.lock:
            self.stats['total_checked'] += 1
        return is_alive

    async def async_check_all_proxies(self, max_concurrent: int = 20):
        """Check every proxy concurrently, at most *max_concurrent* at a time.

        Replaces ``stats['alive_count']`` and ``stats['dead_count']`` with the
        results of this run. A check that raises counts as dead.
        """
        semaphore = asyncio.Semaphore(max_concurrent)

        async def check_with_semaphore(proxy_info):
            async with semaphore:
                return await self.async_check_proxy(proxy_info)

        # Snapshot so the list cannot change underneath the gather.
        with self.lock:
            snapshot = list(self.proxies)

        results = await asyncio.gather(
            *(check_with_semaphore(proxy) for proxy in snapshot),
            return_exceptions=True
        )
        alive_count = sum(1 for r in results if r is True)
        dead_count = len(results) - alive_count

        with self.lock:
            self.stats['alive_count'] = alive_count
            self.stats['dead_count'] = dead_count
        print(f"异步代理检查完成: {alive_count} 个存活, {dead_count} 个失效")

class ProxyPoolIntegration:
    """Glue between a ProxyPool and request code.

    Picks proxies for requests, records per-session failures and performs
    one-off proxied requests.
    """

    _PROXY_URL_PATTERN = r'^(https?|socks[45])://([^:]+):(\d+)$'

    def __init__(self, proxy_pool: ProxyPool):
        self.proxy_pool = proxy_pool
        # "host:port" (lowercase host) of proxies that failed during this session
        self.failed_proxies = set()

    def get_proxy_for_request(self, require_anonymous: bool = False) -> Optional[str]:
        """Return the highest-scoring live proxy as a ``scheme://host:port`` URL.

        Args:
            require_anonymous: Restrict the choice to ELITE proxies.

        Proxies recorded in ``failed_proxies`` are skipped while any other
        candidate exists. If every candidate has failed, the best of them is
        still returned rather than nothing. Returns None when the pool has no
        matching live proxy. The chosen proxy's usage is recorded in the pool.
        """
        anonymity = ProxyAnonymity.ELITE if require_anonymous else None
        pool = self.proxy_pool
        with pool.lock:
            candidates = [
                p for p in pool.proxies
                if p.is_alive and (anonymity is None or p.anonymity == anonymity)
            ]
            untried = [p for p in candidates if f"{p.host.lower()}:{p.port}" not in self.failed_proxies]
            choices = untried or candidates
            if not choices:
                return None
            proxy = max(choices, key=lambda p: p.score)
            pool._record_usage(proxy)
        return f"{proxy.protocol.value}://{proxy.host}:{proxy.port}"

    def mark_proxy_failed(self, proxy_url: str, error_type: str = "connection_error"):
        """Remember that *proxy_url* failed so later selections avoid it.

        URLs that do not match ``scheme://host:port`` are ignored.
        """
        import re
        match = re.match(self._PROXY_URL_PATTERN, proxy_url.strip().lower())
        if not match:
            return
        _protocol, host, port = match.groups()
        self.failed_proxies.add(f"{host}:{port}")
        print(f"标记代理失败: {proxy_url} ({error_type})")

    async def make_request_with_proxy(self, url: str, require_anonymous: bool = False, **kwargs) -> Optional[aiohttp.ClientResponse]:
        """GET *url* through a pooled proxy with a throw-away session.

        Extra *kwargs* are forwarded to ``session.get``. The body is read before
        the session closes, so ``await response.text()`` / ``.json()`` still work
        on the returned object.

        Returns:
            The response, or None when no proxy is available or the request failed.
            A failure is recorded via :meth:`mark_proxy_failed`.
        """
        proxy_url = self.get_proxy_for_request(require_anonymous)
        if not proxy_url:
            print("没有可用代理")
            return None

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, proxy=proxy_url, **kwargs) as response:
                    # Once these context managers exit, the response is released and
                    # the session closed. Reading now caches the body on the response.
                    # Usage was already counted by get_proxy_for_request, so it is not
                    # incremented again here.
                    await response.read()
                    return response
        except Exception as e:
            self.mark_proxy_failed(proxy_url, str(e))
            return None

# 使用示例
def example_proxy_pool_usage():
    """Walk through the synchronous ProxyPool API end to end.

    The demo parses a few placeholder proxies, health-checks them, prints
    statistics, picks best and random proxies, and asks ProxyPoolIntegration
    for an elite proxy.
    """
    print("=== 代理池使用示例 ===\n")

    pool = ProxyPool()

    # Placeholder proxies on localhost; real deployments get them from a provider
    candidates = (
        "http://127.0.0.1:8080",
        "https://127.0.0.1:8443",
        "socks5://127.0.0.1:1080",
    )
    for candidate in candidates:
        parsed_ok = pool.add_proxy_from_string(candidate)
        print(f"✓ 添加代理: {candidate}" if parsed_ok else f"✗ 无法解析代理: {candidate}")

    print("\n检查代理可用性...")
    pool.check_all_proxies(max_workers=5)

    summary = pool.get_stats()
    print(f"\n代理池统计: {summary}")

    top = pool.get_best_proxy()
    if top:
        print(
            f"\n最佳代理: {top.host}:{top.port} "
            f"(响应时间: {top.response_time:.2f}s, "
            f"分数: {top.score:.1f})"
        )

    picked = pool.get_random_proxy()
    if picked:
        print(f"随机代理: {picked.host}:{picked.port}")

    # Integration layer: request an elite (high-anonymity) proxy
    chosen_url = ProxyPoolIntegration(pool).get_proxy_for_request(require_anonymous=True)
    print(f"\n为匿名请求选择的代理: {chosen_url}")

if __name__ == "__main__":
    # Script entry point: run the synchronous proxy-pool demo
    example_proxy_pool_usage()

6.2.2 代理池与爬虫系统集成

import asyncio
import aiohttp
from typing import List, Dict, Optional
from tenacity import retry, stop_after_attempt, wait_exponential

class IntegratedCrawlerSystem:
    """Async crawler that routes requests through a ProxyPool.

    Owns one shared ``aiohttp.ClientSession``, built in ``__init__``, so the
    instance should be created inside a running event loop. In-flight requests
    are capped by an ``asyncio.Semaphore``. Release the session with
    :meth:`close`, or use the instance as an ``async with`` context manager.
    """

    def __init__(self, proxy_pool: ProxyPool, max_concurrent: int = 10):
        self.proxy_pool = proxy_pool
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
        self.setup_session()

    def setup_session(self):
        """Create the shared HTTP session.

        The session uses 100 total / 30 per-host connections, a 30 s total and
        10 s connect timeout, and a desktop User-Agent.
        """
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
        timeout = aiohttp.ClientTimeout(total=30, connect=10)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        )

    async def close(self):
        """Close the shared session; safe to call more than once."""
        if self.session is not None and not self.session.closed:
            await self.session.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def fetch_with_proxy(self, url: str, proxy_required: bool = True, **kwargs) -> Optional[aiohttp.ClientResponse]:
        """GET *url*, through the least-used live proxy when *proxy_required*.

        Tenacity retries up to 3 attempts with exponential back-off (4-10 s).
        Extra *kwargs* are forwarded to ``session.get``. The body is read before
        the request context closes, so ``.text()`` / ``.json()`` work on the
        returned response.

        Raises:
            Exception: when a proxy is required but none is alive, or when the
                request fails; either one triggers a retry.
        """
        async with self.semaphore:
            proxy_url = None
            if proxy_required:
                proxy_info = self.proxy_pool.get_proxy_with_rotation()
                if proxy_info is None:
                    print(f"警告: 无可用代理,无法访问 {url}")
                    raise Exception("无可用代理")
                proxy_url = f"{proxy_info.protocol.value}://{proxy_info.host}:{proxy_info.port}"
                print(f"使用代理: {proxy_url} 访问 {url}")

            try:
                async with self.session.get(url, proxy=proxy_url, **kwargs) as response:
                    # The response is released when this block exits. Reading now
                    # caches the body for callers. Usage was already recorded by
                    # get_proxy_with_rotation, so it is not counted again here.
                    await response.read()
                    return response
            except Exception as e:
                if proxy_url:
                    ProxyPoolIntegration(self.proxy_pool).mark_proxy_failed(proxy_url, str(e))
                raise  # re-raise unchanged so tenacity retries

    async def crawl_multiple_urls(self, urls: List[str], proxy_required: bool = True) -> List[Optional[Dict]]:
        """Fetch and summarise every URL concurrently; results keep the input order."""
        tasks = [self.fetch_url_and_process(url, proxy_required) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def fetch_url_and_process(self, url: str, proxy_required: bool = True) -> Optional[Dict]:
        """Fetch *url* and return a summary dict.

        On success the dict has status, length and the first 500 characters of
        the body. On failure it has an ``error`` key.
        """
        try:
            response = await self.fetch_with_proxy(url, proxy_required)
            if response and response.status == 200:
                content = await response.text()
                return {
                    'url': url,
                    'status': response.status,
                    'content_length': len(content),
                    'content': content[:500] + "..." if len(content) > 500 else content
                }
            print(f"请求失败: {url}, 状态码: {response.status if response else 'N/A'}")
            return {'url': url, 'status': response.status if response else 0, 'error': 'Request failed'}
        except Exception as e:
            print(f"爬取失败: {url}, 错误: {e}")
            return {'url': url, 'error': str(e)}

    async def periodic_proxy_check(self, interval: int = 300):
        """Health-check the pool every *interval* seconds forever.

        Each round also prunes dead proxies. It uses the pool's async checker
        when available (AsyncProxyPool). Otherwise it runs the blocking
        ``check_all_proxies`` in the default executor so the event loop is not
        blocked.
        """
        while True:
            try:
                print("开始定期代理检查...")
                if hasattr(self.proxy_pool, 'async_check_all_proxies'):
                    await self.proxy_pool.async_check_all_proxies(max_concurrent=10)
                else:
                    loop = asyncio.get_running_loop()
                    await loop.run_in_executor(None, self.proxy_pool.check_all_proxies)
                self.proxy_pool.remove_dead_proxies()
                stats = self.proxy_pool.get_stats()
                print(f"代理池状态: {stats}")
            except Exception as e:
                print(f"代理检查出错: {e}")
            await asyncio.sleep(interval)

class MobileAppCrawlerWithProxies(IntegratedCrawlerSystem):
    """IntegratedCrawlerSystem specialised for mobile-App JSON APIs.

    Adds a default set of App-like request headers, plus helpers that call one
    or many API endpoints through the proxy pool.
    """

    def __init__(self, proxy_pool: ProxyPool, redis_queue: RedisTaskQueue, max_concurrent: int = 5):
        super().__init__(proxy_pool, max_concurrent)
        # Kept for task-queue integration; none of the methods below read it yet.
        self.redis_queue = redis_queue
        self.app_detection_headers = {
            # Headers commonly sent by mobile Apps
            'Accept': '*/*',
            'Accept-Encoding': self._accept_encoding(),
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Connection': 'keep-alive',
        }

    @staticmethod
    def _accept_encoding() -> str:
        """Return an Accept-Encoding value that aiohttp can actually decode.

        aiohttp only decompresses ``br`` bodies when a Brotli binding (``brotli``
        or ``brotlicffi``) is installed. Advertising ``br`` without one makes
        every brotli-encoded response fail to decode.
        """
        import importlib.util
        has_brotli = any(importlib.util.find_spec(name) is not None for name in ('brotli', 'brotlicffi'))
        return 'gzip, deflate, br' if has_brotli else 'gzip, deflate'

    async def crawl_app_api(self, api_endpoint: str, app_headers: Optional[Dict] = None, proxy_required: bool = True) -> Optional[Dict]:
        """Call one JSON API endpoint.

        *app_headers* override the defaults. On success the result dict has the
        decoded JSON and the response headers; on failure it has an ``error`` key.
        """
        headers = dict(self.app_detection_headers)
        if app_headers:
            headers.update(app_headers)

        try:
            response = await self.fetch_with_proxy(api_endpoint, proxy_required, headers=headers)
            if response and response.status == 200:
                json_data = await response.json()
                return {
                    'endpoint': api_endpoint,
                    'status': response.status,
                    'data': json_data,
                    'headers': dict(response.headers)
                }
            return {'endpoint': api_endpoint, 'status': response.status if response else 0, 'error': 'Request failed'}
        except Exception as e:
            return {'endpoint': api_endpoint, 'error': str(e)}

    async def simulate_app_behavior(self, base_url: str, endpoints: List[str], proxy_required: bool = True) -> List[Dict]:
        """Call every endpoint under *base_url* concurrently.

        Results are returned in input order.
        """
        root = base_url.rstrip('/')
        tasks = [
            self.crawl_app_api(f"{root}/{endpoint.lstrip('/')}", proxy_required=proxy_required)
            for endpoint in endpoints
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

# 使用示例
async def example_integrated_system():
    """Demo: crawl a few fake App API endpoints through a pool of local placeholder proxies.

    The crawler's aiohttp session is always closed at the end, so aiohttp does
    not report an unclosed session.
    """
    print("=== 集成代理池的爬虫系统示例 ===\n")

    # Build a pool of three placeholder proxies with random quality figures
    proxy_pool = ProxyPool()
    for i in range(3):
        proxy_info = ProxyInfo(
            host=f"127.0.0.{i+1}",
            port=8080+i,
            protocol=ProxyProtocol.HTTP,
            anonymity=ProxyAnonymity.ELITE if i % 2 == 0 else ProxyAnonymity.ANONYMOUS,
            response_time=random.uniform(0.1, 1.0),
            score=random.uniform(5.0, 9.0)
        )
        proxy_pool.add_proxy(proxy_info)

    redis_queue = RedisTaskQueue()
    crawler = MobileAppCrawlerWithProxies(proxy_pool, redis_queue, max_concurrent=3)

    try:
        api_endpoints = [
            "api/products/list?page=1&category=electronics",
            "api/products/detail?id=12345",
            "api/reviews/product?id=12345&page=1",
            "api/user/profile",
            "api/search?q=mobile+phone"
        ]

        print("开始模拟App API请求...")
        results = await crawler.simulate_app_behavior("https://api.example.com", api_endpoints)

        success_count = sum(1 for r in results if isinstance(r, dict) and 'error' not in r)
        print(f"完成 {len(results)} 个API请求,成功 {success_count} 个")

        stats = proxy_pool.get_stats()
        print(f"\n最终代理池统计: {stats}")
    finally:
        # The crawler owns an aiohttp.ClientSession created in its constructor
        await crawler.session.close()

if __name__ == "__main__":
    # Script entry point: drive the async demo on a fresh event loop
    demo = example_integrated_system()
    asyncio.run(demo)

6.3 手机群控设备自动化

手机群控是实现大规模App爬取的核心技术,通过统一的控制平台可以同时操作多台设备。

6.3.1 设备群控架构

import asyncio
import threading
import subprocess
import time
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass
import uiautomator2 as u2
import numpy as np
import cv2
from concurrent.futures import ThreadPoolExecutor, as_completed

@dataclass
class DeviceInfo:
    """Static description of one attached Android device."""

    # adb serial (e.g. emulator-5554 or ip:port)
    serial: str
    # Product name as reported by uiautomator2
    model: str
    # Value of ro.product.brand
    brand: str
    # Value of ro.build.version.release, kept as text
    android_version: str
    # Value of ro.build.version.sdk
    api_level: int
    # "ONLINE" / "OFFLINE" / "ERROR" as set by DeviceController
    status: str
    # Optional network address; empty when unknown
    ip_address: str = ""
    # Battery percentage; 0 when not collected
    battery_level: int = 0

class DeviceController:
    """Control a single Android device through uiautomator2.

    Every action method catches its own exceptions, prints a ✓/✗ status line
    and returns a success flag (or a sentinel value) instead of raising.
    """

    def __init__(self, device_serial: str):
        """Connect to *device_serial* and cache its DeviceInfo.

        Any exception raised by ``u2.connect`` propagates to the caller.
        """
        self.serial = device_serial
        self.device = u2.connect(device_serial)
        self.is_connected = True
        self.device_info = self.get_device_info()
        print(f"✓ 连接设备: {self.serial} ({self.device_info.model})")

    def get_device_info(self) -> DeviceInfo:
        """Read model, brand, Android version and API level from the device.

        Returns a DeviceInfo with ``status="ERROR"`` and placeholder fields if
        any query fails.
        """
        try:
            model = self.device.info.get('productName', 'Unknown')
            brand = self.device.shell('getprop ro.product.brand').output.strip()
            android_version = self.device.shell('getprop ro.build.version.release').output.strip()
            api_level = int(self.device.shell('getprop ro.build.version.sdk').output.strip())
            status = "ONLINE" if self.device.alive else "OFFLINE"

            return DeviceInfo(
                serial=self.serial,
                model=model,
                brand=brand,
                android_version=android_version,
                api_level=api_level,
                status=status
            )
        except Exception as e:
            print(f"获取设备信息失败 {self.serial}: {e}")
            return DeviceInfo(serial=self.serial, model="Unknown", brand="Unknown", android_version="Unknown", api_level=0, status="ERROR")

    def install_app(self, apk_path: str) -> bool:
        """Install the APK at *apk_path*; return whether it succeeded."""
        try:
            self.device.app_install(apk_path)
            print(f"✓ 设备 {self.serial} 安装App成功")
            return True
        except Exception as e:
            print(f"✗ 设备 {self.serial} 安装App失败: {e}")
            return False

    def launch_app(self, package_name: str, wait: float = 2.0) -> bool:
        """Start *package_name*, then sleep *wait* seconds so the App can come up.

        Returns whether the start call succeeded.
        """
        try:
            self.device.app_start(package_name)
            time.sleep(wait)
            print(f"✓ 设备 {self.serial} 启动App {package_name} 成功")
            return True
        except Exception as e:
            print(f"✗ 设备 {self.serial} 启动App失败: {e}")
            return False

    def stop_app(self, package_name: str) -> bool:
        """Force-stop *package_name*; return whether it succeeded."""
        try:
            self.device.app_stop(package_name)
            print(f"✓ 设备 {self.serial} 停止App {package_name} 成功")
            return True
        except Exception as e:
            print(f"✗ 设备 {self.serial} 停止App失败: {e}")
            return False

    def take_screenshot(self, filename: str) -> bool:
        """Save a screenshot to *filename*; return whether it succeeded."""
        try:
            self.device.screenshot(filename)
            print(f"✓ 设备 {self.serial} 截图保存至 {filename}")
            return True
        except Exception as e:
            print(f"✗ 设备 {self.serial} 截图失败: {e}")
            return False

    def click_element(self, selector: Dict) -> bool:
        """Click the UI element matching *selector* (uiautomator2 selector kwargs).

        Returns False if the element is missing or the click fails.
        """
        try:
            element = self.device(**selector)
            if not element.exists:
                print(f"✗ 设备 {self.serial} 未找到元素: {selector}")
                return False
            element.click()
            print(f"✓ 设备 {self.serial} 点击元素成功")
            return True
        except Exception as e:
            print(f"✗ 设备 {self.serial} 点击元素失败: {e}")
            return False

    def input_text(self, selector: Dict, text: str) -> bool:
        """Set *text* into the element matching *selector*.

        Returns False if the element is missing or the input fails.
        """
        try:
            element = self.device(**selector)
            if not element.exists:
                print(f"✗ 设备 {self.serial} 未找到输入框: {selector}")
                return False
            element.set_text(text)
            print(f"✓ 设备 {self.serial} 输入文本 '{text}' 成功")
            return True
        except Exception as e:
            print(f"✗ 设备 {self.serial} 输入文本失败: {e}")
            return False

    def swipe(self, fx: int, fy: int, tx: int, ty: int, duration: float = 0.5) -> bool:
        """Swipe from (fx, fy) to (tx, ty) over *duration* seconds.

        Returns whether the swipe succeeded.
        """
        try:
            self.device.swipe(fx, fy, tx, ty, duration)
            print(f"✓ 设备 {self.serial} 滑动操作成功")
            return True
        except Exception as e:
            print(f"✗ 设备 {self.serial} 滑动操作失败: {e}")
            return False

    def get_current_app(self) -> str:
        """Return the package name of the foreground App, or "" on failure."""
        try:
            info = self.device.app_current()
            return info.get('package', '')
        except Exception as e:
            print(f"获取当前App失败 {self.serial}: {e}")
            return ""

    def check_battery(self) -> int:
        """Return the battery level parsed from ``dumpsys battery``.

        Returns -1 if the value cannot be read or parsed.
        """
        try:
            battery_info = self.device.shell('dumpsys battery').output
            for line in battery_info.splitlines():
                key, sep, value = line.strip().partition(':')
                # Match the "level" key exactly; a substring test would also hit
                # other lines that merely contain the word "level".
                if sep and key.strip() == 'level':
                    return int(value.strip())
        except Exception as e:
            print(f"检查电量失败 {self.serial}: {e}")
        return -1

class DeviceGroupController:
    """Fan operations out to every attached Android device in parallel.

    Devices are discovered via ``adb devices`` at construction time. They are
    kept in ``self.devices`` (serial -> DeviceController), guarded by
    ``self.lock``. Work runs on a shared 20-thread pool; call :meth:`shutdown`
    when finished.
    """

    # Seconds execute_on_all_devices waits for one fan-out before reporting
    # the devices that have not finished as timed out.
    operation_timeout = 30

    def __init__(self):
        self.devices: Dict[str, DeviceController] = {}
        self.executor = ThreadPoolExecutor(max_workers=20)
        self.lock = threading.Lock()
        self.discover_devices()

    def discover_devices(self):
        """Connect to every device that ``adb devices`` lists in the ``device`` state."""
        try:
            result = subprocess.run(['adb', 'devices'], capture_output=True, text=True, timeout=10)
        except Exception as e:
            print(f"发现设备失败: {e}")
            return

        for line in result.stdout.strip().split('\n')[1:]:  # skip "List of devices attached"
            parts = line.split()
            # Rows look like "<serial>\t<state>". Only state "device" is usable
            # (not offline / unauthorized / recovery ...). Comparing the state
            # column exactly avoids substring false positives.
            if len(parts) < 2 or parts[1] != 'device':
                continue
            serial = parts[0]
            with self.lock:
                if serial in self.devices:
                    continue
                try:
                    self.devices[serial] = DeviceController(serial)
                    print(f"发现新设备: {serial}")
                except Exception as e:
                    print(f"连接设备失败 {serial}: {e}")

    def add_device(self, serial: str) -> bool:
        """Connect to *serial* and add it.

        Returns True if it is present afterwards, False if connecting failed.
        """
        with self.lock:
            if serial in self.devices:
                print(f"设备已存在: {serial}")
                return True
            try:
                self.devices[serial] = DeviceController(serial)
                print(f"✓ 添加设备: {serial}")
                return True
            except Exception as e:
                print(f"✗ 添加设备失败 {serial}: {e}")
                return False

    def remove_device(self, serial: str) -> bool:
        """Forget *serial*; return False if it was not registered."""
        with self.lock:
            if serial not in self.devices:
                print(f"设备不存在: {serial}")
                return False
            del self.devices[serial]
            print(f"移除设备: {serial}")
            return True

    def execute_on_all_devices(self, func: Callable, *args, **kwargs) -> Dict[str, object]:
        """Run ``func(controller, *args, **kwargs)`` for every device in parallel.

        Returns:
            serial -> return value. If the call raised, the value is
            ``"Error: <exception>"``. If it did not finish within
            ``operation_timeout`` seconds, the value is a timeout error string;
            such a task keeps running on the pool.
        """
        from concurrent.futures import wait

        with self.lock:
            targets = list(self.devices.items())

        futures = {
            self.executor.submit(func, controller, *args, **kwargs): serial
            for serial, controller in targets
        }
        # wait() enforces a real overall deadline; future.result(timeout=...)
        # after as_completed() never times out because the future is already done.
        _done, not_done = wait(futures, timeout=self.operation_timeout)

        results = {}
        for future, serial in futures.items():
            if future in not_done:
                results[serial] = f"Error: timeout after {self.operation_timeout}s"
                print(f"设备 {serial} 执行超时")
                continue
            try:
                results[serial] = future.result()
            except Exception as e:
                results[serial] = f"Error: {e}"
                print(f"设备 {serial} 执行出错: {e}")
        return results

    @staticmethod
    def _invoke(controller, method_name: str, *args):
        """Per-device task: call ``controller.<method_name>(*args)``."""
        return getattr(controller, method_name)(*args)

    def install_app_on_all(self, apk_path: str) -> Dict[str, bool]:
        """Install *apk_path* on every device."""
        return self.execute_on_all_devices(self._invoke, 'install_app', apk_path)

    def launch_app_on_all(self, package_name: str) -> Dict[str, bool]:
        """Launch *package_name* on every device."""
        return self.execute_on_all_devices(self._invoke, 'launch_app', package_name)

    def execute_shell_on_all(self, command: str) -> Dict[str, str]:
        """Run a shell *command* on every device.

        Returns each device's output, or ``"Error: ..."``.
        """
        def shell_wrapper(controller, command):
            try:
                result = controller.device.shell(command)
                return result.output if result else ""
            except Exception as e:
                return f"Error: {e}"
        return self.execute_on_all_devices(shell_wrapper, command)

    def get_all_device_info(self) -> Dict[str, DeviceInfo]:
        """Return the cached DeviceInfo of every device."""
        with self.lock:
            return {serial: controller.device_info for serial, controller in self.devices.items()}

    def take_screenshots_all(self, directory: str = "./screenshots/") -> Dict[str, bool]:
        """Screenshot every device into *directory*.

        Files are named ``<serial>_<unix-time>.png``; the directory is created
        if needed.
        """
        import os
        os.makedirs(directory, exist_ok=True)

        def screenshot_wrapper(controller, directory):
            # Network serials look like "192.168.1.5:5555"; ':' is not valid in Windows file names.
            safe_serial = controller.serial.replace(':', '_')
            filename = os.path.join(directory, f"{safe_serial}_{int(time.time())}.png")
            return controller.take_screenshot(filename)
        return self.execute_on_all_devices(screenshot_wrapper, directory)

    def sync_operation(self, operations: List[Dict]) -> Dict[str, List[object]]:
        """Run *operations* in order, each one on all devices at once.

        Each operation is ``{'type': ..., 'args': {...}}`` with type ``click``,
        ``input``, ``swipe`` or ``launch_app``. Unknown types are reported and
        skipped; a ``'kwargs'`` entry is ignored.

        Returns:
            serial -> list of ``{'operation': type, 'result': value}`` in execution order.
        """
        with self.lock:
            results = {serial: [] for serial in self.devices}

        for op in operations:
            op_type = op['type']
            op_args = op.get('args', {})

            if op_type == 'click':
                call = ('click_element', op_args.get('selector', {}))
            elif op_type == 'input':
                call = ('input_text', op_args.get('selector', {}), op_args.get('text', ''))
            elif op_type == 'swipe':
                call = ('swipe',
                        op_args.get('fx', 0), op_args.get('fy', 0),
                        op_args.get('tx', 0), op_args.get('ty', 0),
                        op_args.get('duration', 0.5))
            elif op_type == 'launch_app':
                call = ('launch_app', op_args.get('package_name', ''))
            else:
                print(f"未知操作类型: {op_type}")
                continue

            res = self.execute_on_all_devices(self._invoke, *call)
            for serial, result in res.items():
                # setdefault: a device may have been added after the initial snapshot
                results.setdefault(serial, []).append({'operation': op_type, 'result': result})

        return results

    def shutdown(self, wait: bool = True):
        """Shut down the worker thread pool; no new work can be submitted afterwards."""
        self.executor.shutdown(wait=wait)

class AdvancedDeviceGroupController(DeviceGroupController):
    """高级设备群控控制器"""
    
    def __init__(self):
        super().__init__