#JS逆向与算法分析
#课程目标
- 掌握混合App的抓包技术
- 学会通过Chrome远程调试分析H5页面
- 掌握JS逆向分析技巧和动态调试方法
- 学会使用Frida-RPC调用App内部算法
- 了解复杂加密算法的还原方法
#1. 混合应用(Hybrid App)抓包
混合应用结合了原生应用和Web技术,其中一部分功能通过WebView组件加载H5页面实现。这类应用的抓包和分析需要特殊的方法。
#1.1 Hybrid App架构分析
/**
* 混合应用典型架构
*
* Native Layer (原生层)
* ├── Activity/ViewController
* ├── WebView Container
* ├── Native Modules
* └── Bridge Interface
*
* Web Layer (Web层)
* ├── HTML/CSS/JS
* ├── Cordova/PhoneGap Plugins
* ├── WebView JavaScript Bridge
* └── Network Requests
*/
// 常见的Hybrid框架
const HYBRID_FRAMEWORKS = {
CORDOVA: {
name: "Apache Cordova",
description: "最老牌的Hybrid框架",
bridge: "cordova.exec()",
plugins: ["camera", "geolocation", "device", "file"]
},
IONIC: {
name: "Ionic",
description: "基于Angular的Hybrid框架",
basedOn: "Cordova + Angular",
components: ["ionic-angular", "ionic-native"]
},
WEEX: {
name: "Weex",
description: "阿里巴巴开源的跨平台框架",
syntax: "Vue.js语法",
platforms: ["iOS", "Android", "Web"]
},
RN_WEBVIEW: {
name: "React Native WebView",
description: "RN中的WebView组件",
bridge: "postMessage + addEventListener"
}
};
// WebView通信桥接示例
class HybridBridge {
constructor() {
this.callbacks = {};
this.callbackId = 0;
}
// Native调用JS
callJS(method, params, callback) {
const id = ++this.callbackId;
if (callback) {
this.callbacks[id] = callback;
}
// 通过prompt或自定义协议调用Native
const message = {
id: id,
method: method,
params: params
};
// Android
if (window.JsBridge) {
return window.JsBridge.call(JSON.stringify(message));
}
// iOS
if (window.webkit && window.webkit.messageHandlers) {
window.webkit.messageHandlers.native.postMessage(message);
}
// 降级方案 - URL Scheme
const url = `jsbridge://${method}?params=${encodeURIComponent(JSON.stringify(params))}`;
window.location.href = url;
}
// JS调用Native
callNative(method, params, callback) {
return this.callJS(method, params, callback);
}
// 接收Native回调
handleCallback(id, result) {
const callback = this.callbacks[id];
if (callback) {
callback(result);
delete this.callbacks[id];
}
}
}
// 使用示例
const bridge = new HybridBridge();
// 调用原生摄像头
bridge.callNative('takePhoto', { quality: 80 }, (result) => {
console.log('照片结果:', result);
});
// 获取设备信息
bridge.callNative('getDeviceInfo', {}, (deviceInfo) => {
console.log('设备信息:', deviceInfo);
});#1.2 Chrome远程调试设置
import subprocess
import time
import json
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
class ChromeRemoteDebugger:
"""Chrome远程调试器"""
def __init__(self, device_serial=None):
self.device_serial = device_serial
self.driver = None
self.setup_chrome_debugging()
def setup_chrome_debugging(self):
"""设置Chrome远程调试"""
# 启用USB调试
if self.device_serial:
subprocess.run(['adb', '-s', self.device_serial, 'forward', 'tcp:9222', 'localabstract:chrome_devtools_remote'])
else:
subprocess.run(['adb', 'forward', 'tcp:9222', 'localabstract:chrome_devtools_remote'])
def connect_to_webview(self, package_name, webview_activity=None):
"""连接到WebView"""
try:
# 设置Chrome选项
chrome_options = Options()
chrome_options.add_experimental_option('androidPackage', package_name)
if webview_activity:
chrome_options.add_experimental_option('androidActivity', webview_activity)
# 连接到远程调试
self.driver = webdriver.Chrome(
options=chrome_options,
desired_capabilities=DesiredCapabilities.CHROME
)
print(f"成功连接到 {package_name} 的WebView")
return True
except Exception as e:
print(f"连接WebView失败: {e}")
return False
def get_webview_pages(self):
"""获取WebView页面列表"""
if not self.driver:
return []
try:
# 获取所有页面
pages = self.driver.execute_script("""
return window.chrome.webview.getAllWebviews();
""")
return pages
except:
# 备用方法
return [self.driver.current_url]
def inject_javascript(self, script):
"""注入JavaScript代码"""
if not self.driver:
return None
try:
result = self.driver.execute_script(script)
return result
except Exception as e:
print(f"JavaScript执行失败: {e}")
return None
def intercept_network_requests(self):
"""拦截网络请求"""
# 使用Chrome DevTools Protocol
self.driver.execute_cdp_cmd('Network.enable', {})
# 监听请求事件
self.driver.execute_cdp_cmd('Network.setRequestInterception', {
'patterns': [{'urlPattern': '*'}]
})
def monitor_console_logs(self):
"""监控控制台日志"""
logs = self.driver.get_log('browser')
return logs
def take_screenshot(self, filename):
"""截取WebView截图"""
return self.driver.save_screenshot(filename)
def close(self):
"""关闭连接"""
if self.driver:
self.driver.quit()
# 高级调试功能
class AdvancedHybridDebugger(ChromeRemoteDebugger):
"""高级Hybrid调试器"""
def __init__(self, device_serial=None):
super().__init__(device_serial)
self.network_monitor = NetworkMonitor()
self.js_interceptor = JSInterceptor()
def setup_advanced_debugging(self):
"""设置高级调试功能"""
# 启用网络监控
self.driver.execute_cdp_cmd('Network.enable', {})
# 启用性能监控
self.driver.execute_cdp_cmd('Performance.enable', {})
# 启用Console监控
self.driver.execute_cdp_cmd('Console.enable', {})
# 设置断点
self.driver.execute_cdp_cmd('Debugger.enable', {})
def hook_ajax_requests(self):
"""Hook AJAX请求"""
ajax_hook_script = """
(function() {
// 保存原始的XMLHttpRequest
var originalXHR = window.XMLHttpRequest;
// 创建新的XMLHttpRequest构造函数
window.XMLHttpRequest = function() {
var xhr = new originalXHR();
// Hook send方法
var originalSend = xhr.send;
xhr.send = function(data) {
console.log('AJAX Request:', {
method: xhr._method,
url: xhr._url,
data: data
});
// 发送前回调
if (window.onAjaxRequest) {
window.onAjaxRequest({
method: xhr._method,
url: xhr._url,
data: data,
xhr: xhr
});
}
return originalSend.call(this, data);
};
// Hook open方法来获取URL和方法
var originalOpen = xhr.open;
xhr.open = function(method, url) {
xhr._method = method;
xhr._url = url;
return originalOpen.apply(this, arguments);
};
// Hook onreadystatechange来捕获响应
var originalOnReadyStateChange = xhr.onreadystatechange;
xhr.onreadystatechange = function() {
if (xhr.readyState === 4) { // 请求完成
console.log('AJAX Response:', {
status: xhr.status,
response: xhr.responseText,
url: xhr._url
});
// 响应后回调
if (window.onAjaxResponse) {
window.onAjaxResponse({
status: xhr.status,
response: xhr.responseText,
url: xhr._url,
xhr: xhr
});
}
}
if (originalOnReadyStateChange) {
originalOnReadyStateChange.apply(this, arguments);
}
};
return xhr;
};
// Hook fetch API
var originalFetch = window.fetch;
window.fetch = function(input, init) {
var url = typeof input === 'string' ? input : input.url;
var method = init && init.method || (typeof input === 'object' && input.method) || 'GET';
var body = init && init.body || (typeof input === 'object' && input.body) || null;
console.log('Fetch Request:', {
method: method,
url: url,
body: body
});
// 请求前回调
if (window.onFetchRequest) {
window.onFetchRequest({
method: method,
url: url,
body: body
});
}
return originalFetch(input, init).then(function(response) {
// 克隆响应以读取内容
var clonedResponse = response.clone();
clonedResponse.text().then(function(text) {
console.log('Fetch Response:', {
status: response.status,
url: response.url,
response: text
});
// 响应后回调
if (window.onFetchResponse) {
window.onFetchResponse({
status: response.status,
url: response.url,
response: text,
originalResponse: response
});
}
}).catch(function(error) {
console.error('读取fetch响应失败:', error);
});
return response;
});
};
console.log('AJAX/Fetch hooks installed');
})();
"""
self.inject_javascript(ajax_hook_script)
def hook_crypto_functions(self):
"""Hook加密函数"""
crypto_hook_script = """
(function() {
// Hook Crypto.subtle
if (window.crypto && window.crypto.subtle) {
var subtle = window.crypto.subtle;
// Hook digest
var originalDigest = subtle.digest;
subtle.digest = function(algorithm, data) {
console.log('Crypto.digest called:', {
algorithm: algorithm,
dataLength: data.byteLength
});
var promise = originalDigest.call(this, algorithm, data);
return promise.then(function(result) {
console.log('Crypto.digest result:', {
resultLength: result.byteLength
});
return result;
});
};
// Hook encrypt
var originalEncrypt = subtle.encrypt;
subtle.encrypt = function(algorithm, key, data) {
console.log('Crypto.encrypt called:', {
algorithm: algorithm,
key: key,
dataLength: data.byteLength
});
var promise = originalEncrypt.call(this, algorithm, key, data);
return promise.then(function(result) {
console.log('Crypto.encrypt result:', {
resultLength: result.byteLength
});
return result;
});
};
// Hook decrypt
var originalDecrypt = subtle.decrypt;
subtle.decrypt = function(algorithm, key, data) {
console.log('Crypto.decrypt called:', {
algorithm: algorithm,
key: key,
dataLength: data.byteLength
});
var promise = originalDecrypt.call(this, algorithm, key, data);
return promise.then(function(result) {
console.log('Crypto.decrypt result:', {
resultLength: result.byteLength
});
return result;
});
};
}
// Hook WebAssembly
var originalInstantiate = window.WebAssembly.instantiate;
window.WebAssembly.instantiate = function(bytes, importObject) {
console.log('WebAssembly.instantiate called:', {
bytesLength: bytes.length
});
return originalInstantiate.call(this, bytes, importObject).then(function(result) {
console.log('WebAssembly instantiated:', {
exports: Object.keys(result.instance.exports)
});
return result;
});
};
console.log('Crypto/WebAssembly hooks installed');
})();
"""
self.inject_javascript(crypto_hook_script)
class NetworkMonitor:
"""网络监控器"""
def __init__(self):
self.requests = []
self.responses = []
def start_monitoring(self, driver):
"""开始监控网络"""
driver.execute_cdp_cmd('Network.enable', {})
# 监听请求
def request_will_be_sent(**kwargs):
self.requests.append({
'requestId': kwargs.get('requestId'),
'url': kwargs.get('request').get('url'),
'method': kwargs.get('request').get('method'),
'timestamp': kwargs.get('timestamp'),
'headers': kwargs.get('request').get('headers')
})
# 监听响应
def response_received(**kwargs):
self.responses.append({
'requestId': kwargs.get('requestId'),
'status': kwargs.get('response').get('status'),
'url': kwargs.get('response').get('url'),
'timestamp': kwargs.get('timestamp'),
'headers': kwargs.get('response').get('headers')
})
def get_monitored_data(self):
"""获取监控数据"""
return {
'requests': self.requests,
'responses': self.responses
}
class JSInterceptor:
"""JavaScript拦截器"""
def __init__(self):
self.intercepted_calls = []
def intercept_function_calls(self, function_name):
"""拦截函数调用"""
script = f"""
(function() {{
var originalFunction = window.{function_name};
if (typeof originalFunction === 'function') {{
window.{function_name} = function() {{
var args = Array.prototype.slice.call(arguments);
console.log('Function call intercepted:', {{
function: '{function_name}',
arguments: args
}});
// 记录调用
if (!window.jsInterceptedCalls) {{
window.jsInterceptedCalls = [];
}}
window.jsInterceptedCalls.push({{
function: '{function_name}',
arguments: args,
timestamp: Date.now()
}});
return originalFunction.apply(this, arguments);
}};
}}
}})();
"""
return script
# 使用示例
def hybrid_debugging_example():
"""Hybrid调试示例"""
debugger = AdvancedHybridDebugger()
# 连接到目标应用
if debugger.connect_to_webview("com.example.hybridapp"):
# 设置高级调试功能
debugger.setup_advanced_debugging()
# Hook网络请求
debugger.hook_ajax_requests()
# Hook加密函数
debugger.hook_crypto_functions()
# 注入自定义监控代码
monitoring_script = """
window.onAjaxRequest = function(req) {
console.log('Monitoring AJAX Request:', req);
// 这里可以将请求数据发送到外部监控系统
};
window.onAjaxResponse = function(resp) {
console.log('Monitoring AJAX Response:', resp);
// 这里可以将响应数据发送到外部监控系统
};
"""
debugger.inject_javascript(monitoring_script)
print("Hybrid应用调试已设置完成")
# 保持连接
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("退出调试")
debugger.close()
if __name__ == "__main__":
hybrid_debugging_example()#1.3 WebView调试技巧
class WebViewDebugger:
"""WebView调试工具"""
@staticmethod
def enable_webview_debugging():
"""启用WebView调试(需要开发者在代码中设置)"""
# Android代码示例
WEBVIEW_DEBUG_CODE_ANDROID = """
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.KITKAT) {
WebView.setWebContentsDebuggingEnabled(true);
}
"""
# iOS代码示例
WEBVIEW_DEBUG_CODE_IOS = """
#ifdef DEBUG
[[NSUserDefaults standardUserDefaults] setBool:YES forKey:@"WebKitDeveloperExtras"];
#endif
"""
print("WebView调试代码示例:")
print("Android:", WEBVIEW_DEBUG_CODE_ANDROID)
print("iOS:", WEBVIEW_DEBUG_CODE_IOS)
@staticmethod
def find_webview_processes():
"""查找WebView进程"""
import subprocess
# 查找Android设备上的WebView进程
try:
result = subprocess.run(['adb', 'shell', 'ps'],
capture_output=True, text=True)
processes = result.stdout
webview_processes = []
for line in processes.split('\n'):
if 'webview' in line.lower() or 'chrome' in line.lower():
webview_processes.append(line.strip())
return webview_processes
except Exception as e:
print(f"查找WebView进程失败: {e}")
return []
@staticmethod
def get_webview_inspector_url(package_name):
"""获取WebView Inspector URL"""
# 通常的Inspector URL格式
inspector_url = f"http://localhost:9222/devtools/inspector.html?ws=localhost:9222/devtools/page/{package_name}"
return inspector_url
@staticmethod
def dump_webview_html(driver):
"""获取WebView的HTML内容"""
try:
# 获取完整的HTML
html = driver.execute_script("return document.documentElement.outerHTML;")
return html
except Exception as e:
print(f"获取HTML失败: {e}")
return None
@staticmethod
def monitor_js_errors(driver):
"""监控JavaScript错误"""
try:
# 获取页面错误
errors = driver.get_log('browser')
js_errors = [log for log in errors if log['level'] in ['SEVERE', 'WARNING']]
return js_errors
except Exception as e:
print(f"监控JS错误失败: {e}")
return []
# Cordova/PhoneGap特定调试
class CordovaDebugger:
"""Cordova调试工具"""
def __init__(self, driver):
self.driver = driver
def get_cordova_plugins(self):
"""获取Cordova插件信息"""
plugin_info_script = """
var plugins = [];
if (window.cordova && window.cordova.plugins) {
for (var pluginName in window.cordova.plugins) {
plugins.push({
name: pluginName,
methods: Object.getOwnPropertyNames(window.cordova.plugins[pluginName]).filter(function(prop) {
return typeof window.cordova.plugins[pluginName][prop] === 'function';
})
});
}
}
return plugins;
"""
try:
plugins = self.driver.execute_script(plugin_info_script)
return plugins
except Exception as e:
print(f"获取Cordova插件信息失败: {e}")
return []
def hook_cordova_exec(self):
"""Hook cordova.exec调用"""
hook_script = """
if (window.cordova && window.cordova.exec) {
var originalExec = window.cordova.exec;
window.cordova.exec = function(successCallback, errorCallback, service, action, args) {
console.log('Cordova exec called:', {
service: service,
action: action,
args: args
});
// 记录调用
if (!window.cordovaCalls) {
window.cordovaCalls = [];
}
window.cordovaCalls.push({
service: service,
action: action,
args: args,
timestamp: Date.now()
});
// 执行原始调用
return originalExec.call(this, successCallback, errorCallback, service, action, args);
};
}
"""
self.driver.execute_script(hook_script)
def get_device_info(self):
"""获取设备信息"""
device_info_script = """
if (window.device) {
return {
model: window.device.model,
platform: window.device.platform,
version: window.device.version,
uuid: window.device.uuid,
manufacturer: window.device.manufacturer,
isVirtual: window.device.isVirtual,
serial: window.device.serial
};
}
return null;
"""
try:
device_info = self.driver.execute_script(device_info_script)
return device_info
except Exception as e:
print(f"获取设备信息失败: {e}")
return None
def cordova_debugging_example():
"""Cordova调试示例"""
# 假设已经连接到WebView
# driver = connect_to_webview("com.example.cordovaapp")
# cordova_debugger = CordovaDebugger(driver)
# # 获取插件信息
# plugins = cordova_debugger.get_cordova_plugins()
# print(f"Cordova插件: {plugins}")
# # Hook exec调用
# cordova_debugger.hook_cordova_exec()
# # 获取设备信息
# device_info = cordova_debugger.get_device_info()
# print(f"设备信息: {device_info}")
print("Cordova调试示例代码已准备")
# Ionic应用调试
class IonicDebugger:
"""Ionic应用调试工具"""
def __init__(self, driver):
self.driver = driver
def get_ionic_components(self):
"""获取Ionic组件信息"""
component_info_script = """
var components = [];
var elements = document.querySelectorAll('[class*="ion-"], [class*="toolbar"], [class*="header"]');
for (var i = 0; i < elements.length; i++) {
var element = elements[i];
var classes = element.className.split(' ');
var ionClasses = classes.filter(function(cls) {
return cls.startsWith('ion-');
});
components.push({
tagName: element.tagName.toLowerCase(),
classes: ionClasses,
attributes: Array.from(element.attributes).reduce(function(acc, attr) {
acc[attr.name] = attr.value;
return acc;
}, {}),
text: element.textContent.trim().substring(0, 50)
});
}
return components;
"""
try:
components = self.driver.execute_script(component_info_script)
return components
except Exception as e:
print(f"获取Ionic组件信息失败: {e}")
return []
def get_angular_state(self):
"""获取Angular应用状态"""
angular_state_script = """
if (window.getAllAngularTestability) {
var rootElements = window.getAllAngularRootElements();
var states = [];
for (var i = 0; i < rootElements.length; i++) {
var rootElement = rootElements[i];
var testability = window.getAllAngularTestability()[i];
states.push({
element: rootElement.tagName,
stability: testability.isStable()
});
}
return states;
}
return null;
"""
try:
state = self.driver.execute_script(angular_state_script)
return state
except Exception as e:
print(f"获取Angular状态失败: {e}")
return None
def ionic_debugging_example():
"""Ionic调试示例"""
print("Ionic调试功能已准备")
# 类似Cordova调试的实现#2. 算法定位技巧
#2.1 JS混淆代码分析
import re
import json
from typing import List, Dict, Tuple
import ast
import jsbeautifier
class JSSymbolMapper:
"""JS符号映射器 - 用于反混淆"""
def __init__(self):
self.symbol_map = {}
self.function_map = {}
self.variable_map = {}
self.string_map = {}
def analyze_obfuscated_code(self, js_code: str) -> Dict:
"""分析混淆代码"""
analysis = {
'original_to_readable': {},
'functions': [],
'variables': [],
'strings': [],
'patterns': []
}
# 查找函数定义
function_pattern = r'function\s+(\w+)\s*\([^)]*\)|(\w+)\s*=\s*function\s*\(|(\w+)\s*=>'
functions = re.findall(function_pattern, js_code)
for match in functions:
func_name = [name for name in match if name][0] # 获取非空的函数名
if len(func_name) <= 3: # 可能是混淆的短名称
readable_name = self.guess_function_name(js_code, func_name)
if readable_name:
analysis['original_to_readable'][func_name] = readable_name
# 查找变量赋值
variable_pattern = r'var\s+(\w+)\s*=|let\s+(\w+)\s*=|const\s+(\w+)\s*='
variables = re.findall(variable_pattern, js_code)
for match in variables:
var_name = [name for name in match if name][0]
if len(var_name) <= 3: # 可能是混淆的短名称
readable_name = self.guess_variable_name(js_code, var_name)
if readable_name:
analysis['original_to_readable'][var_name] = readable_name
# 查找字符串
string_pattern = r'["\']([^"\']{5,100})["\']'
strings = re.findall(string_pattern, js_code)
for string_val in strings:
if self.is_meaningful_string(string_val):
analysis['strings'].append(string_val)
return analysis
def guess_function_name(self, js_code: str, obfuscated_name: str) -> str:
"""猜测函数的真实名称"""
# 查找函数的使用上下文
context_pattern = rf'[^a-zA-Z0-9_]{obfuscated_name}\s*\('
context_matches = re.findall(context_pattern, js_code)
# 基于上下文猜测
if 'encrypt' in js_code.lower() or 'sign' in js_code.lower():
return 'encryptFunction'
elif 'decrypt' in js_code.lower():
return 'decryptFunction'
elif 'hash' in js_code.lower() or 'digest' in js_code.lower():
return 'hashFunction'
elif 'request' in js_code.lower() or 'api' in js_code.lower():
return 'makeRequest'
elif 'token' in js_code.lower():
return 'generateToken'
return f'function_{obfuscated_name}'
def guess_variable_name(self, js_code: str, obfuscated_name: str) -> str:
"""猜测变量的真实名称"""
# 查找变量的赋值和使用上下文
assignment_pattern = rf'{obfuscated_name}\s*=\s*([^;]+)'
assignments = re.findall(assignment_pattern, js_code)
for assignment in assignments:
if 'api' in assignment.lower():
return 'apiUrl'
elif 'key' in assignment.lower():
return 'encryptionKey'
elif 'secret' in assignment.lower():
return 'secretKey'
elif 'timestamp' in assignment.lower():
return 'timestamp'
elif 'nonce' in assignment.lower():
return 'nonceValue'
return f'variable_{obfuscated_name}'
def is_meaningful_string(self, string_val: str) -> bool:
"""判断字符串是否有意义"""
# 排除纯随机字符串
if re.match(r'^[a-zA-Z0-9]{10,}$', string_val):
return False # 看起来像随机字符串
# 包含字母和数字,可能是有意义的
if re.search(r'[a-zA-Z]', string_val) and re.search(r'\d', string_val):
return True
# API路径
if '/' in string_val and len(string_val) > 3:
return True
# 常见的参数名
common_params = ['sign', 'token', 'timestamp', 'nonce', 'data', 'method']
if string_val.lower() in common_params:
return True
return len(string_val) > 5 # 长度大于5的字符串可能有意义
def deobfuscate_code(self, js_code: str) -> str:
"""反混淆代码"""
analysis = self.analyze_obfuscated_code(js_code)
deobfuscated_code = js_code
# 替换函数名和变量名
for original, readable in analysis['original_to_readable'].items():
# 使用单词边界确保完整匹配
pattern = r'\b' + re.escape(original) + r'\b'
deobfuscated_code = re.sub(pattern, readable, deobfuscated_code)
return deobfuscated_code
class AlgorithmFinder:
"""算法定位器"""
def __init__(self):
self.js_mapper = JSSymbolMapper()
self.encryption_patterns = self.define_encryption_patterns()
self.signature_patterns = self.define_signature_patterns()
def define_encryption_patterns(self) -> List[Dict]:
"""定义加密算法模式"""
return [
# AES加密模式
{
'name': 'AES Encryption',
'patterns': [
r'CryptoJS\.AES\.encrypt|crypto\.subtle\.encrypt',
r'cipher\.update|cipher\.final',
r'SecretKeySpec|javax\.crypto',
r'rijndael|aes-\d{3}-cbc'
],
'indicators': ['AES', 'Rijndael', 'cipher', 'encrypt', 'decrypt']
},
# RSA加密模式
{
'name': 'RSA Encryption',
'patterns': [
r'JSEncrypt|window\.JSEncrypt',
r'forge\.rsa|window\.forge\.rsa',
r'crypto\.subtle\.importKey.*rsa'
],
'indicators': ['RSA', 'publicKey', 'privateKey', 'encryptLong', 'decryptLong']
},
# Hash算法模式
{
'name': 'Hash Algorithm',
'patterns': [
r'CryptoJS\.(MD5|SHA|RIPEMD)',
r'crypto\.subtle\.(digest|hash)',
r'MessageDigest\.getInstance'
],
'indicators': ['MD5', 'SHA', 'SHA1', 'SHA256', 'SHA512', 'hash', 'digest']
},
# Base64编码
{
'name': 'Base64 Encoding',
'patterns': [
r'btoa|atob',
r'Base64\.encode|Base64\.decode',
r'window\.btoa|window\.atob'
],
'indicators': ['Base64', 'btoa', 'atob', 'encode', 'decode']
}
]
def define_signature_patterns(self) -> List[Dict]:
"""定义签名算法模式"""
return [
# 常见签名函数名
{
'name': 'Signature Generation',
'patterns': [
r'generateSign|calcSign|makeSign',
r'getSignature|createSignature',
r'signData|signParams'
],
'indicators': ['sign', 'signature', 'signData', 'calc']
},
# 参数拼接模式
{
'name': 'Parameter Concatenation',
'patterns': [
r'Object\.keys\(\w+\)\.sort\(\)',
r'for.*in.*\+?=.*\+',
r'params\.join|str\.concat',
r'sort.*concatenate|buildString'
],
'indicators': ['sort', 'concat', 'join', 'build', 'stringify']
},
# 时间戳处理
{
'name': 'Timestamp Processing',
'patterns': [
r'Date\.now|new Date|timestamp',
r'Math\.floor.*getTime',
r'parseInt.*Date'
],
'indicators': ['timestamp', 'time', 'date', 'now']
}
]
def search_algorithm_functions(self, js_code: str) -> List[Dict]:
"""搜索算法相关函数"""
results = []
# 搜索加密算法
for pattern_group in self.encryption_patterns:
for pattern in pattern_group['patterns']:
matches = re.finditer(pattern, js_code, re.IGNORECASE)
for match in matches:
results.append({
'type': 'encryption',
'algorithm': pattern_group['name'],
'match': match.group(),
'position': match.span(),
'context': self.get_context(js_code, match.start(), 100)
})
# 搜索签名算法
for pattern_group in self.signature_patterns:
for pattern in pattern_group['patterns']:
matches = re.finditer(pattern, js_code, re.IGNORECASE)
for match in matches:
results.append({
'type': 'signature',
'algorithm': pattern_group['name'],
'match': match.group(),
'position': match.span(),
'context': self.get_context(js_code, match.start(), 100)
})
return results
def get_context(self, js_code: str, position: int, context_length: int = 100) -> str:
"""获取匹配的上下文"""
start = max(0, position - context_length)
end = min(len(js_code), position + context_length)
return js_code[start:end]
def find_signature_logic(self, js_code: str) -> List[Dict]:
"""查找签名逻辑"""
signature_findings = []
# 查找可能的签名函数
function_pattern = r'function\s+(\w+)[^{]*\{(?:[^{}]|{[^{}]*})*\}'
functions = re.findall(function_pattern, js_code, re.DOTALL)
for func_match in re.finditer(function_pattern, js_code, re.DOTALL):
func_name = re.search(r'function\s+(\w+)', func_match.group()).group(1)
func_body = func_match.group()
# 检查函数是否包含签名相关逻辑
if self.contains_signature_logic(func_body):
signature_findings.append({
'function_name': func_name,
'function_body': func_body[:500] + '...' if len(func_body) > 500 else func_body,
'parameters': self.extract_parameters(func_body),
'variables': self.extract_variables(func_body),
'strings': self.extract_strings(func_body)
})
return signature_findings
def contains_signature_logic(self, func_body: str) -> bool:
"""检查函数是否包含签名逻辑"""
signature_indicators = [
'sign', 'signature', 'token', 'auth', 'key', 'secret',
'timestamp', 'nonce', 'encrypt', 'hash', 'digest',
'sort', 'concat', 'join', 'toUpperCase', 'toLowerCase'
]
body_lower = func_body.lower()
indicator_count = sum(1 for indicator in signature_indicators if indicator in body_lower)
return indicator_count >= 2 # 至少包含2个指标
def extract_parameters(self, func_body: str) -> List[str]:
"""提取函数参数"""
param_pattern = r'function\s+\w+\s*\(\s*([^)]*)\s*\)'
match = re.search(param_pattern, func_body)
if match:
params_str = match.group(1)
params = [p.strip() for p in params_str.split(',') if p.strip()]
return params
return []
def extract_variables(self, func_body: str) -> List[str]:
"""提取函数内的变量"""
var_pattern = r'(?:var|let|const)\s+(\w+)'
variables = re.findall(var_pattern, func_body)
return variables
def extract_strings(self, func_body: str) -> List[str]:
"""提取函数内的字符串常量"""
string_pattern = r'["\']([^"\']{3,50})["\']'
strings = re.findall(string_pattern, func_body)
meaningful_strings = [s for s in strings if not re.match(r'^[a-zA-Z0-9]{10,}$', s)]
return meaningful_strings
def locate_encryption_key(self, js_code: str) -> List[Dict]:
"""定位加密密钥"""
key_findings = []
# 搜索密钥定义
key_patterns = [
r'(?:key|secret|password|token)\s*[=:]\s*["\']([^"\']{8,64})["\']',
r'["\']([A-Fa-f0-9]{16,64})["\']', # 十六进制密钥
r'(?:aes|des|rsa).*?["\']([^"\']{8,64})["\']', # 加密相关字符串
]
for pattern in key_patterns:
matches = re.finditer(pattern, js_code)
for match in matches:
key_value = match.group(1)
# 验证是否真的是密钥(排除误报)
if self.is_valid_key(key_value):
key_findings.append({
'key': key_value,
'position': match.span(),
'context': self.get_context(js_code, match.start(), 50)
})
return key_findings
def is_valid_key(self, key_value: str) -> bool:
"""验证是否是有效的密钥"""
# 排除明显的非密钥字符串
excluded_patterns = [
r'^\d+$', # 纯数字
r'^(?:true|false|null|undefined)$', # JS关键字
r'^[a-z]{3,8}$', # 短单词
]
for pattern in excluded_patterns:
if re.match(pattern, key_value, re.IGNORECASE):
return False
# 检查是否包含足够的随机性
if len(set(key_value)) / len(key_value) < 0.3: # 字符重复度过高
return False
return True
# 使用示例
def algorithm_search_example():
"""算法搜索示例"""
sample_js_code = """
function calcSign(params) {
var sortedKeys = Object.keys(params).sort();
var signStr = '';
for (var i = 0; i < sortedKeys.length; i++) {
var key = sortedKeys[i];
if (key !== 'sign') {
signStr += key + '=' + params[key] + '&';
}
}
signStr = signStr.substring(0, signStr.length - 1);
var secret = 'mySecretKey123';
var timestamp = Date.now();
var nonce = Math.random().toString(36).substr(2, 9);
var hashInput = signStr + '&' + 'key=' + secret + '×tamp=' + timestamp + '&nonce=' + nonce;
var hash = CryptoJS.MD5(hashInput).toString();
return hash.toUpperCase();
}
function encryptData(data) {
var key = CryptoJS.enc.Utf8.parse('1234567890123456');
var iv = CryptoJS.enc.Utf8.parse('1234567890123456');
var encrypted = CryptoJS.AES.encrypt(data, key, {
iv: iv,
mode: CryptoJS.mode.CBC,
padding: CryptoJS.pad.Pkcs7
});
return encrypted.toString();
}
"""
finder = AlgorithmFinder()
# 搜索算法函数
algorithm_functions = finder.search_algorithm_functions(sample_js_code)
print("发现的算法函数:")
for func in algorithm_functions:
print(f" - {func['type']}: {func['algorithm']} at {func['position']}")
# 查找签名逻辑
signature_logic = finder.find_signature_logic(sample_js_code)
print("\\n发现的签名逻辑:")
for logic in signature_logic:
print(f" - 函数: {logic['function_name']}")
print(f" - 参数: {logic['parameters']}")
print(f" - 变量: {logic['variables']}")
# 定位加密密钥
encryption_keys = finder.locate_encryption_key(sample_js_code)
print("\\n发现的加密密钥:")
for key in encryption_keys:
print(f" - 密钥: {key['key'][:10]}...")
print(f" - 位置: {key['position']}")
if __name__ == "__main__":
algorithm_search_example()#2.2 动态调试技巧
import time
import threading
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.chrome.options import Options
class JSDynamicDebugger:
"""JS动态调试器"""
def __init__(self, driver):
self.driver = driver
self.breakpoints = []
self.watch_variables = {}
self.execution_log = []
def set_breakpoint(self, url_pattern: str, line_number: int, condition: str = None):
"""设置断点"""
breakpoint_info = {
'url_pattern': url_pattern,
'line_number': line_number,
'condition': condition,
'hit_count': 0,
'enabled': True
}
self.breakpoints.append(breakpoint_info)
print(f"断点设置: {url_pattern}#{line_number}")
def watch_variable(self, variable_name: str, callback=None):
"""监视变量"""
self.watch_variables[variable_name] = {
'callback': callback,
'previous_value': None,
'change_count': 0
}
print(f"开始监视变量: {variable_name}")
def inject_debugger_script(self):
"""注入调试脚本"""
debugger_script = """
(function() {
if (window.JSDebuggerInjected) {
return;
}
window.JSDebuggerInjected = true;
// 重写eval函数以监控代码执行
var originalEval = window.eval;
window.eval = function(code) {
console.log('Eval called:', code.substring(0, 100) + (code.length > 100 ? '...' : ''));
if (window.onEvalCalled) {
window.onEvalCalled(code);
}
return originalEval.call(this, code);
};
// 重写Function构造函数
var originalFunction = window.Function;
window.Function = function() {
var args = Array.prototype.slice.call(arguments);
var functionBody = args[args.length - 1] || '';
var functionParams = args.slice(0, args.length - 1);
console.log('Function constructor called:', {
params: functionParams,
body: functionBody.substring(0, 100) + (functionBody.length > 100 ? '...' : '')
});
if (window.onFunctionCreated) {
window.onFunctionCreated({
params: functionParams,
body: functionBody
});
}
return originalFunction.apply(this, arguments);
};
// 变量监视器
window.variableWatchers = {};
window.defineProperty = function(obj, prop, descriptor) {
var originalValue = descriptor.value;
// 创建getter/setter来监视属性
Object.defineProperty(obj, prop, {
get: function() {
if (window.variableWatchers[prop]) {
console.log('Variable accessed:', prop, '=', originalValue);
window.variableWatchers[prop].accessed(originalValue);
}
return originalValue;
},
set: function(newValue) {
if (window.variableWatchers[prop]) {
console.log('Variable changed:', prop, ':', originalValue, '->', newValue);
window.variableWatchers[prop].changed(originalValue, newValue);
}
originalValue = newValue;
},
enumerable: descriptor.enumerable,
configurable: descriptor.configurable
});
};
// 性能监控
window.performanceMonitor = {
timers: {},
startTimer: function(name) {
this.timers[name] = performance.now();
},
endTimer: function(name) {
if (this.timers[name]) {
var elapsed = performance.now() - this.timers[name];
console.log('Timer "' + name + '" took:', elapsed, 'ms');
delete this.timers[name];
return elapsed;
}
}
};
console.log('JS Debugger injected successfully');
})();
"""
self.driver.execute_script(debugger_script)
def setup_runtime_monitoring(self):
"""设置运行时监控"""
monitoring_script = """
// 监控全局异常
window.onerror = function(message, source, lineno, colno, error) {
console.log('Global error:', {
message: message,
source: source,
line: lineno,
column: colno,
error: error
});
if (window.onGlobalError) {
window.onGlobalError({
message: message,
source: source,
line: lineno,
column: colno,
error: error
});
}
return true; // 阻止默认错误处理
};
// 监控Promise异常
window.addEventListener('unhandledrejection', function(event) {
console.log('Unhandled promise rejection:', event.reason);
if (window.onUnhandledRejection) {
window.onUnhandledRejection(event.reason);
}
});
// 监控网络请求
var originalFetch = window.fetch;
window.fetch = function() {
var startTime = Date.now();
console.log('Fetch started:', arguments[0]);
return originalFetch.apply(this, arguments).then(function(response) {
console.log('Fetch completed:', {
url: response.url,
status: response.status,
duration: Date.now() - startTime + 'ms'
});
return response;
}).catch(function(error) {
console.log('Fetch failed:', {
error: error,
duration: Date.now() - startTime + 'ms'
});
throw error;
});
};
// 监控定时器
var originalSetTimeout = window.setTimeout;
window.setTimeout = function(callback, delay) {
var timerId = originalSetTimeout.call(this, function() {
console.log('setTimeout executed:', {delay: delay});
callback();
}, delay);
console.log('setTimeout scheduled:', {delay: delay, timerId: timerId});
return timerId;
};
"""
self.driver.execute_script(monitoring_script)
def capture_execution_flow(self, duration: int = 30):
"""捕获执行流程"""
print(f"开始捕获执行流程,持续 {duration} 秒...")
# 注入执行流程捕获代码
flow_capture_script = """
window.executionTrace = [];
window.functionCallStack = [];
// Hook函数调用
window.hookFunction = function(obj, methodName) {
var originalMethod = obj[methodName];
if (typeof originalMethod !== 'function') {
return;
}
obj[methodName] = function() {
var startTime = performance.now();
var stack = new Error().stack;
window.functionCallStack.push({
method: methodName,
arguments: Array.prototype.slice.call(arguments),
timestamp: Date.now(),
stack: stack,
startTime: startTime
});
try {
var result = originalMethod.apply(this, arguments);
// 记录结束时间
var endTime = performance.now();
var lastCall = window.functionCallStack[window.functionCallStack.length - 1];
lastCall.duration = endTime - startTime;
lastCall.result = result;
return result;
} catch (error) {
var endTime = performance.now();
var lastCall = window.functionCallStack[window.functionCallStack.length - 1];
lastCall.duration = endTime - startTime;
lastCall.error = error.message;
throw error;
}
};
};
// Hook所有控制台方法
['log', 'info', 'warn', 'error', 'debug'].forEach(function(method) {
var originalMethod = console[method];
console[method] = function() {
var args = Array.prototype.slice.call(arguments);
window.executionTrace.push({
type: 'console.' + method,
args: args,
timestamp: Date.now(),
stack: new Error().stack
});
originalMethod.apply(console, arguments);
};
});
"""
self.driver.execute_script(flow_capture_script)
# 等待指定时间
time.sleep(duration)
# 获取捕获的数据
trace_data = self.driver.execute_script("return {trace: window.executionTrace, calls: window.functionCallStack};")
print(f"捕获到 {len(trace_data['trace'])} 条日志记录")
print(f"捕获到 {len(trace_data['calls'])} 次函数调用")
return trace_data
def analyze_call_stack(self, call_stack: List[Dict]) -> Dict:
"""分析调用栈"""
analysis = {
'total_calls': len(call_stack),
'function_counts': {},
'slow_calls': [],
'error_calls': []
}
for call in call_stack:
method_name = call.get('method', 'unknown')
# 统计函数调用次数
if method_name in analysis['function_counts']:
analysis['function_counts'][method_name] += 1
else:
analysis['function_counts'][method_name] = 1
# 查找慢调用
duration = call.get('duration', 0)
if duration > 100: # 超过100ms的调用
analysis['slow_calls'].append(call)
# 查找错误调用
if 'error' in call:
analysis['error_calls'].append(call)
# 按调用次数排序
analysis['function_counts'] = dict(sorted(
analysis['function_counts'].items(),
key=lambda x: x[1],
reverse=True
))
return analysis
def find_algorithm_entry_points(self, trace_data: Dict) -> List[Dict]:
"""查找算法入口点"""
entry_points = []
# 查找可能的加密/签名函数调用
for call in trace_data['calls']:
method_name = call.get('method', '').lower()
if any(keyword in method_name for keyword in ['sign', 'encrypt', 'hash', 'digest', 'token']):
entry_points.append({
'function': call['method'],
'arguments': call['arguments'],
'timestamp': call['timestamp'],
'duration': call.get('duration', 0),
'call_stack': call.get('stack', '').split('\\n')[:5] # 只取前5行
})
return entry_points
class AdvancedJSDetector:
"""高级JS检测器"""
def __init__(self, driver):
self.driver = driver
self.dynamic_analyzer = JSDynamicDebugger(driver)
def detect_obfuscation_techniques(self) -> Dict:
"""检测混淆技术"""
detection_script = """
var detection = {
techniques: [],
severity: 0,
details: {}
};
// 检测字符串拼接混淆
if (document.documentElement.innerHTML.includes('+String.fromCharCode(') ||
document.documentElement.innerHTML.includes("['charAt']")) {
detection.techniques.push('String Obfuscation');
detection.severity += 1;
}
// 检测控制流混淆
var controlFlowPatterns = [
/(?:while|for|if)\\s*\\(\\s*!\\s*\\w+\\s*&&/g,
/(?:while|for|if)\\s*\\(\\s*\\w+\\s*===\\s*false/g,
/(?:while|for|if)\\s*\\(\\s*!\\s*\\(\\s*\\w+\\s*\\))/g
];
var scriptTags = document.getElementsByTagName('script');
for (var i = 0; i < scriptTags.length; i++) {
var scriptContent = scriptTags[i].textContent || '';
for (var j = 0; j < controlFlowPatterns.length; j++) {
if (controlFlowPatterns[j].test(scriptContent)) {
detection.techniques.push('Control Flow Obfuscation');
detection.severity += 2;
break;
}
}
}
// 检测死代码
var deadCodePatterns = [
/if\\s*\\(false\\)/g,
/while\\s*\\(false\\)/g,
/var\\s+\\w+\\s*=\\s*undefined;/g
];
for (var i = 0; i < scriptTags.length; i++) {
var scriptContent = scriptTags[i].textContent || '';
for (var j = 0; j < deadCodePatterns.length; j++) {
if (deadCodePatterns[j].test(scriptContent)) {
detection.techniques.push('Dead Code Insertion');
detection.severity += 1;
break;
}
}
}
// 检测花指令
var junkCodePatterns = [
/var\\s+\\w+\\s*=\\s*\\w+\\s*\\^\\s*\\w+;/g,
/var\\s+\\w+\\s*=\\s*\\w+\\s*&\\s*\\w+;/g,
/var\\s+\\w+\\s*=\\s*\\w+\\s*\\|\\s*\\w+;/g
];
for (var i = 0; i < scriptTags.length; i++) {
var scriptContent = scriptTags[i].textContent || '';
for (var j = 0; j < junkCodePatterns.length; j++) {
if (junkCodePatterns[j].test(scriptContent)) {
detection.techniques.push('Junk Code Insertion');
detection.severity += 1;
break;
}
}
}
return detection;
"""
return self.driver.execute_script(detection_script)
def detect_anti_debugging(self) -> Dict:
"""检测反调试技术"""
anti_debug_detection = """
var detection = {
antiDebugMethods: [],
severity: 0,
hasAntiDebug: false
};
// 检测开发者工具检测
var devtoolsDetected = false;
// 方法1: 检测控制台维度
if (window.outerHeight - window.innerHeight > 200 || window.outerWidth - window.innerWidth > 200) {
detection.antiDebugMethods.push('Window Dimension Check');
detection.severity += 1;
devtoolsDetected = true;
}
// 方法2: 检测异常处理
var start = new Date().getTime();
debugger;
var end = new Date().getTime();
if (end - start > 100) {
detection.antiDebugMethods.push('Execution Pause Detection');
detection.severity += 1;
devtoolsDetected = true;
}
// 方法3: 检测属性重写
var hasPropertyOverride = false;
try {
Object.defineProperty(navigator, 'webdriver', {
get: function() {
detection.antiDebugMethods.push('Navigator Property Override');
detection.severity += 1;
return undefined;
}
});
} catch(e) {
detection.antiDebugMethods.push('Property Definition Prevention');
detection.severity += 1;
hasPropertyOverride = true;
}
// 方法4: 检测定时器干扰
var originalSetInterval = window.setInterval;
var intervalCount = 0;
window.setInterval = function(callback, delay) {
intervalCount++;
if (intervalCount > 1000) { // 异常高频调用
detection.antiDebugMethods.push('Timer Interference');
detection.severity += 2;
}
return originalSetInterval.call(this, callback, delay);
};
// 方法5: 检测错误处理
var originalOnError = window.onerror;
window.onerror = function(message, source, lineno, colno, error) {
if (message.includes('Script error')) {
detection.antiDebugMethods.push('Error Handler Manipulation');
detection.severity += 1;
}
if (originalOnError) {
return originalOnError(message, source, lineno, colno, error);
}
};
detection.hasAntiDebug = detection.antiDebugMethods.length > 0;
return detection;
"""
return self.driver.execute_script(anti_debug_detection)
def analyze_runtime_behavior(self, duration: int = 10) -> Dict:
"""分析运行时行为"""
print(f"开始分析运行时行为,持续 {duration} 秒...")
# 设置监控
self.dynamic_analyzer.setup_runtime_monitoring()
self.dynamic_analyzer.inject_debugger_script()
# 捕获执行流程
trace_data = self.dynamic_analyzer.capture_execution_flow(duration)
# 分析调用栈
call_analysis = self.dynamic_analyzer.analyze_call_stack(trace_data['calls'])
# 查找算法入口点
entry_points = self.dynamic_analyzer.find_algorithm_entry_points(trace_data)
# 检测混淆和反调试
obfuscation_detection = self.detect_obfuscation_techniques()
anti_debug_detection = self.detect_anti_debugging()
analysis_result = {
'execution_trace': {
'total_logs': len(trace_data['trace']),
'total_calls': len(trace_data['calls']),
'call_analysis': call_analysis,
'entry_points': entry_points
},
'obfuscation_detection': obfuscation_detection,
'anti_debug_detection': anti_debug_detection,
'recommendations': self.generate_recommendations(
obfuscation_detection,
anti_debug_detection,
call_analysis
)
}
return analysis_result
def generate_recommendations(self, obfuscation_detect, anti_debug_detect, call_analysis) -> List[str]:
"""生成分析建议"""
recommendations = []
if obfuscation_detect['severity'] > 0:
recommendations.append(f"检测到代码混淆 ({obfuscation_detect['techniques']}),建议使用反混淆工具")
if anti_debug_detect['hasAntiDebug']:
recommendations.append(f"检测到反调试技术 ({anti_debug_detect['antiDebugMethods']}),需要绕过")
if call_analysis['slow_calls']:
slow_count = len(call_analysis['slow_calls'])
recommendations.append(f"发现 {slow_count} 个慢速调用,可能存在性能问题")
if call_analysis['error_calls']:
error_count = len(call_analysis['error_calls'])
recommendations.append(f"发现 {error_count} 个错误调用,需要关注")
return recommendations
def dynamic_debugging_example():
"""动态调试示例"""
# 假设已经设置好driver
# driver = setup_hybrid_debugging()
# detector = AdvancedJSDetector(driver)
# # 分析运行时行为
# analysis = detector.analyze_runtime_behavior(duration=15)
# print("=== 运行时行为分析结果 ===")
# print(f"总日志数: {analysis['execution_trace']['total_logs']}")
# print(f"总调用数: {analysis['execution_trace']['total_calls']}")
# print("\\n=== 混淆检测结果 ===")
# print(f"混淆技术: {analysis['obfuscation_detection']['techniques']}")
# print(f"严重程度: {analysis['obfuscation_detection']['severity']}")
# print("\\n=== 反调试检测结果 ===")
# print(f"反调试方法: {analysis['anti_debug_detection']['antiDebugMethods']}")
# print(f"存在反调试: {analysis['anti_debug_detection']['hasAntiDebug']}")
# print("\\n=== 算法入口点 ===")
# for point in analysis['execution_trace']['entry_points']:
# print(f"函数: {point['function']}")
# print(f"参数: {point['arguments']}")
# print(f"耗时: {point['duration']}ms")
# print("---")
print("动态调试示例代码已准备")
if __name__ == "__main__":
dynamic_debugging_example()#3. RPC调用技术
#3.1 Frida RPC调用
import frida
import time
import json
import base64
from typing import Any, Dict, List, Optional
class FridaRPCHelper:
"""Frida RPC助手"""
def __init__(self, device_id: str = None):
self.device = None
self.session = None
self.script = None
self.rpc_exports = {}
self.connect_device(device_id)
def connect_device(self, device_id: str = None):
"""连接设备"""
try:
if device_id:
self.device = frida.get_usb_device(2)
else:
self.device = frida.get_usb_device(2)
print(f"✓ 连接到设备: {self.device.name}")
except:
try:
self.device = frida.get_remote_device()
print(f"✓ 连接到远程设备: {self.device.name}")
except:
print("✗ 无法连接到设备")
raise
def attach_to_app(self, package_name: str):
"""附加到应用"""
try:
# 等待应用启动
self.session = self.device.attach(package_name)
print(f"✓ 附加到应用: {package_name}")
return True
except Exception as e:
print(f"✗ 附加失败: {e}")
return False
def setup_rpc_interface(self, js_exports: str = None):
"""设置RPC接口"""
if js_exports is None:
js_exports = self._default_exports()
js_code = f"""
// RPC接口设置
rpc.exports = {{
{js_exports}
}};
console.log("[RPC] 接口已导出");
"""
try:
self.script = self.session.create_script(js_code)
self.script.load()
print("✓ RPC接口设置成功")
# 获取导出的函数
self.rpc_exports = self.script.exports
return True
except Exception as e:
print(f"✗ RPC接口设置失败: {e}")
return False
def _default_exports(self) -> str:
"""默认导出函数"""
return """
// 示例加密函数导出
encryptdata: function(data, key) {
Java.perform(function() {
try {
var cipher = Java.use('javax.crypto.Cipher');
var secretKeySpec = Java.use('javax.crypto.spec.SecretKeySpec');
var cipherInstance = cipher.getInstance('AES/CBC/PKCS5Padding');
var secretKey = secretKeySpec.$new(key, 'AES');
cipherInstance.init(1, secretKey); // 1 = ENCRYPT_MODE
var encrypted = cipherInstance.doFinal(data);
// 转换为Base64
var base64Encoder = Java.use('android.util.Base64');
var encoded = base64Encoder.encodeToString(encrypted, 2); // NO_WRAP
return encoded;
} catch (e) {
console.log('Encryption error: ' + e);
return 'ERROR: ' + e;
}
});
},
// 示例哈希函数导出
calchash: function(data, algorithm) {
Java.perform(function() {
try {
var messageDigest = Java.use('java.security.MessageDigest');
var md = messageDigest.getInstance(algorithm);
var dataBytes = Java.use('java.lang.String').$new(data).getBytes();
var hashBytes = md.digest(dataBytes);
var bigInteger = Java.use('java.math.BigInteger');
var hashed = bigInteger.$new(1, hashBytes).toString(16);
return hashed;
} catch (e) {
console.log('Hash error: ' + e);
return 'ERROR: ' + e;
}
});
},
// 示例签名函数导出
calculatesign: function(params, secret) {
Java.perform(function() {
try {
// 这里实现具体的签名逻辑
var paramMap = JSON.parse(params);
var sortedKeys = Object.keys(paramMap).sort();
var signStr = '';
for (var i = 0; i < sortedKeys.length; i++) {
var key = sortedKeys[i];
if (key !== 'sign') {
signStr += key + '=' + paramMap[key] + '&';
}
}
signStr = signStr.slice(0, -1); // 移除最后一个&
var fullStr = signStr + secret;
// 使用MD5哈希
var messageDigest = Java.use('java.security.MessageDigest');
var md = messageDigest.getInstance('MD5');
var dataBytes = Java.use('java.lang.String').$new(fullStr).getBytes();
var hashBytes = md.digest(dataBytes);
var bigInteger = Java.use('java.math.BigInteger');
var hashed = bigInteger.$new(1, hashBytes).toString(16);
return hashed.toUpperCase();
} catch (e) {
console.log('Sign error: ' + e);
return 'ERROR: ' + e;
}
});
},
// 获取设备信息
getdeviceinfo: function() {
Java.perform(function() {
try {
var build = Java.use('android.os.Build');
var version = Java.use('android.os.Build$VERSION');
return {
brand: build.BRAND.value,
model: build.MODEL.value,
version: version.RELEASE.value,
sdk: version.SDK_INT.value,
fingerprint: build.FINGERPRINT.value
};
} catch (e) {
console.log('Device info error: ' + e);
return 'ERROR: ' + e;
}
});
},
// 执行shell命令
execshell: function(command) {
Java.perform(function() {
try {
var runtime = Java.use('java.lang.Runtime');
var process = runtime.getRuntime().exec(command);
var bufferedReader = Java.use('java.io.BufferedReader');
var inputStreamReader = Java.use('java.io.InputStreamReader');
var reader = bufferedReader.$new(inputStreamReader.$new(process.getInputStream()));
var line = reader.readLine();
var output = '';
while (line !== null) {
output += line + '\\n';
line = reader.readLine();
}
process.waitFor();
return output;
} catch (e) {
console.log('Shell command error: ' + e);
return 'ERROR: ' + e;
}
});
}
"""
def call_rpc_function(self, function_name: str, *args) -> Any:
"""调用RPC函数"""
if not self.rpc_exports:
print("✗ RPC接口未设置")
return None
try:
func = getattr(self.rpc_exports, function_name)
result = func(*args)
return result
except Exception as e:
print(f"✗ RPC函数调用失败: {e}")
return None
def encrypt_data(self, data: str, key: str) -> str:
"""加密数据"""
return self.call_rpc_function('encryptdata', data, key)
def calc_hash(self, data: str, algorithm: str = 'MD5') -> str:
"""计算哈希"""
return self.call_rpc_function('calchash', data, algorithm)
def calc_sign(self, params: Dict, secret: str) -> str:
"""计算签名"""
params_json = json.dumps(params)
return self.call_rpc_function('calculatesign', params_json, secret)
def get_device_info(self) -> Dict:
"""获取设备信息"""
return self.call_rpc_function('getdeviceinfo')
def exec_shell(self, command: str) -> str:
"""执行shell命令"""
return self.call_rpc_function('execshell', command)
def list_available_functions(self) -> List[str]:
"""列出可用的RPC函数"""
if not self.rpc_exports:
return []
try:
# 获取导出函数列表
return [attr for attr in dir(self.rpc_exports) if not attr.startswith('_')]
except:
return []
class AdvancedFridaRPC(FridaRPCHelper):
"""高级Frida RPC"""
def __init__(self, device_id: str = None):
super().__init__(device_id)
self.custom_exports = {}
def add_custom_export(self, name: str, js_code: str):
"""添加自定义导出函数"""
self.custom_exports[name] = js_code
def setup_advanced_exports(self, app_specific_hooks: List[Dict] = None):
"""设置高级导出(针对特定应用)"""
exports_parts = []
# 添加默认导出
exports_parts.append(self._default_exports())
# 添加应用特定的钩子
if app_specific_hooks:
for hook in app_specific_hooks:
exports_parts.append(f"""
{hook['name']}: function() {{
Java.perform(function() {{
{hook['code']}
}});
}},
""")
# 添加自定义导出
for name, code in self.custom_exports.items():
exports_parts.append(f"""
{name}: function() {{
Java.perform(function() {{
{code}
}});
}},
""")
full_exports = ','.join(exports_parts)
js_code = f"""
rpc.exports = {{
{full_exports}
}};
console.log("[Advanced RPC] 高级接口已导出");
"""
try:
if self.script:
self.script.unload()
self.script = self.session.create_script(js_code)
self.script.load()
print("✓ 高级RPC接口设置成功")
self.rpc_exports = self.script.exports
return True
except Exception as e:
print(f"✗ 高级RPC接口设置失败: {e}")
return False
def hook_and_export(self, class_name: str, method_name: str, hook_code: str):
"""Hook方法并导出为RPC函数"""
rpc_name = f"hooked_{method_name.lower().replace('$', '_').replace('<', '').replace('>', '')}"
full_hook_code = f"""
var targetClass = Java.use('{class_name}');
var targetMethod = targetClass['{method_name}'];
var originalMethod = targetMethod;
targetClass['{method_name}'].implementation = function() {{
console.log('[Hook] {method_name} called');
// 执行自定义代码
{hook_code}
// 调用原始方法
var result = originalMethod.apply(this, arguments);
console.log('[Hook] {method_name} finished');
return result;
}};
// 返回结果
return 'Hooked {method_name} successfully';
"""
self.add_custom_export(rpc_name, full_hook_code)
return rpc_name
def memory_dump(self, address: int, size: int = 256) -> str:
"""内存转储"""
js_code = f"""
Memory.dump(ptr('{hex(address)}'), {{size: {size}}})
"""
# 这里需要通过RPC调用来实现
pass
def trace_method_calls(self, class_name: str, method_name: str) -> List[Dict]:
"""跟踪方法调用"""
trace_data = []
# 设置跟踪钩子
hook_code = f"""
var traceData = [];
var targetClass = Java.use('{class_name}');
var targetMethod = targetClass['{method_name}'];
targetMethod.implementation = function() {{
var callInfo = {{
method: '{class_name}.{method_name}',
arguments: [],
returnValue: null,
timestamp: Date.now()
}};
// 记录参数
for (var i = 0; i < arguments.length; i++) {{
try {{
callInfo.arguments.push(String(arguments[i]));
}} catch(e) {{
callInfo.arguments.push('[Error converting argument]');
}}
}}
// 调用原始方法
var result = this['{method_name}'].apply(this, arguments);
callInfo.returnValue = String(result);
traceData.push(callInfo);
console.log('[Trace] ' + JSON.stringify(callInfo));
return result;
}};
return traceData;
"""
self.add_custom_export('trace_calls', hook_code)
return self.call_rpc_function('trace_calls')
# 金融App特定的RPC工具
class FinanceAppRPC(AdvancedFridaRPC):
"""金融App专用RPC工具"""
def __init__(self, package_name: str, device_id: str = None):
super().__
