#移动端环境配置与抓包基础
#课程目标
- 掌握移动端爬虫的基础环境搭建
- 学会使用不同类型的移动设备进行调试
- 掌握ADB工具的使用方法
- 学会配置和使用抓包工具监控HTTP/HTTPS流量
- 了解并掌握HTTPS证书配置和SSL Pinning绕过技术
- 掌握流量拦截与协议分析,能"看"到App的数据传输过程
#1. 移动端环境搭建
#1.1 模拟器 vs 真机调试
在进行移动端爬虫开发时,我们通常有两种环境选择:模拟器和真机。各有优缺点:
#模拟器优势:
- 成本低,无需购买多台物理设备
- 可以快速创建多个不同配置的设备
- 方便进行批量测试和调试
- 可以模拟不同Android版本
- 便于进行根(root)操作和系统级调试
#真机调试优势:
- 更真实的网络环境和设备特性
- 能够测试真实的性能表现
- 更好的兼容性测试
- 更接近用户实际使用场景
#主流模拟器对比:
夜神模拟器:
# 夜神模拟器常见问题解决
# 1. 启动命令
nox_adb.exe connect 127.0.0.1:62001
# 2. 端口映射
adb forward tcp:8080 tcp:8080 # 映射宿主机8080到模拟器
# 3. 夜神模拟器ADB连接
adb connect 127.0.0.1:62001 # 夜神默认端口
adb connect 127.0.0.1:62025 # 夜神多开端口雷电模拟器:
# 雷电模拟器端口
adb connect 127.0.0.1:5555 # 雷电默认端口
adb connect 127.0.0.1:5556 # 雷电多开时使用
adb connect 127.0.0.1:5557 # 更多实例
# 雷电模拟器特性
# - 性能较好,适合游戏和应用测试
# - 支持多开,可同时运行多个实例
# - 内置ROOT权限,便于调试Genymotion:
# Genymotion需要VirtualBox支持
# 优点:性能好,接近真机体验,支持多种Android版本
# 缺点:商业软件,需要付费,但有免费版本可用
# Genymotion ADB连接
adb connect 10.0.3.15 # Genymotion默认IPAVD (Android Virtual Device):
# Android Studio自带的模拟器
# 优点:官方支持,兼容性好,可配置性强
# 缺点:资源消耗较大,启动较慢
# 启动AVD
emulator -avd Nexus_5X_API_28 # 启动特定AVD#1.2 ADB (Android Debug Bridge) 详解
ADB是Android开发和测试的重要工具,让我们深入了解它的使用。
#ADB基础命令
# 连接设备
adb devices # 查看连接的设备
adb connect <ip>:<port> # 连接网络设备
adb kill-server # 重启ADB服务
adb start-server # 启动ADB服务
# 设备操作
adb shell # 进入设备shell
adb reboot # 重启设备
adb root # 获取ROOT权限(需支持)
adb remount # 重新挂载系统分区
# 文件操作
adb push <local> <remote> # 推送文件到设备
adb pull <remote> <local> # 从设备拉取文件
adb sync # 同步文件到设备
# 应用管理
adb install <apk_path> # 安装应用
adb uninstall <package_name> # 卸载应用
adb shell pm list packages # 查看已安装包
adb shell dumpsys package # 查看包信息
# 日志操作
adb logcat # 查看实时日志
adb logcat -c # 清空日志缓冲区
adb logcat -v time # 按时间格式显示
adb logcat | grep "keyword" # 过滤关键词
# 网络操作
adb shell netstat # 查看网络连接
adb shell iptables -L # 查看防火墙规则
adb forward tcp:<host_port> tcp:<device_port> # 端口转发#ADB高级用法
import subprocess
import time
import os
from typing import Optional, Dict, List
class ADBController:
"""ADB控制器类 - 用于自动化设备管理"""
def __init__(self, device_id: Optional[str] = None, timeout: int = 30):
self.device_id = device_id
self.timeout = timeout
self.available_devices = self._get_connected_devices()
self.current_device = self._select_device()
print(f"ADB控制器初始化完成,当前设备: {self.current_device}")
def _get_connected_devices(self) -> List[str]:
"""获取所有连接的设备ID"""
try:
result = subprocess.run(['adb', 'devices'],
capture_output=True, text=True, timeout=self.timeout)
devices = result.stdout.strip().split('\n')[1:] # 跳过标题行
available_devices = []
for device in devices:
if device.strip() and 'device' in device and 'offline' not in device:
device_id = device.split('\t')[0]
available_devices.append(device_id)
return available_devices
except subprocess.TimeoutExpired:
print("ADB命令执行超时")
return []
except Exception as e:
print(f"获取设备列表失败: {e}")
return []
def _select_device(self) -> str:
"""选择当前操作设备"""
if not self.available_devices:
raise Exception("没有找到可用的ADB设备")
if self.device_id and self.device_id in self.available_devices:
return self.device_id
else:
# 选择第一个可用设备
return self.available_devices[0]
def execute_adb_command(self, command: str) -> str:
"""执行ADB命令并返回结果"""
cmd = ['adb', '-s', self.current_device] + command.split()
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=self.timeout)
if result.returncode != 0:
print(f"ADB命令执行失败: {result.stderr}")
return ""
return result.stdout.strip()
except subprocess.TimeoutExpired:
print(f"ADB命令执行超时: {command}")
return ""
except Exception as e:
print(f"执行ADB命令异常: {e}")
return ""
def install_apk(self, apk_path: str) -> bool:
"""安装APK应用"""
if not os.path.exists(apk_path):
print(f"APK文件不存在: {apk_path}")
return False
result = self.execute_adb_command(f'install {apk_path}')
if "Success" in result:
print(f"APK安装成功: {apk_path}")
return True
else:
print(f"APK安装失败: {result}")
return False
def uninstall_app(self, package_name: str) -> bool:
"""卸载应用"""
result = self.execute_adb_command(f'uninstall {package_name}')
if "Success" in result:
print(f"应用卸载成功: {package_name}")
return True
else:
print(f"应用卸载失败: {result}")
return False
def get_app_info(self, package_name: str) -> Dict:
"""获取应用详细信息"""
cmd = f'shell dumpsys package {package_name}'
result = self.execute_adb_command(cmd)
return {
'package_name': package_name,
'info': result[:1000] + "..." if len(result) > 1000 else result # 限制输出长度
}
def take_screenshot(self, save_path: str) -> bool:
"""截屏并保存到指定路径"""
temp_path = '/sdcard/temp_screenshot.png'
# 在设备上截屏
self.execute_adb_command(f'shell screencap {temp_path}')
# 拉取到本地
pull_result = self.execute_adb_command(f'pull {temp_path} {save_path}')
# 删除设备上的临时文件
self.execute_adb_command(f'shell rm {temp_path}')
return "pulled" in pull_result.lower()
def start_app(self, package_name: str, activity_name: Optional[str] = None) -> bool:
"""启动应用"""
if activity_name:
cmd = f'shell am start -n {package_name}/{activity_name}'
else:
cmd = f'shell monkey -p {package_name} -c android.intent.category.LAUNCHER 1'
result = self.execute_adb_command(cmd)
time.sleep(2) # 等待应用启动
return "Error" not in result
def stop_app(self, package_name: str) -> bool:
"""停止应用"""
result = self.execute_adb_command(f'shell am force-stop {package_name}')
return True # force-stop通常不会返回错误信息
def get_running_apps(self) -> List[str]:
"""获取当前运行的应用列表"""
result = self.execute_adb_command('shell dumpsys activity activities')
apps = []
for line in result.split('\n'):
if 'mResumedActivity' in line or 'mFocusedActivity' in line:
import re
matches = re.findall(r'([a-zA-Z0-9._]+?)/', line)
for match in matches:
if match and not match.startswith('com.android') and match not in apps:
apps.append(match)
return apps[:10] # 返回前10个应用
def get_device_properties(self) -> Dict:
"""获取设备属性信息"""
properties = {}
props_to_get = [
'ro.product.model', # 设备型号
'ro.product.brand', # 品牌
'ro.build.version.release', # Android版本
'ro.build.version.sdk', # API级别
'ro.product.cpu.abi', # CPU架构
'ro.board.platform', # 平台
]
for prop in props_to_get:
value = self.execute_adb_command(f'shell getprop {prop}')
properties[prop] = value.strip()
return properties
# 使用示例
def adb_demo():
"""ADB操作演示"""
try:
controller = ADBController()
print("✓ ADB控制器初始化成功")
# 获取设备信息
device_props = controller.get_device_properties()
print(f"设备信息: {device_props}")
# 获取运行中的应用
running_apps = controller.get_running_apps()
print(f"运行中的应用: {running_apps[:5]}") # 显示前5个
# 截屏示例
screenshot_path = f"screenshot_{int(time.time())}.png"
if controller.take_screenshot(screenshot_path):
print(f"✓ 截屏保存至: {screenshot_path}")
else:
print("✗ 截屏失败")
except Exception as e:
print(f"✗ ADB操作演示失败: {e}")
if __name__ == "__main__":
adb_demo()#2. 抓包工具详解
#2.1 Fiddler 配置与使用
Fiddler是最常用的HTTP调试代理工具,特别适合移动端抓包。
#Fiddler基础配置
# Fiddler配置要点
1. 确保勾选 "Capture HTTPS CONNECTs" - 捕获HTTPS连接
2. 勾选 "Decrypt HTTPS traffic" - 解密HTTPS流量
3. 忽略服务器证书错误 - Tools > Options > HTTPS > Ignore server certificate errors
4. 设置监听端口(默认8888)- Tools > Options > Connections
# FiddlerScript自定义规则
static function OnBeforeRequest(oS: Session) {
// 过滤特定域名
if (oS.HostnameIs("api.example.com")) {
oS.bBufferResponse = true;
}
// 修改请求头
if (oS.uriContains("/api/")) {
oS.oRequest["X-Custom-Header"] = "AppCrawler";
}
}
static function OnBeforeResponse(oS: Session) {
// 修改响应内容
if (oS.oResponse.MIMEType == "application/json") {
oS.utilDecodeResponse();
var body = oS.GetResponseBodyAsString();
// 处理响应数据
oS.utilSetResponseBody(body);
}
}#Python集成Fiddler
import requests
import json
from typing import Dict, List, Optional
import time
class FiddlerProxy:
"""Fiddler代理控制类"""
def __init__(self, proxy_host='127.0.0.1', proxy_port=8888):
self.proxy_host = proxy_host
self.proxy_port = proxy_port
self.proxies = {
'http': f'http://{proxy_host}:{proxy_port}',
'https': f'http://{proxy_host}:{proxy_port}'
}
self.session = requests.Session()
self.session.proxies.update(self.proxies)
print(f"Fiddler代理配置完成: {proxy_host}:{proxy_port}")
def test_proxy_connection(self) -> bool:
"""测试代理连接"""
try:
response = self.session.get('http://www.baidu.com', timeout=10)
success = response.status_code == 200
print(f"代理连接测试: {'成功' if success else '失败'}")
return success
except Exception as e:
print(f"代理连接测试失败: {e}")
return False
def capture_traffic(self, target_url: str, method='GET', **kwargs) -> Dict:
"""捕获流量详细信息"""
try:
start_time = time.time()
if method.upper() == 'GET':
response = self.session.get(target_url, **kwargs)
elif method.upper() == 'POST':
response = self.session.post(target_url, **kwargs)
elif method.upper() == 'PUT':
response = self.session.put(target_url, **kwargs)
elif method.upper() == 'DELETE':
response = self.session.delete(target_url, **kwargs)
else:
raise ValueError(f"不支持的HTTP方法: {method}")
end_time = time.time()
duration = end_time - start_time
return {
'status_code': response.status_code,
'headers': dict(response.headers),
'body': response.text,
'url': response.url,
'method': method.upper(),
'duration': duration,
'content_length': len(response.content),
'request_headers': dict(kwargs.get('headers', {})),
'request_data': kwargs.get('data', kwargs.get('json', ''))
}
except Exception as e:
return {'error': str(e), 'url': target_url, 'method': method.upper()}
def monitor_app_traffic(self, app_domains: List[str], duration: int = 300) -> List[Dict]:
"""监控特定App的流量"""
print(f"开始监控App流量,目标域名: {app_domains},持续时间: {duration}秒")
captured_data = []
start_time = time.time()
# 这里实际应该通过Fiddler的API或日志来监控流量
# 由于无法直接控制Fiddler,我们模拟监控过程
import threading
import time as thread_time
def capture_loop():
counter = 0
while thread_time.time() - start_time < duration:
# 模拟捕获到的请求
for domain in app_domains:
import random
if random.random() > 0.7: # 30%概率捕获请求
captured_data.append({
'timestamp': thread_time.time(),
'url': f'https://{domain}/api/v1/data',
'method': random.choice(['GET', 'POST']),
'status': random.choice([200, 201, 404, 500]),
'size': random.randint(100, 10000)
})
thread_time.sleep(1) # 每秒检查一次
monitor_thread = threading.Thread(target=capture_loop)
monitor_thread.daemon = True
monitor_thread.start()
monitor_thread.join(timeout=duration)
print(f"流量监控完成,共捕获 {len(captured_data)} 条请求")
return captured_data
# 配置移动设备使用Fiddler代理
def configure_mobile_proxy_setup():
"""配置移动设备代理详细指南"""
config_guide = """
=== 移动设备代理配置详细指南 ===
1. 确保手机和电脑在同一WiFi网络
2. 在电脑上启动Fiddler并确认代理服务运行
3. 记录电脑IP地址(cmd: ipconfig 或 ifconfig)
4. 在手机WiFi设置中找到当前连接
5. 配置代理为手动
6. 服务器地址:电脑IP地址(如:192.168.1.100)
7. 端口号:8888(Fiddler默认端口)
8. 保存设置
=== 证书安装步骤 ===
1. 在手机浏览器访问:http://<电脑IP>:8888
2. 点击"FiddlerRoot certificate"下载证书
3. 根据Android版本安装证书到受信任凭据
4. Android 7+ 需要将证书安装到系统区(需要ROOT)
"""
print(config_guide)
return config_guide
def fiddler_demo():
"""Fiddler使用演示"""
proxy = FiddlerProxy()
if proxy.test_proxy_connection():
print("✓ Fiddler代理连接成功")
# 捕获示例流量
result = proxy.capture_traffic('http://httpbin.org/get', method='GET')
print(f"捕获结果状态: {result.get('status_code', 'Error')}")
else:
print("✗ Fiddler代理连接失败")#2.2 Charles 抓包工具
Charles是另一款强大的抓包工具,界面友好,功能丰富。
#Charles配置要点
# Charles HTTPS代理配置
1. Proxy -> SSL Proxying Settings
2. Enable SSL Proxying
3. Locations中添加需要抓取的域名
*:* 或 api.example.com:443 或特定的App API域名
# 端口配置
Default port: 8888
SOCKS Proxy Port: 9999
# 断点设置
Proxy -> Breakpoints Settings
添加需要拦截的URL规则,如:
*/api/*
*/mobile/*
*/app/*
# 映射设置
Tools -> Map Remote
将线上地址映射到本地或其他地址进行测试#2.3 Mitmproxy - Python抓包神器
Mitmproxy是一款基于Python的开源交互式HTTP/HTTPS代理,支持脚本化操作,非常适合自动化抓包。
#Mitmproxy安装和基础使用
# 安装mitmproxy
pip install mitmproxy
# 启动命令
mitmdump -s script.py -p 8080 # 脚本模式,运行自定义脚本
mitmweb -p 8081 # Web界面模式,提供可视化界面
mitmproxy -p 8082 # 控制台模式,交互式操作#Mitmproxy脚本示例 - App流量拦截器
# mitmproxy_app_interceptor.py
from mitmproxy import http, ctx
import json
import re
import time
from typing import Dict, Any, List
import os
from datetime import datetime
class AppTrafficInterceptor:
"""App流量拦截器 - 专门用于分析App的网络请求"""
def __init__(self):
self.intercepted_requests = []
self.intercepted_responses = []
self.target_domains = [
r'.*api\..*', # 通用API域名
r'.*mobile\..*', # 移动端API
r'.*app\..*', # App相关服务
r'.*sdk\..*', # SDK服务
r'.*analytics\..*', # 分析服务
r'.*tracking\..*', # 追踪服务
]
self.analysis_results = {
'request_count': 0,
'response_count': 0,
'error_count': 0,
'api_endpoints': {},
'data_patterns': [],
'timing_analysis': {}
}
self.output_dir = 'mitmproxy_output'
os.makedirs(self.output_dir, exist_ok=True)
def request(self, flow: http.HTTPFlow) -> None:
"""请求拦截 - 拦截所有经过代理的请求"""
# 检查是否为目标域名
target_matched = False
for domain_pattern in self.target_domains:
if re.search(domain_pattern, flow.request.host, re.IGNORECASE):
target_matched = True
break
if target_matched or self._is_app_related_request(flow):
request_info = {
'timestamp': datetime.fromtimestamp(flow.request.timestamp_start).isoformat(),
'method': flow.request.method,
'url': flow.request.pretty_url,
'host': flow.request.host,
'path': flow.request.path,
'query': dict(flow.request.query or {}),
'headers': dict(flow.request.headers),
'content_type': flow.request.headers.get('content-type', ''),
'content_length': len(flow.request.content) if flow.request.content else 0,
'content': flow.request.text if flow.request.content else '',
'is_target': target_matched
}
self.intercepted_requests.append(request_info)
self.analysis_results['request_count'] += 1
# 统计API端点
endpoint = f"{flow.request.method} {flow.request.path}"
self.analysis_results['api_endpoints'][endpoint] = self.analysis_results['api_endpoints'].get(endpoint, 0) + 1
ctx.log.info(f"🎯 拦截App请求: {flow.request.pretty_url}")
# 可以在这里修改请求(如果需要)
# flow.request.headers["X-App-Crawler"] = "Active"
def response(self, flow: http.HTTPFlow) -> None:
"""响应拦截 - 拦截服务器返回的响应"""
# 检查对应的请求是否是目标请求
request_url = flow.request.pretty_url
is_target_request = any(re.search(pattern, flow.request.host, re.IGNORECASE)
for pattern in self.target_domains) or self._is_app_related_request(flow)
if is_target_request:
response_info = {
'timestamp': datetime.fromtimestamp(flow.response.timestamp_end).isoformat(),
'status_code': flow.response.status_code,
'url': flow.response.url,
'headers': dict(flow.response.headers),
'content_type': flow.response.headers.get('content-type', ''),
'content_length': len(flow.response.content) if flow.response.content else 0,
'content': flow.response.text if flow.response.content else '',
'timing': {
'send_start': flow.response.timestamp_start_send,
'send_end': flow.response.timestamp_end_send,
'receive_start': flow.response.timestamp_start_receive,
'receive_end': flow.response.timestamp_end_receive
},
'request_method': flow.request.method,
'request_url': flow.request.pretty_url,
'request_path': flow.request.path
}
self.intercepted_responses.append(response_info)
self.analysis_results['response_count'] += 1
if flow.response.status_code >= 400:
self.analysis_results['error_count'] += 1
ctx.log.info(f"📡 拦截App响应: {flow.response.url} - {flow.response.status_code}")
# 分析响应数据模式
self._analyze_response_content(response_info)
def _is_app_related_request(self, flow: http.HTTPFlow) -> bool:
"""判断是否为App相关的请求"""
# 检查User-Agent是否包含移动端标识
user_agent = flow.request.headers.get('User-Agent', '').lower()
mobile_indicators = ['mobile', 'android', 'ios', 'iphone', 'ipad', 'app']
# 检查请求路径是否包含App特有路径
path_indicators = ['/api/', '/mobile/', '/app/', '/sdk/', '/v1/', '/v2/', '/v3/']
# 检查请求头中的App特有字段
app_headers = ['x-app-version', 'x-device-id', 'x-platform', 'x-app-id']
return (any(indicator in user_agent for indicator in mobile_indicators) or
any(indicator in flow.request.path.lower() for indicator in path_indicators) or
any(header.lower() in [h.lower() for h in flow.request.headers.keys()] for header in app_headers))
def _analyze_response_content(self, response_info: Dict) -> None:
"""分析响应内容,识别数据模式"""
content = response_info.get('content', '')
if not content:
return
# 检查是否为JSON格式
if response_info.get('content_type', '').startswith('application/json'):
try:
json_data = json.loads(content)
self._extract_json_patterns(json_data, response_info['request_path'])
except json.JSONDecodeError:
pass
def _extract_json_patterns(self, json_data: Any, request_path: str) -> None:
"""从JSON数据中提取模式"""
if isinstance(json_data, dict):
# 检查常见的数据字段
common_fields = ['data', 'result', 'items', 'list', 'response', 'payload']
for field in common_fields:
if field in json_data:
data_sample = json_data[field]
if isinstance(data_sample, (list, dict)):
pattern_info = {
'path': request_path,
'field': field,
'type': type(data_sample).__name__,
'size': len(data_sample) if hasattr(data_sample, '__len__') else 0,
'timestamp': datetime.now().isoformat()
}
self.analysis_results['data_patterns'].append(pattern_info)
break
def done(self) -> None:
"""处理完成时的清理工作"""
# 保存分析结果到文件
self._save_analysis_results()
ctx.log.info(f"📊 分析完成 - 总计: {self.analysis_results['request_count']} 请求, "
f"{self.analysis_results['response_count']} 响应")
def _save_analysis_results(self) -> None:
"""保存分析结果到文件"""
results_file = os.path.join(self.output_dir, f"analysis_results_{int(time.time())}.json")
with open(results_file, 'w', encoding='utf-8') as f:
json.dump(self.analysis_results, f, ensure_ascii=False, indent=2)
# 保存拦截的请求和响应
requests_file = os.path.join(self.output_dir, f"intercepted_requests_{int(time.time())}.json")
with open(requests_file, 'w', encoding='utf-8') as f:
json.dump(self.intercepted_requests, f, ensure_ascii=False, indent=2)
responses_file = os.path.join(self.output_dir, f"intercepted_responses_{int(time.time())}.json")
with open(responses_file, 'w', encoding='utf-8') as f:
json.dump(self.intercepted_responses, f, ensure_ascii=False, indent=2)
ctx.log.info(f"💾 结果已保存到: {results_file}")
# 使用示例脚本
def run_app_interceptor():
"""运行App流量拦截器"""
# 这个函数展示了如何在mitmdump中使用
# 实际使用时保存为.py文件并用 mitmdump -s filename.py 运行
pass
# 添加到addons列表供mitmproxy使用
addons = [
AppTrafficInterceptor()
]#高级Mitmproxy脚本 - 流量分析与协议解密
# advanced_app_analyzer.py
from mitmproxy import http, ctx, master
from mitmproxy.tools.dump import DumpMaster
import asyncio
import json
import os
from datetime import datetime
import sqlite3
import zlib
import gzip
from typing import Dict, Any, Optional
import re
class AdvancedAppAnalyzer:
"""高级App分析器 - 提供深度协议分析和数据解密功能"""
def __init__(self, db_path: str = 'advanced_app_analysis.db'):
self.db_path = db_path
self.init_database()
self.protocol_decoders = self._init_protocol_decoders()
self.encryption_detectors = self._init_encryption_detectors()
self.analysis_stats = {
'total_requests': 0,
'decrypted_requests': 0,
'protocol_identified': 0,
'data_extracted': 0
}
def init_database(self):
"""初始化分析数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 请求表
cursor.execute('''
CREATE TABLE IF NOT EXISTS requests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL,
method TEXT,
url TEXT,
host TEXT,
path TEXT,
headers TEXT,
content BLOB,
content_type TEXT,
content_encoding TEXT,
protocol_type TEXT,
is_encrypted INTEGER,
decrypted_content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 响应表
cursor.execute('''
CREATE TABLE IF NOT EXISTS responses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
request_id INTEGER,
timestamp REAL,
status_code INTEGER,
headers TEXT,
content BLOB,
content_type TEXT,
content_encoding TEXT,
response_size INTEGER,
processing_time REAL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (request_id) REFERENCES requests (id)
)
''')
# 协议分析表
cursor.execute('''
CREATE TABLE IF NOT EXISTS protocol_analysis (
id INTEGER PRIMARY KEY AUTOINCREMENT,
request_id INTEGER,
protocol_name TEXT,
fields_extracted TEXT,
encryption_type TEXT,
decryption_success INTEGER,
analysis_notes TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (request_id) REFERENCES requests (id)
)
''')
conn.commit()
conn.close()
def _init_protocol_decoders(self) -> Dict[str, callable]:
"""初始化协议解码器"""
return {
'json': self._decode_json,
'protobuf': self._decode_protobuf_stub, # 实际使用需要protobuf库
'form_data': self._decode_form_data,
'multipart': self._decode_multipart,
'custom_binary': self._decode_custom_binary
}
def _init_encryption_detectors(self) -> Dict[str, callable]:
"""初始化加密检测器"""
return {
'base64': self._detect_base64_encryption,
'aes_stub': self._detect_aes_stub, # 实际AES检测需要更多逻辑
'custom_encryption': self._detect_custom_encryption
}
def request(self, flow: http.HTTPFlow) -> None:
"""处理请求"""
self.analysis_stats['total_requests'] += 1
# 解析请求内容
content = flow.request.content if flow.request.content else b''
content_type = flow.request.headers.get('content-type', '')
content_encoding = flow.request.headers.get('content-encoding', '')
# 检测协议类型
protocol_type = self._identify_protocol(content, content_type)
# 检测是否加密
is_encrypted = self._detect_encryption(content, flow.request.headers)
# 尝试解密(如果可能)
decrypted_content = self._attempt_decryption(content, flow.request.headers) if is_encrypted else content.decode('utf-8', errors='ignore')
# 保存到数据库
request_id = self._save_request_to_db(
flow, content, content_type, content_encoding,
protocol_type, is_encrypted, decrypted_content
)
# 进行协议分析
self._analyze_protocol(request_id, content, protocol_type, flow.request.headers)
ctx.log.info(f"🔍 分析请求: {flow.request.pretty_url} | 协议: {protocol_type} | 加密: {is_encrypted}")
def response(self, flow: http.HTTPFlow) -> None:
"""处理响应"""
if hasattr(flow, 'request') and hasattr(flow.request, 'id'):
# 计算处理时间
processing_time = (flow.response.timestamp_end_receive or time.time()) - (flow.request.timestamp_start or time.time())
# 保存响应到数据库
self._save_response_to_db(
flow.request.id if hasattr(flow.request, 'id') else None,
flow, processing_time
)
def _identify_protocol(self, content: bytes, content_type: str) -> str:
"""识别协议类型"""
if 'application/json' in content_type:
return 'json'
elif 'application/x-www-form-urlencoded' in content_type:
return 'form_data'
elif 'multipart/form-data' in content_type:
return 'multipart'
elif content.startswith(b'{') and content.endswith(b'}'):
return 'json'
elif b'boundary=' in content_type.encode():
return 'multipart'
else:
# 尝试检测自定义协议
if self._looks_like_custom_protocol(content):
return 'custom_binary'
else:
return 'unknown'
def _looks_like_custom_protocol(self, content: bytes) -> bool:
"""检测是否为自定义协议"""
# 简单的启发式检测
if len(content) < 10:
return False
# 检查是否有明显的二进制特征
printable_chars = sum(1 for b in content[:50] if 32 <= b <= 126 or b in (9, 10, 13))
return printable_chars / min(len(content), 50) < 0.7 # 如果可打印字符少于70%,可能是二进制协议
def _detect_encryption(self, content: bytes, headers: http.Headers) -> bool:
"""检测内容是否加密"""
content_str = content.decode('utf-8', errors='ignore')
for detector_name, detector_func in self.encryption_detectors.items():
if detector_func(content_str, headers):
return True
return False
def _attempt_decryption(self, content: bytes, headers: http.Headers) -> str:
"""尝试解密内容"""
content_str = content.decode('utf-8', errors='ignore')
# 尝试Base64解码
import base64
try:
if self._detect_base64_encryption(content_str, headers):
decoded = base64.b64decode(content_str)
return decoded.decode('utf-8', errors='ignore')
except:
pass
# 尝试解压缩
try:
if 'gzip' in headers.get('content-encoding', ''):
decompressed = gzip.decompress(content)
return decompressed.decode('utf-8', errors='ignore')
elif 'deflate' in headers.get('content-encoding', ''):
decompressed = zlib.decompress(content)
return decompressed.decode('utf-8', errors='ignore')
except:
pass
return content_str # 返回原始内容
def _detect_base64_encryption(self, content: str, headers: http.Headers) -> bool:
"""检测Base64加密"""
# 检查内容是否看起来像Base64
if len(content) < 20:
return False
# Base64通常只包含A-Z, a-z, 0-9, +, /, =
base64_pattern = r'^[A-Za-z0-9+/]*={0,2}$'
return bool(re.match(base64_pattern, content.strip()))
def _detect_aes_stub(self, content: str, headers: http.Headers) -> bool:
"""AES加密检测桩(实际实现需要密码学知识)"""
# 这里只是示例,实际AES检测需要分析加密块模式等
return False
def _detect_custom_encryption(self, content: str, headers: http.Headers) -> bool:
"""检测自定义加密"""
# 检查是否有自定义加密头或特定模式
custom_enc_headers = ['x-encrypted', 'x-encoded', 'x-security']
for header in custom_enc_headers:
if header.lower() in [h.lower() for h in headers.keys()]:
return True
return False
def _decode_json(self, content: bytes) -> Dict[str, Any]:
"""解码JSON内容"""
try:
return json.loads(content.decode('utf-8'))
except:
return {'error': 'Invalid JSON'}
def _decode_form_data(self, content: bytes) -> Dict[str, Any]:
"""解码表单数据"""
try:
content_str = content.decode('utf-8')
pairs = content_str.split('&')
result = {}
for pair in pairs:
if '=' in pair:
key, value = pair.split('=', 1)
result[key] = value
return result
except:
return {'error': 'Invalid form data'}
def _decode_multipart(self, content: bytes) -> Dict[str, Any]:
"""解码multipart数据(简化版)"""
return {'type': 'multipart', 'size': len(content)}
def _decode_custom_binary(self, content: bytes) -> Dict[str, Any]:
"""解码自定义二进制协议(桩函数)"""
return {'type': 'binary', 'size': len(content), 'preview': content[:100].hex()}
def _save_request_to_db(self, flow, content: bytes, content_type: str, content_encoding: str,
protocol_type: str, is_encrypted: bool, decrypted_content: str) -> int:
"""保存请求到数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO requests
(timestamp, method, url, host, path, headers, content, content_type, content_encoding,
protocol_type, is_encrypted, decrypted_content)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
flow.request.timestamp_start,
flow.request.method,
flow.request.pretty_url,
flow.request.host,
flow.request.path,
json.dumps(dict(flow.request.headers)),
content,
content_type,
content_encoding,
protocol_type,
int(is_encrypted),
decrypted_content
))
request_id = cursor.lastrowid
conn.commit()
conn.close()
return request_id
def _save_response_to_db(self, request_id: Optional[int], flow, processing_time: float):
"""保存响应到数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
content = flow.response.content if flow.response.content else b''
cursor.execute('''
INSERT INTO responses
(request_id, timestamp, status_code, headers, content, content_type,
content_encoding, response_size, processing_time)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
request_id,
flow.response.timestamp_end,
flow.response.status_code,
json.dumps(dict(flow.response.headers)),
content,
flow.response.headers.get('content-type', ''),
flow.response.headers.get('content-encoding', ''),
len(content),
processing_time
))
conn.commit()
conn.close()
def _analyze_protocol(self, request_id: int, content: bytes, protocol_type: str, headers: http.Headers):
"""分析协议并保存结果"""
# 提取字段信息
fields_info = self._extract_protocol_fields(content, protocol_type)
# 检测加密类型
encryption_type = self._detect_encryption_type(content, headers)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO protocol_analysis
(request_id, protocol_name, fields_extracted, encryption_type, decryption_success, analysis_notes)
VALUES (?, ?, ?, ?, ?, ?)
''', (
request_id,
protocol_type,
json.dumps(fields_info),
encryption_type,
1, # 假设解密成功(实际上应该根据解密结果设置)
f"Protocol: {protocol_type}, Encrypted: {bool(encryption_type)}"
))
conn.commit()
conn.close()
self.analysis_stats['protocol_identified'] += 1
if encryption_type:
self.analysis_stats['decrypted_requests'] += 1
def _extract_protocol_fields(self, content: bytes, protocol_type: str) -> Dict[str, Any]:
"""从协议内容中提取字段信息"""
if protocol_type == 'json':
try:
data = json.loads(content.decode('utf-8'))
if isinstance(data, dict):
return {
'keys': list(data.keys()),
'structure': self._analyze_dict_structure(data),
'data_types': {k: type(v).__name__ for k, v in data.items()}
}
except:
pass
elif protocol_type == 'form_data':
content_str = content.decode('utf-8')
fields = {}
for pair in content_str.split('&'):
if '=' in pair:
k, v = pair.split('=', 1)
fields[k] = v
return {'fields': list(fields.keys()), 'count': len(fields)}
return {'raw_size': len(content), 'type': protocol_type}
def _analyze_dict_structure(self, data: dict, max_depth: int = 3, current_depth: int = 0) -> Dict[str, Any]:
"""分析字典结构"""
if current_depth >= max_depth:
return {'type': 'nested_object', 'depth_limit_reached': True}
structure = {}
for key, value in data.items():
if isinstance(value, dict):
structure[key] = self._analyze_dict_structure(value, max_depth, current_depth + 1)
elif isinstance(value, list):
if value and isinstance(value[0], dict):
structure[key] = {
'type': 'array_of_objects',
'sample_structure': self._analyze_dict_structure(value[0], max_depth, current_depth + 1) if value else {}
}
else:
structure[key] = {
'type': 'array',
'sample_items': value[:3],
'length': len(value)
}
else:
structure[key] = {
'type': type(value).__name__,
'sample_value': str(value)[:50] # 限制样本值长度
}
return structure
def _detect_encryption_type(self, content: bytes, headers: http.Headers) -> str:
"""检测加密类型"""
content_str = content.decode('utf-8', errors='ignore')
for enc_type, detector in self.encryption_detectors.items():
if detector(content_str, headers):
return enc_type
return ""
def done(self):
"""分析完成时的统计输出"""
ctx.log.info(f"📈 高级分析完成统计:")
ctx.log.info(f" 总请求数: {self.analysis_stats['total_requests']}")
ctx.log.info(f" 协议识别数: {self.analysis_stats['protocol_identified']}")
ctx.log.info(f" 解密请求数: {self.analysis_stats['decrypted_requests']}")
ctx.log.info(f" 数据库位置: {self.db_path}")
# 为mitmproxy准备的addons
addons = [
AdvancedAppAnalyzer()
]#3. HTTPS证书配置与SSL Pinning绕过
#3.1 HTTPS证书基础与配置
HTTPS证书是保证通信安全的基础,移动端爬虫需要处理证书相关的问题。
#证书安装到系统分区(需要ROOT)
# Android证书操作详解
# 1. 生成CA证书(在电脑上执行)
openssl genrsa -out ca.key 2048
openssl req -new -x509 -key ca.key -out ca.crt -days 365 -subj "/CN=AppCrawler CA/O=AppCrawler/C=US"
# 2. 转换为Android格式(获取证书哈希值)
CERT_HASH=$(openssl x509 -inform PEM -subject_hash_old -in ca.crt | head -1)
mv ca.crt ${CERT_HASH}.0
# 3. 推送到系统证书目录(需要ROOT权限)
adb root
adb remount
adb push ${CERT_HASH}.0 /system/etc/security/cacerts/
adb shell chmod 644 /system/etc/security/cacerts/${CERT_HASH}.0
adb reboot
# 4. 验证证书安装
adb shell ls -la /system/etc/security/cacerts/ | grep ${CERT_HASH}#证书配置Python脚本
import os
import subprocess
import tempfile
import ssl
import hashlib
from typing import Optional, Tuple
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
class CertificateManager:
"""证书管理器 - 用于生成、安装和管理HTTPS证书"""
def __init__(self):
self.ca_cert_path = None
self.ca_key_path = None
self.device_cert_installed = False
self.certificates_dir = "certificates"
os.makedirs(self.certificates_dir, exist_ok=True)
def generate_ca_certificate(self, common_name: str = "AppCrawler Root CA",
organization: str = "AppCrawler Organization") -> Tuple[str, str]:
"""生成CA证书和私钥"""
try:
# 使用临时文件生成证书
with tempfile.NamedTemporaryFile(mode='w+', suffix='.key', delete=False) as key_file:
key_filename = key_file.name
with tempfile.NamedTemporaryFile(mode='w+', suffix='.crt', delete=False) as crt_file:
crt_filename = crt_file.name
# 使用OpenSSL生成自签名CA证书
openssl_cmd = [
'openssl', 'req', '-new', '-x509', '-keyout', key_filename,
'-out', crt_filename, '-days', '365', '-nodes',
'-subj', f'/CN={common_name}/O={organization}/C=US'
]
result = subprocess.run(openssl_cmd,
capture_output=True,
text=True,
timeout=30)
if result.returncode == 0:
# 移动到永久位置
final_key_path = os.path.join(self.certificates_dir, "ca.key")
final_cert_path = os.path.join(self.certificates_dir, "ca.crt")
os.rename(key_filename, final_key_path)
os.rename(crt_filename, final_cert_path)
self.ca_key_path = final_key_path
self.ca_cert_path = final_cert_path
print(f"✅ CA证书生成成功: {final_cert_path}")
print(f"🔑 私钥文件: {final_key_path}")
return final_key_path, final_cert_path
else:
print(f"❌ 证书生成失败: {result.stderr}")
# 清理临时文件
os.unlink(key_filename)
os.unlink(crt_filename)
return None, None
except subprocess.TimeoutExpired:
print("❌ 证书生成超时")
return None, None
except FileNotFoundError:
print("❌ 未找到OpenSSL,请确保已安装OpenSSL并添加到PATH")
return None, None
except Exception as e:
print(f"❌ 证书生成异常: {e}")
return None, None
def get_certificate_hash(self, cert_path: str) -> str:
"""获取证书的哈希值(用于Android系统证书命名)"""
try:
with open(cert_path, 'rb') as f:
cert_data = f.read()
# 解析证书
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
# 计算subject哈希(使用旧算法)
subject_bytes = cert.subject.public_bytes(default_backend())
hash_digest = hashes.Hash(hashes.MD5(), default_backend())
hash_digest.update(subject_bytes)
hash_bytes = hash_digest.finalize()
# 转换为十六进制并格式化
cert_hash = hash_bytes.hex()
return cert_hash
except Exception as e:
print(f"❌ 获取证书哈希失败: {e}")
# 如果cryptography不可用,尝试使用OpenSSL
try:
result = subprocess.run([
'openssl', 'x509', '-inform', 'PEM',
'-subject_hash_old', '-in', cert_path
], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
return result.stdout.strip().split('\n')[0]
except:
pass
return None
def prepare_android_certificate(self, cert_path: str) -> str:
"""准备Android系统证书格式"""
cert_hash = self.get_certificate_hash(cert_path)
if not cert_hash:
print("❌ 无法获取证书哈希")
return None
# 创建Android格式的证书文件名
android_cert_name = f"{cert_hash}.0"
android_cert_path = os.path.join(self.certificates_dir, android_cert_name)
# 复制证书文件
import shutil
shutil.copy2(cert_path, android_cert_path)
print(f"✅ Android证书准备完成: {android_cert_path}")
return android_cert_path
def install_cert_to_device(self, cert_path: str, use_adb: bool = True) -> bool:
"""安装证书到设备"""
try:
if not use_adb:
print("ℹ️ 手动安装模式 - 请将证书传输到设备并手动安装")
print(f"📍 证书位置: {cert_path}")
return True
# 推送证书到设备临时目录
remote_temp_path = f'/sdcard/{os.path.basename(cert_path)}'
push_result = subprocess.run([
'adb', 'push', cert_path, remote_temp_path
], capture_output=True, text=True)
if push_result.returncode != 0:
print(f"❌ 推送证书失败: {push_result.stderr}")
return False
print(f"✅ 证书已推送至设备: {remote_temp_path}")
# 检查是否需要ROOT权限安装到系统目录
# 对于Android 7+,证书需要安装到系统证书目录才能被应用信任
try:
# 尝试获取ROOT权限
root_check = subprocess.run([
'adb', 'shell', 'su', '-c', 'id'
], capture_output=True, text=True, timeout=10)
if 'uid=0' in root_check.stdout:
print("🔐 设备已ROOT,尝试安装到系统目录")
# 计算证书哈希并重命名
cert_hash = self.get_certificate_hash(cert_path)
if cert_hash:
system_cert_name = f"/system/etc/security/cacerts/{cert_hash}.0"
# 执行ROOT安装命令
install_cmd = [
'adb', 'shell', 'su', '-c',
f'mount -o rw,remount /system && '
f'cp {remote_temp_path} {system_cert_name} && '
f'chmod 644 {system_cert_name}'
]
install_result = subprocess.run(
[' '.join(install_cmd)],
capture_output=True,
text=True,
shell=True
)
if install_result.returncode == 0:
print(f"✅ 证书已安装到系统目录: {system_cert_name}")
self.device_cert_installed = True
# 清理临时文件
subprocess.run(['adb', 'shell', f'rm {remote_temp_path}'],
capture_output=True)
return True
else:
print(f"⚠️ 系统目录安装失败: {install_result.stderr}")
print("ℹ️ 请手动安装证书或使用非ROOT模式")
except subprocess.TimeoutExpired:
print("⏰ ROOT权限检查超时")
except Exception as e:
print(f"⚠️ ROOT安装过程中出错: {e}")
# 如果ROOT安装失败,提供手动安装指导
print("\n📋 手动安装证书步骤:")
print("1. 打开设备上的 '设置' -> '安全' -> '加密与凭据' -> '从存储设备安装'")
print("2. 选择证书文件进行安装")
print("3. 设置证书名称和用途(一般选择VPN和应用)")
return True # 任务交给用户手动完成
except Exception as e:
print(f"❌ 安装证书失败: {e}")
return False
def setup_proxy_certificate(self, proxy_port: int = 8888) -> bool:
"""配置代理证书"""
print(f"🔧 开始配置代理证书 (端口: {proxy_port})")
# 生成CA证书
key_path, cert_path = self.generate_ca_certificate()
if not key_path:
print("❌ 证书生成失败,无法配置代理证书")
return False
# 准备Android格式证书
android_cert_path = self.prepare_android_certificate(cert_path)
if not android_cert_path:
print("❌ Android证书准备失败")
return False
# 安装到设备
success = self.install_cert_to_device(android_cert_path)
if success:
print(f"✅ 代理证书配置完成!")
print(f"📍 证书文件: {cert_path}")
print(f"📍 Android证书: {android_cert_path}")
print(f"📋 请确保代理工具(如Fiddler/Charles/Mitmproxy)使用相同CA证书")
return success
# 使用示例
def setup_https_interception():
"""设置HTTPS拦截的完整流程"""
print("🚀 开始设置HTTPS流量拦截...")
cert_manager = CertificateManager()
# 生成并安装证书
success = cert_manager.setup_proxy_certificate(proxy_port=8888)
if success:
print("\n🎉 HTTPS拦截设置完成!")
print("\n📋 后续配置步骤:")
print("1. 启动代理工具(Fiddler/Charles/Mitmproxy)")
print("2. 配置移动设备代理指向电脑IP:8888")
print("3. 确保证书已在设备中正确安装并信任")
print("4. 开始监控HTTPS流量")
else:
print("\n❌ HTTPS拦截设置失败,请检查错误信息")
if __name__ == "__main__":
setup_https_interception()#3.2 SSL Pinning 绕过技术
SSL Pinning是一种安全机制,应用将服务器证书或公钥硬编码到应用中,防止中间人攻击。
#SSL Pinning检测与分析
import requests
from urllib.parse import urlparse
import ssl
import socket
import subprocess
from typing import Dict, List, Any
import json
class SSLPinningDetector:
"""SSL Pinning检测器 - 用于识别App是否使用SSL Pinning"""
def __init__(self):
self.normal_context = ssl.create_default_context()
self.custom_context = ssl.create_default_context()
# 这里可以加载自定义CA证书用于测试
# self.custom_context.load_verify_locations('custom_ca.crt')
def test_ssl_connection(self, url: str) -> Dict[str, Any]:
"""测试SSL连接以检测Pinning"""
parsed = urlparse(url)
hostname = parsed.hostname
port = parsed.port or 443
result = {
'url': url,
'hostname': hostname,
'port': port,
'normal_connectable': False,
'custom_ca_works': False,
'pinning_detected': False,
'connection_errors': [],
'certificate_info': {}
}
try:
# 正常连接测试(使用系统默认CA)
with socket.create_connection((hostname, port), timeout=10) as sock:
with self.normal_context.wrap_socket(sock, server_hostname=hostname) as ssock:
cert = ssock.getpeercert()
result['normal_connectable'] = True
result['certificate_info'] = {
'subject': cert.get('subject', []),
'issuer': cert.get('issuer', []),
'version': cert.get('version'),
'serialNumber': cert.get('serialNumber'),
'notBefore': cert.get('notBefore'),
'notAfter': cert.get('notAfter')
}
except Exception as e:
result['normal_connectable'] = False
result['connection_errors'].append(f'Normal connection failed: {str(e)}')
try:
# 自定义CA连接测试(如果有)
with socket.create_connection((hostname, port), timeout=10) as sock:
with self.custom_context.wrap_socket(sock, server_hostname=hostname) as ssock:
cert = ssock.getpeercult()
result['custom_ca_works'] = True
except Exception as e:
result['custom_ca_works'] = False
result['connection_errors'].append(f'Custom CA connection failed: {str(e)}')
# SSL Pinning检测逻辑
# 如果正常连接失败但使用代理/自定义CA可以连接,可能有SSL Pinning
if not result['normal_connectable'] and result['custom_ca_works']:
result['pinning_detected'] = True
elif result['normal_connectable'] and not result['custom_ca_works']:
# 这种情况不太常见,但表示自定义CA有问题
result['pinning_detected'] = False
else:
# 更复杂的检测:通过代理工具测试
result['pinning_detected'] = self._advanced_pinning_detection(url, result)
return result
def _advanced_pinning_detection(self, url: str, basic_result: Dict) -> bool:
"""高级Pinning检测 - 通过代理工具测试"""
try:
# 尝试使用代理工具连接
# 这里模拟代理工具的行为
import urllib.request
import urllib.error
# 配置代理(假设代理在本地8888端口运行)
proxy_handler = urllib.request.ProxyHandler({
'https': 'http://127.0.0.1:8888',
'http': 'http://127.0.0.1:8888'
})
opener = urllib.request.build_opener(proxy_handler)
urllib.request.install_opener(opener)
try:
req = urllib.request.Request(url, headers={'User-Agent': 'AppCrawler/1.0'})
response = urllib.request.urlopen(req, timeout=10)
proxy_success = True
proxy_status = response.getcode()
except urllib.error.URLError as e:
proxy_success = False
proxy_status = str(e)
# 如果直连失败但代理连接成功,很可能是SSL Pinning
if not basic_result['normal_connectable'] and proxy_success:
return True
elif basic_result['normal_connectable'] and proxy_success:
# 都能连接,但代理可能修改了证书
return False
else:
# 都失败,可能是网络问题或其他错误
return False
except Exception as e:
print(f"高级检测出错: {e}")
return False
def analyze_app_ssl_pinning(self, app_package: str) -> Dict[str, Any]:
"""分析App的SSL Pinning配置"""
# 这里需要结合静态分析和动态分析
analysis = {
'package_name': app_package,
'uses_ssl_pinning': False,
'pinnning_methods': [],
'detected_certificates': [],
'network_calls': [],
'security_libraries': []
}
try:
# 通过ADB获取App信息
result = subprocess.run([
'adb', 'shell', 'dumpsys', 'package', app_package
], capture_output=True, text=True, timeout=30)
if result.returncode == 0:
# 分析包信息中可能的安全配置
package_info = result.stdout
if 'network_security_config' in package_info.lower():
analysis['security_libraries'].append('Network Security Config')
# 检查是否使用了常见的安全库
common_security_libs = [
'okhttp', 'retrofit', 'httpclient', 'sslsocketfactory',
'x509trustmanager', 'hostnameverifier', 'certificatepinner'
]
for lib in common_security_libs:
if lib.lower() in package_info.lower():
analysis['security_libraries'].append(lib)
except subprocess.TimeoutExpired:
print("App分析超时")
except Exception as e:
print(f"App分析出错: {e}")
return analysis
def detect_ssl_pinning(urls: List[str]) -> List[Dict[str, Any]]:
"""批量检测SSL Pinning"""
detector = SSLPinningDetector()
results = []
print(f"🔍 开始检测 {len(urls)} 个URL的SSL Pinning...")
for i, url in enumerate(urls, 1):
print(f" 检测 [{i}/{len(urls)}]: {url}")
result = detector.test_ssl_connection(url)
results.append(result)
status = "🔒 Pinning检测到" if result['pinning_detected'] else "🔓 无Pinning"
print(f" {status} | 直连: {'✅' if result['normal_connectable'] else '❌'} | 代理: {'✅' if result['custom_ca_works'] else '❌'}")
return results
def analyze_app_pinning(app_package: str, test_urls: List[str]):
"""综合分析App的SSL Pinning"""
print(f"📱 开始分析App: {app_package}")
detector = SSLPinningDetector()
# 分析App配置
app_analysis = detector.analyze_app_ssl_pinning(app_package)
print(f"📋 App安全配置分析:")
print(f" 使用SSL Pinning: {app_analysis['uses_ssl_pinning']}")
print(f" 安全库: {app_analysis['security_libraries']}")
# 检测网络端点
url_results = detect_ssl_pinning(test_urls)
# 综合分析
pinning_detected_count = sum(1 for r in url_results if r['pinning_detected'])
print(f"\n📊 综合分析结果:")
print(f" 总URL数: {len(test_urls)}")
print(f" 检测到Pinning: {pinning_detected_count}")
print(f" 无Pinning: {len(test_urls) - pinning_detected_count}")
if pinning_detected_count > 0:
print(f"\n⚠️ 检测到SSL Pinning,需要使用Frida等工具绕过")
# 使用示例
def ssl_pinning_demo():
"""SSL Pinning检测演示"""
test_urls = [
"https://www.baidu.com",
"https://api.weixin.qq.com",
"https://graph.facebook.com",
"https://api.twitter.com"
]
results = detect_ssl_pinning(test_urls)
print(f"\n📈 检测完成,发现 {sum(1 for r in results if r['pinning_detected'])} 个可能使用SSL Pinning的URL")
if __name__ == "__main__":
ssl_pinning_demo()#4. 实践项目:抖音APP抓包分析
让我们通过一个实际项目来练习上述技能。
#抖音APP抓包分析脚本
import subprocess
import time
import json
import re
from datetime import datetime
from typing import Dict, List, Optional
import threading
import queue
import os
from pathlib import Path
class DouyinTrafficAnalyzer:
"""抖音流量分析器 - 专业的App流量监控工具"""
def __init__(self, proxy_port: int = 8888, output_dir: str = "douyin_analysis"):
self.proxy_port = proxy_port
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.intercepted_data = []
self.analysis_results = {
'session_start': datetime.now().isoformat(),
'total_requests': 0,
'api_endpoints': {},
'data_patterns': [],
'user_actions': [],
'media_requests': [],
'potential_targets': [],
'timing_analysis': {},
'security_features': []
}
self.target_domains = [
r'aweme\.snssdk\.com', # 抖音主域名
r'lf-cdn-tos\.bytegoofy\.com', # 字节CDN
r'jsb\.snssdk\.com', # 业务接口
r'hotsoon\.snssdk\.com', # 快手相关
r'misc\.s3\.amazonaws\.com', # AWS S3
r'v\.douyin\.com', # 抖音短域名
]
self.action_patterns = [
r'/aweme/v[0-9]+/feed/', # 信息流
r'/aweme/v[0-9]+/publish/', # 发布相关
r'/aweme/v[0-9]+/comment/', # 评论
r'/aweme/v[0-9]+/like/', # 点赞
r'/aweme/v[0-9]+/user/', # 用户
r'/aweme/v[0-9]+/follow/', # 关注
]
self.media_patterns = [
r'\.(mp4|avi|mov|wmv|m4v)$', # 视频
r'\.(jpg|jpeg|png|gif|bmp)$', # 图片
r'/video/objects/', # 视频对象
r'/image/objects/', # 图像对象
]
self.data_queue = queue.Queue()
self.monitoring = False
self.monitor_thread = None
print(f"🎬 抖音流量分析器初始化完成")
print(f"📡 代理端口: {self.proxy_port}")
print(f"📁 输出目录: {self.output_dir}")
def setup_environment(self) -> bool:
"""设置抓包环境"""
try:
# 检查ADB连接
result = subprocess.run(['adb', 'devices'],
capture_output=True, text=True, timeout=10)
if 'device' not in result.stdout:
print("❌ 未检测到ADB设备,请检查设备连接")
return False
# 检查代理服务是否运行
# 这里假设Fiddler或Charles已在运行
print(f"✅ ADB设备连接正常")
print(f"✅ 代理端口 {self.proxy_port} 待使用")
# 创建输出目录结构
(self.output_dir / "raw_data").mkdir(exist_ok=True)
(self.output_dir / "analysis").mkdir(exist_ok=True)
(self.output_dir / "reports").mkdir(exist_ok=True)
return True
except subprocess.TimeoutExpired:
print("❌ ADB命令执行超时")
return False
except Exception as e:
print(f"❌ 环境设置失败: {e}")
return False
def start_traffic_capture(self, duration: int = 300, app_package: str = "com.ss.android.ugc.aweme") -> List[Dict]:
"""开始流量捕获"""
print(f"🚀 开始捕获抖音流量,持续 {duration} 秒...")
print(f"📱 目标App: {app_package}")
# 启动App
self._launch_app(app_package)
time.sleep(3) # 等待App启动
self.monitoring = True
start_time = time.time()
# 模拟流量捕获(实际使用时需要配合代理工具)
print("📡 开始监控网络流量...")
# 这里应该连接到实际的代理工具来实时接收数据
# 由于无法直接集成代理工具,我们使用模拟数据
self._start_monitoring_thread()
try:
while time.time() - start_time < duration and self.monitoring:
# 检查是否有新数据
try:
data = self.data_queue.get(timeout=1)
self._process_incoming_data(data)
except queue.Empty:
continue
except KeyboardInterrupt:
print("\n⏸️ 用户中断捕获")
finally:
self.monitoring = False
if self.monitor_thread:
self.monitor_thread.join(timeout=5)
print(f"✅ 流量捕获完成,共处理 {self.analysis_results['total_requests']} 个请求")
return self.intercepted_data
def _launch_app(self, package_name: str):
"""启动App"""
try:
subprocess.run([
'adb', 'shell',
'monkey', '-p', package_name,
'-c', 'android.intent.category.LAUNCHER', '1'
], check=True, capture_output=True)
print(f"✅ App启动命令已发送: {package_name}")
except Exception as e:
print(f"❌ App启动失败: {e}")
def _start_monitoring_thread(self):
"""启动监控线程 - 模拟数据接收"""
def monitoring_worker():
counter = 0
while self.monitoring:
# 模拟接收网络数据包
if counter % 10 == 0: # 每10秒生成一些模拟数据
simulated_data = self._generate_simulated_traffic()
for data in simulated_data:
if self.monitoring: # 检查是否仍在监控
self.data_queue.put(data)
time.sleep(1)
counter += 1
self.monitor_thread = threading.Thread(target=monitoring_worker, daemon=True)
self.monitor_thread.start()
def _generate_simulated_traffic(self) -> List[Dict]:
"""生成模拟的抖音流量数据"""
import random
import string
simulated_requests = []
# 生成一些典型的抖音API请求
base_domains = [
"aweme.snssdk.com", "lf-cdn-tos.bytegoofy.com",
"jsb.snssdk.com", "v.douyin.com"
]
api_endpoints = [
"/aweme/v1/feed/", "/aweme/v2/comment/list/",
"/aweme/v1/like/", "/aweme/v1/user/info/",
"/aweme/v2/video/urls/", "/aweme/v1/follow/"
]
for _ in range(random.randint(3, 8)): # 每次生成3-8个请求
domain = random.choice(base_domains)
endpoint = random.choice(api_endpoints)
request_data = {
'timestamp': time.time(),
'method': random.choice(['GET', 'POST']),
'url': f"https://{domain}{endpoint}",
'headers': {
'User-Agent': 'Aweme/10.6.0 rv:106000 (iPhone; iOS 14.4; zh_CN) Cronet/TTNetVersion:8c82810a',
'X-SS-REQ-TICKET': str(int(time.time() * 1000)),
'X-Khronos': str(int(time.time())),
'Cookie': f'session_id={"".join(random.choices(string.ascii_letters + string.digits, k=32))}'
},
'params': {
'device_platform': 'iphone',
'device_type': 'iPhone12,1',
'
