Mitmproxy - Python抓包神器

Mitmproxy 是一款基于 Python 的开源交互式 HTTP/HTTPS 代理，支持脚本化操作，非常适合自动化抓包。

Mitmproxy安装和基础使用

# 安装mitmproxy
pip install mitmproxy

# 启动命令
mitmdump -s script.py -p 8080  # 脚本模式,运行自定义脚本
mitmweb -p 8081               # Web界面模式,提供可视化界面
mitmproxy -p 8082             # 控制台模式,交互式操作

Mitmproxy脚本示例 - App流量拦截器

# mitmproxy_app_interceptor.py
from mitmproxy import http, ctx
import json
import re
import time
from typing import Dict, Any, List
import os
from datetime import datetime

class AppTrafficInterceptor:
    """mitmproxy addon that records app-related HTTP traffic.

    A request/response pair is captured when the request host matches one of
    the regexes in ``target_domains`` or when ``_is_app_related_request``
    flags the request. Captured entries and running counters are kept in
    memory and written as JSON files under ``output_dir`` when ``done`` runs.

    Flow annotations are quoted so they are not evaluated when the class is
    defined.
    """

    def __init__(self):
        """Create empty capture buffers and make sure the output directory exists."""
        self.intercepted_requests = []
        self.intercepted_responses = []
        # Regexes searched (case-insensitively) against the request host.
        self.target_domains = [
            r'.*api\..*',           # generic API hosts
            r'.*mobile\..*',        # mobile API hosts
            r'.*app\..*',           # app services
            r'.*sdk\..*',           # SDK services
            r'.*analytics\..*',     # analytics services
            r'.*tracking\..*',      # tracking services
        ]
        self.analysis_results = {
            'request_count': 0,
            'response_count': 0,
            'error_count': 0,
            'api_endpoints': {},
            'data_patterns': [],
            'timing_analysis': {}
        }
        self.output_dir = 'mitmproxy_output'
        os.makedirs(self.output_dir, exist_ok=True)

    # ------------------------------------------------------------------
    # mitmproxy hooks
    # ------------------------------------------------------------------

    def request(self, flow: "http.HTTPFlow") -> None:
        """Record a request that matches a target domain or looks app-related."""
        target_matched = self._matches_target_domain(flow.request.host)
        if not (target_matched or self._is_app_related_request(flow)):
            return

        request = flow.request
        # Non-strict accessors: a body with a broken content-encoding or
        # charset must not abort the hook.
        body = request.get_content(strict=False)
        request_info = {
            'timestamp': self._iso_timestamp(request.timestamp_start),
            'method': request.method,
            'url': request.pretty_url,
            'host': request.host,
            'path': request.path,
            'query': dict(request.query or {}),
            'headers': dict(request.headers),
            'content_type': request.headers.get('content-type', ''),
            'content_length': len(body) if body else 0,
            'content': self._body_text(request),
            'is_target': target_matched
        }

        self.intercepted_requests.append(request_info)
        self.analysis_results['request_count'] += 1

        # Count hits per "METHOD path" endpoint.
        endpoint = f"{request.method} {request.path}"
        endpoints = self.analysis_results['api_endpoints']
        endpoints[endpoint] = endpoints.get(endpoint, 0) + 1

        ctx.log.info(f"🎯 拦截App请求: {flow.request.pretty_url}")

        # The request could be modified here if needed, e.g.:
        # flow.request.headers["X-App-Crawler"] = "Active"

    def response(self, flow: "http.HTTPFlow") -> None:
        """Record the response of a request that matches the capture criteria."""
        if flow.response is None:
            return
        if not (self._matches_target_domain(flow.request.host)
                or self._is_app_related_request(flow)):
            return

        response_info = self._build_response_info(flow)
        self.intercepted_responses.append(response_info)
        self.analysis_results['response_count'] += 1

        if flow.response.status_code >= 400:
            self.analysis_results['error_count'] += 1

        # The response object carries no URL of its own; report the request URL.
        ctx.log.info(f"📡 拦截App响应: {flow.request.pretty_url} - {flow.response.status_code}")

        # Look for recognisable data shapes in the response body.
        self._analyze_response_content(response_info)

    def done(self) -> None:
        """Persist everything collected so far and log the final totals."""
        self._save_analysis_results()
        ctx.log.info(f"📊 分析完成 - 总计: {self.analysis_results['request_count']} 请求, "
                    f"{self.analysis_results['response_count']} 响应")

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------

    def _matches_target_domain(self, host: str) -> bool:
        """Return True if *host* matches any regex in ``target_domains``."""
        return any(
            re.search(pattern, host, re.IGNORECASE)
            for pattern in self.target_domains
        )

    @staticmethod
    def _iso_timestamp(epoch_seconds):
        """Format an epoch timestamp as a local ISO-8601 string; None stays None."""
        if epoch_seconds is None:
            return None
        return datetime.fromtimestamp(epoch_seconds).isoformat()

    @staticmethod
    def _body_text(message) -> str:
        """Return the message body as text that can always be written as UTF-8.

        ``get_text(strict=False)`` is used so undecodable bodies do not raise;
        any surrogate code points it leaves behind are replaced with ``?``
        because they cannot be encoded when the capture is dumped to a
        UTF-8 file. An absent or empty body yields ``''``.
        """
        text = message.get_text(strict=False)
        if not text:
            return ''
        return text.encode('utf-8', 'replace').decode('utf-8')

    def _build_response_info(self, flow: "http.HTTPFlow") -> Dict[str, Any]:
        """Build the JSON-serialisable record stored for one response.

        The ``timing`` keys keep their original names; the send values come
        from the request's start/end timestamps and the receive values from
        the response's start/end timestamps.
        """
        request = flow.request
        response = flow.response
        body = response.get_content(strict=False)
        return {
            'timestamp': self._iso_timestamp(response.timestamp_end),
            'status_code': response.status_code,
            'url': request.pretty_url,
            'headers': dict(response.headers),
            'content_type': response.headers.get('content-type', ''),
            'content_length': len(body) if body else 0,
            'content': self._body_text(response),
            'timing': {
                'send_start': request.timestamp_start,
                'send_end': request.timestamp_end,
                'receive_start': response.timestamp_start,
                'receive_end': response.timestamp_end
            },
            'request_method': request.method,
            'request_url': request.pretty_url,
            'request_path': request.path
        }

    def _is_app_related_request(self, flow: "http.HTTPFlow") -> bool:
        """Heuristically decide whether a request originates from a mobile app.

        Returns True when the User-Agent contains a mobile marker, the path
        contains an API-style segment, or an app-specific header is present.
        """
        # NOTE(review): substring matching is loose; e.g. 'app' also matches
        # the 'AppleWebKit' token found in browser User-Agent strings.
        user_agent = flow.request.headers.get('User-Agent', '').lower()
        mobile_indicators = ['mobile', 'android', 'ios', 'iphone', 'ipad', 'app']
        if any(indicator in user_agent for indicator in mobile_indicators):
            return True

        path_indicators = ['/api/', '/mobile/', '/app/', '/sdk/', '/v1/', '/v2/', '/v3/']
        path = flow.request.path.lower()
        if any(indicator in path for indicator in path_indicators):
            return True

        app_headers = ['x-app-version', 'x-device-id', 'x-platform', 'x-app-id']
        # Lower-case the header names once instead of once per candidate.
        header_names = {name.lower() for name in flow.request.headers.keys()}
        return any(header in header_names for header in app_headers)

    def _analyze_response_content(self, response_info: Dict) -> None:
        """Parse a JSON response body and record the data pattern it contains.

        Bodies that are empty, not declared as ``application/json``, or not
        valid JSON are ignored.
        """
        content = response_info.get('content', '')
        if not content:
            return

        # Media types are case-insensitive, so normalise before comparing.
        content_type = response_info.get('content_type', '').lower()
        if not content_type.startswith('application/json'):
            return

        try:
            json_data = json.loads(content)
        except json.JSONDecodeError:
            # Declared as JSON but not parseable: nothing to analyse.
            return
        self._extract_json_patterns(json_data, response_info['request_path'])

    def _extract_json_patterns(self, json_data: Any, request_path: str) -> None:
        """Record the first common payload field holding a list or dict.

        Only top-level dict documents are inspected. The first field from the
        candidate list whose value is a list or dict is appended to
        ``analysis_results['data_patterns']``; later candidates are skipped.
        """
        if not isinstance(json_data, dict):
            return

        common_fields = ['data', 'result', 'items', 'list', 'response', 'payload']
        for field in common_fields:
            if field not in json_data:
                continue
            data_sample = json_data[field]
            if isinstance(data_sample, (list, dict)):
                self.analysis_results['data_patterns'].append({
                    'path': request_path,
                    'field': field,
                    'type': type(data_sample).__name__,
                    'size': len(data_sample),
                    'timestamp': datetime.now().isoformat()
                })
                break

    def _save_analysis_results(self) -> None:
        """Write the summary, captured requests and captured responses to JSON files."""
        # Recreate the directory in case it was removed while the proxy ran.
        os.makedirs(self.output_dir, exist_ok=True)

        # One timestamp for all three files so a single run shares one suffix.
        stamp = int(time.time())
        outputs = (
            ('analysis_results', self.analysis_results),
            ('intercepted_requests', self.intercepted_requests),
            ('intercepted_responses', self.intercepted_responses),
        )
        written = []
        for prefix, payload in outputs:
            target = os.path.join(self.output_dir, f"{prefix}_{stamp}.json")
            with open(target, 'w', encoding='utf-8') as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)
            written.append(target)

        results_file = written[0]
        ctx.log.info(f"💾 结果已保存到: {results_file}")

# 使用示例脚本
def run_app_interceptor():
    """Placeholder showing where a launcher for the interceptor would go.

    The function intentionally does nothing. To use the addon, save this
    module as a ``.py`` file and run it with ``mitmdump -s filename.py``.
    """
    return None

# Register the interceptor in the module-level ``addons`` list for mitmproxy.
addons = [AppTrafficInterceptor()]