# Mitmproxy - Python抓包神器
Mitmproxy是一款基于Python的开源交互式HTTP/HTTPS代理,支持脚本化操作,非常适合自动化抓包。
# Mitmproxy安装和基础使用
# 安装mitmproxy
pip install mitmproxy
# 启动命令
mitmdump -s script.py -p 8080 # 脚本模式,运行自定义脚本
mitmweb -p 8081 # Web界面模式,提供可视化界面
mitmproxy -p 8082 # 控制台模式,交互式操作
# Mitmproxy脚本示例 - App流量拦截器
# mitmproxy_app_interceptor.py
from mitmproxy import http, ctx
import json
import re
import time
from typing import Dict, Any, List
import os
from datetime import datetime
class AppTrafficInterceptor:
    """mitmproxy addon that records HTTP traffic which looks like app traffic.

    A flow is captured when its host matches one of ``target_domains`` or when
    ``_is_app_related_request`` flags it. Captured requests and responses are
    accumulated in memory (including their bodies) and written as JSON files
    into ``output_dir`` when ``done`` is called.
    """

    def __init__(self):
        """Create empty capture buffers and make sure the output directory exists."""
        self.intercepted_requests = []
        self.intercepted_responses = []
        # Regular expressions tested against the request host with re.search
        # (case-insensitive).
        self.target_domains = [
            r'.*api\..*',        # generic API hosts
            r'.*mobile\..*',     # mobile API hosts
            r'.*app\..*',        # app-related services
            r'.*sdk\..*',        # SDK services
            r'.*analytics\..*',  # analytics services
            r'.*tracking\..*',   # tracking services
        ]
        self.analysis_results = {
            'request_count': 0,
            'response_count': 0,
            'error_count': 0,
            'api_endpoints': {},
            'data_patterns': [],
            # Reserved key: nothing in this class writes to it.
            'timing_analysis': {}
        }
        self.output_dir = 'mitmproxy_output'
        os.makedirs(self.output_dir, exist_ok=True)

    def request(self, flow: "http.HTTPFlow") -> None:
        """Record an outgoing request if it targets a known host or looks app-related.

        Args:
            flow: The flow whose request is being processed.
        """
        target_matched = self._matches_target_domain(flow.request.host)
        if not (target_matched or self._is_app_related_request(flow)):
            return

        content_length, content = self._read_body(flow.request)
        request_info = {
            'timestamp': datetime.fromtimestamp(flow.request.timestamp_start).isoformat(),
            'method': flow.request.method,
            'url': flow.request.pretty_url,
            'host': flow.request.host,
            'path': flow.request.path,
            'query': dict(flow.request.query or {}),
            'headers': dict(flow.request.headers),
            'content_type': flow.request.headers.get('content-type', ''),
            'content_length': content_length,
            'content': content,
            'is_target': target_matched
        }
        self.intercepted_requests.append(request_info)
        self.analysis_results['request_count'] += 1

        # Count how often each "METHOD path" endpoint is hit.
        endpoint = f"{flow.request.method} {flow.request.path}"
        endpoints = self.analysis_results['api_endpoints']
        endpoints[endpoint] = endpoints.get(endpoint, 0) + 1

        ctx.log.info(f"🎯 拦截App请求: {flow.request.pretty_url}")
        # The request could be modified here if needed, e.g. by adding a header.

    def response(self, flow: "http.HTTPFlow") -> None:
        """Record a server response whose request matched the capture criteria.

        Args:
            flow: The flow whose response is being processed.
        """
        is_target_request = (self._matches_target_domain(flow.request.host)
                             or self._is_app_related_request(flow))
        response = flow.response
        if not is_target_request or response is None:
            return

        # A response object carries no URL of its own; the URL belongs to the
        # request that produced it.
        request_url = flow.request.pretty_url
        finished_at = response.timestamp_end
        if finished_at is None:
            # Fall back to "now" rather than crash if the end time is missing.
            finished_at = time.time()

        content_length, content = self._read_body(response)
        response_info = {
            'timestamp': datetime.fromtimestamp(finished_at).isoformat(),
            'status_code': response.status_code,
            'url': request_url,
            'headers': dict(response.headers),
            'content_type': response.headers.get('content-type', ''),
            'content_length': content_length,
            'content': content,
            # Output keys are kept from the original schema; the values come
            # from the request (send) and response (receive) timestamps.
            'timing': {
                'send_start': flow.request.timestamp_start,
                'send_end': flow.request.timestamp_end,
                'receive_start': response.timestamp_start,
                'receive_end': response.timestamp_end
            },
            'request_method': flow.request.method,
            'request_url': request_url,
            'request_path': flow.request.path
        }
        self.intercepted_responses.append(response_info)
        self.analysis_results['response_count'] += 1
        if response.status_code >= 400:
            self.analysis_results['error_count'] += 1

        ctx.log.info(f"📡 拦截App响应: {request_url} - {response.status_code}")
        self._analyze_response_content(response_info)

    def _matches_target_domain(self, host: str) -> bool:
        """Return True if ``host`` matches any pattern in ``target_domains``."""
        return any(re.search(pattern, host, re.IGNORECASE)
                   for pattern in self.target_domains)

    @staticmethod
    def _read_body(message) -> tuple:
        """Return ``(length_in_bytes, text)`` for a request or response body.

        The body is read non-strictly so that an undecodable content-encoding
        or charset does not raise inside a hook; undecodable bytes are replaced
        so the text can always be written out as UTF-8 JSON.

        Args:
            message: Object exposing ``get_content(strict=...)`` and
                ``get_text(strict=...)``.
        """
        raw = message.get_content(strict=False)
        if not raw:
            return 0, ''
        try:
            text = message.get_text(strict=True)
        except ValueError:
            text = raw.decode('utf-8', 'replace')
        return len(raw), text or ''

    def _is_app_related_request(self, flow: "http.HTTPFlow") -> bool:
        """Heuristically decide whether a request looks like it comes from an app.

        A request qualifies if its User-Agent contains a mobile marker, its
        path contains a typical API segment, or it carries an app-specific
        header.
        """
        user_agent = flow.request.headers.get('User-Agent', '').lower()
        # "AppleWebKit" is a browser-engine token present in most browser
        # user agents; left in place it would satisfy the 'app' marker.
        user_agent = user_agent.replace('applewebkit', '')
        mobile_indicators = ('mobile', 'android', 'ios', 'iphone', 'ipad', 'app')
        if any(marker in user_agent for marker in mobile_indicators):
            return True

        path = flow.request.path.lower()
        path_indicators = ('/api/', '/mobile/', '/app/', '/sdk/', '/v1/', '/v2/', '/v3/')
        if any(segment in path for segment in path_indicators):
            return True

        app_headers = ('x-app-version', 'x-device-id', 'x-platform', 'x-app-id')
        # Lower-case the header names once instead of once per candidate.
        header_names = {name.lower() for name in flow.request.headers.keys()}
        return any(header in header_names for header in app_headers)

    def _analyze_response_content(self, response_info: Dict) -> None:
        """Look for data patterns in a captured response body if it is JSON.

        Args:
            response_info: Record built by ``response``; reads the ``content``,
                ``content_type`` and ``request_path`` keys.
        """
        content = response_info.get('content', '')
        if not content:
            return
        # Media types are case-insensitive, so compare in lower case.
        content_type = response_info.get('content_type', '').lower()
        if not content_type.startswith('application/json'):
            return
        try:
            json_data = json.loads(content)
        except json.JSONDecodeError:
            # Best effort: a body mislabelled as JSON is simply not analysed.
            return
        self._extract_json_patterns(json_data, response_info['request_path'])

    def _extract_json_patterns(self, json_data: Any, request_path: str) -> None:
        """Record the first common container field found in a JSON object.

        Only dict payloads are inspected. The first of the well-known field
        names whose value is a list or dict is appended to
        ``analysis_results['data_patterns']``; the search stops there.

        Args:
            json_data: Parsed JSON value.
            request_path: Path of the request that produced the payload.
        """
        if not isinstance(json_data, dict):
            return
        common_fields = ('data', 'result', 'items', 'list', 'response', 'payload')
        for field in common_fields:
            if field not in json_data:
                continue
            data_sample = json_data[field]
            if isinstance(data_sample, (list, dict)):
                self.analysis_results['data_patterns'].append({
                    'path': request_path,
                    'field': field,
                    'type': type(data_sample).__name__,
                    'size': len(data_sample),
                    'timestamp': datetime.now().isoformat()
                })
                break

    def done(self) -> None:
        """Write the collected results to disk and log the totals."""
        self._save_analysis_results()
        ctx.log.info(f"📊 分析完成 - 总计: {self.analysis_results['request_count']} 请求, "
                     f"{self.analysis_results['response_count']} 响应")

    def _save_analysis_results(self) -> None:
        """Dump the analysis summary and captured requests/responses as JSON files."""
        # One timestamp for all three files so they share the same suffix.
        stamp = int(time.time())
        outputs = (
            ('analysis_results', self.analysis_results),
            ('intercepted_requests', self.intercepted_requests),
            ('intercepted_responses', self.intercepted_responses),
        )
        written = []
        for prefix, payload in outputs:
            target = os.path.join(self.output_dir, f"{prefix}_{stamp}.json")
            with open(target, 'w', encoding='utf-8') as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)
            written.append(target)
        results_file = written[0]
        ctx.log.info(f"💾 结果已保存到: {results_file}")
# Example usage
def run_app_interceptor():
    """Placeholder showing how the interceptor is meant to be launched.

    The function itself does nothing: save this script as a .py file and run
    it with ``mitmdump -s filename.py``.
    """
    return None
# Addon instances exposed in the ``addons`` list for mitmproxy to use.
addons = [AppTrafficInterceptor()]
