抖音APP抓包分析实践

通过实际案例来分析抖音APP的网络请求结构和数据抓取方法。

抖音API接口分析

import requests
import json
import time
import hashlib
import random
from urllib.parse import urlencode
import re
from typing import Dict, Any, Optional

class DouyinAPIScraper:
    """Client sketch for querying Douyin's mobile API endpoints.

    All requests go through one shared ``requests.Session`` and carry a fixed
    set of mobile-browser headers plus a randomly generated device profile.
    The "signing" step only appends timestamp, version and device parameters;
    it does not compute a real request signature.

    Args:
        timeout: Seconds to wait for each HTTP request before giving up.
    """

    def __init__(self, timeout: float = 10.0):
        self.session = requests.Session()
        # Without a timeout, requests waits indefinitely on a stalled socket.
        self.timeout = timeout
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Linux; Android 10; SM-G960U) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.181 Mobile Safari/537.36',
            'Accept': '*/*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
        }
        self.api_base = 'https://aweme.snssdk.com'
        self.device_info = self._generate_device_info()

    def _generate_device_info(self) -> Dict[str, str]:
        """Return a simulated device profile with a random 16-digit ``iid``."""
        return {
            'device_platform': 'android',
            'device_type': 'SM-G960U',
            'device_brand': 'samsung',
            'os_api': '29',
            'os_version': '10',
            'iid': str(random.randint(1000000000000000, 9999999999999999)),
            'idfa': '',  # iOS advertising identifier
            'aid': '1128',  # Douyin application ID
        }

    def _sign_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *params* extended with the common request fields.

        This is a simplified stand-in for the real signing algorithm: it adds
        a second-resolution ``ts``, a millisecond ``_rticket``, fixed
        version/app identifiers and the device profile. The caller's dict is
        left untouched.

        Args:
            params: Endpoint-specific query parameters.

        Returns:
            A new dict holding *params* plus the common fields. Device profile
            keys override same-named keys in *params*.
        """
        # Read the clock once so ``ts`` and ``_rticket`` describe the same instant.
        now = time.time()
        signed = dict(params)
        signed['ts'] = str(int(now))
        signed['version_code'] = '230701'
        signed['app_name'] = 'aweme'
        signed['_rticket'] = str(int(now * 1000))

        # Attach the device profile.
        signed.update(self.device_info)

        return signed

    def _get_json(self, path: str, params: Dict[str, Any], error_label: str) -> Dict[str, Any]:
        """GET ``api_base + path`` with signed params and decode the JSON body.

        Args:
            path: Endpoint path appended to ``self.api_base``.
            params: Endpoint-specific query parameters (signed before sending).
            error_label: Prefix for the message printed when the request or
                the JSON decoding fails.

        Returns:
            The decoded JSON body on HTTP 200; an empty dict on any other
            status code or on a request/decoding error.
        """
        url = f"{self.api_base}{path}"
        headers = self.headers.copy()
        headers['Host'] = 'aweme.snssdk.com'

        try:
            response = self.session.get(
                url,
                params=self._sign_params(params),
                headers=headers,
                timeout=self.timeout,
            )
            if response.status_code == 200:
                return response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on older requests versions.
            print(f"{error_label}: {e}")

        return {}

    def get_user_profile(self, user_id: str) -> Dict[str, Any]:
        """Fetch the profile of *user_id*; returns ``{}`` on failure."""
        params = {
            'user_id': user_id,
            'count': 20,
            'max_cursor': 0,
        }
        return self._get_json('/aweme/v1/user/', params, "获取用户信息失败")

    def get_user_posts(self, sec_uid: str, max_cursor: int = 0) -> Dict[str, Any]:
        """Fetch one page of videos posted by *sec_uid*; returns ``{}`` on failure.

        Args:
            sec_uid: The user's ``sec_uid`` identifier.
            max_cursor: Pagination cursor sent as ``max_cursor``.
        """
        params = {
            'sec_uid': sec_uid,
            'count': 20,
            'max_cursor': max_cursor,
        }
        return self._get_json('/aweme/v1/aweme/post/', params, "获取用户作品失败")

    def get_video_detail(self, aweme_id: str) -> Dict[str, Any]:
        """Fetch the detail record of video *aweme_id*; returns ``{}`` on failure."""
        params = {
            'aweme_id': aweme_id,
        }
        return self._get_json('/aweme/v1/aweme/detail/', params, "获取视频详情失败")

class DouyinTrafficAnalyzer:
    """Inspect captured Douyin request/response dictionaries.

    Requests are plain dicts with optional ``url``, ``method``, ``headers``,
    ``params`` and ``timestamp`` keys; responses are decoded JSON bodies.
    """

    def __init__(self):
        # Endpoint name -> regex searched for inside the request URL.
        self.api_endpoints = {
            'user_info': r'/aweme/v1/user/',
            'user_post': r'/aweme/v1/aweme/post/',
            'video_detail': r'/aweme/v1/aweme/detail/',
            'feed': r'/aweme/v1/feed/',
            'comment_list': r'/aweme/v1/comment/list/',
            'like_action': r'/aweme/v1/commit/item/digg/',
        }
        self.analyzed_data = {
            'requests': [],
            'responses': [],
            'patterns': {},
            'security_features': [],
        }

    def analyze_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Summarise one captured request.

        Args:
            request_data: Captured request dict (see class docstring).

        Returns:
            A dict echoing ``url``, ``method``, ``headers``, ``params`` and
            ``timestamp`` (with empty defaults), plus ``endpoint_type`` and
            ``security_headers`` derived from them.
        """
        url = request_data.get('url', '')
        headers = request_data.get('headers', {})
        return {
            'url': url,
            'method': request_data.get('method', ''),
            'headers': headers,
            'params': request_data.get('params', {}),
            'timestamp': request_data.get('timestamp', ''),
            'endpoint_type': self._identify_endpoint(url),
            'security_headers': self._check_security_headers(headers),
        }

    def _identify_endpoint(self, url: str) -> str:
        """Return the name of the first known endpoint whose pattern occurs in *url*, else ``'unknown'``."""
        for endpoint_name, pattern in self.api_endpoints.items():
            if re.search(pattern, url):
                return endpoint_name
        return 'unknown'

    def _check_security_headers(self, headers: Dict[str, str]) -> list:
        """Return ``(name, value)`` pairs for headers that look security related.

        A header qualifies when its lower-cased name contains ``token``,
        ``signature`` or ``verify``, or starts with ``x-``. The ``x-`` test is
        a prefix match so that names such as ``Max-Forwards`` are not reported.
        """
        keywords = ('token', 'signature', 'verify')
        matched = []
        for header, value in (headers or {}).items():
            name = header.lower()
            if name.startswith('x-') or any(keyword in name for keyword in keywords):
                matched.append((header, value))
        return matched

    def extract_video_data(self, response_data: Dict[str, Any]) -> list:
        """Extract parsed video records from a response body.

        ``aweme_list`` (a list of videos) takes precedence over
        ``aweme_detail`` (a single video). A null ``aweme_list`` is treated as
        empty, and entries that cannot be parsed are skipped.
        """
        if 'aweme_list' in response_data:
            # User posts or feed listing.
            candidates = response_data['aweme_list'] or []
        elif 'aweme_detail' in response_data:
            # Single video detail.
            candidates = [response_data['aweme_detail']]
        else:
            candidates = []

        parsed = (self._parse_aweme_data(aweme) for aweme in candidates)
        return [video_info for video_info in parsed if video_info]

    @staticmethod
    def _as_dict(value: Any) -> Dict[str, Any]:
        """Return *value* if it is a dict, otherwise an empty dict (covers JSON null)."""
        return value if isinstance(value, dict) else {}

    @classmethod
    def _first_url(cls, container: Dict[str, Any], key: str) -> Optional[Any]:
        """Return the first entry of ``container[key]['url_list']``, or ``None`` if absent or empty."""
        url_list = cls._as_dict(container.get(key)).get('url_list')
        if isinstance(url_list, (list, tuple)) and url_list:
            return url_list[0]
        return None

    def _parse_aweme_data(self, aweme: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Flatten one aweme (video) record into a compact dict.

        Missing, null or empty nested fields become ``None`` instead of
        discarding the whole record.

        Returns:
            The flattened record, or ``None`` (after printing a message) when
            *aweme* is not a dict.
        """
        if not isinstance(aweme, dict):
            print(f"解析aweme数据失败: expected dict, got {type(aweme).__name__}")
            return None

        author = self._as_dict(aweme.get('author'))
        video = self._as_dict(aweme.get('video'))
        statistics = self._as_dict(aweme.get('statistics'))
        music = self._as_dict(aweme.get('music'))

        return {
            'aweme_id': aweme.get('aweme_id'),
            'desc': aweme.get('desc', ''),  # video description
            'create_time': aweme.get('create_time'),
            'author': {
                'uid': author.get('uid'),
                'nickname': author.get('nickname'),
                'avatar': self._first_url(author, 'avatar_thumb'),
            },
            'video': {
                'play_addr': self._first_url(video, 'play_addr'),
                'cover': self._first_url(video, 'cover'),
                'height': video.get('height'),
                'width': video.get('width'),
                'duration': video.get('duration'),
            },
            'stats': {
                'like_count': statistics.get('digg_count'),
                'comment_count': statistics.get('comment_count'),
                'share_count': statistics.get('share_count'),
                'play_count': statistics.get('play_count'),
            },
            'music': {
                'id': music.get('id'),
                'title': music.get('title'),
                'author': music.get('author'),
                'play_url': self._first_url(music, 'play_url'),
            },
        }

    def detect_signatures(self, request_data: Dict[str, Any]) -> Dict[str, str]:
        """Return the query parameters whose names look signature related.

        A parameter qualifies when its lower-cased name contains any of the
        keywords below. The match is by substring, so longer names that merely
        contain a keyword are reported too.
        """
        params = request_data.get('params') or {}

        # Common signature-related parameter names.
        signature_keywords = ['_signature', 'verify', 'token', 'sign', 'ts', 'vcd']

        return {
            key: value
            for key, value in params.items()
            if any(keyword in key.lower() for keyword in signature_keywords)
        }

def analyze_douyin_traffic():
    """Run a demo analysis over two hard-coded sample captures.

    For each sample the endpoint type, the security-related headers and any
    signature-like query parameters are printed, followed by a short list of
    capture-analysis tips.

    Returns:
        The ``DouyinTrafficAnalyzer`` instance used for the demo.
    """
    print("🔍 开始分析抖音APP流量...")

    traffic_analyzer = DouyinTrafficAnalyzer()

    # Sample 1: a feed request.
    feed_capture = {
        'url': 'https://aweme.snssdk.com/aweme/v1/feed/',
        'method': 'GET',
        'params': {
            'count': '6',
            'type': '0',
            'max_cursor': '0',
            'min_cursor': '0',
            '_signature': 'xxx',
            'ts': '1234567890'
        },
        'headers': {
            'user-agent': 'Aweme/23.7.0 (iPhone; iOS 14.4; Scale/2.00)',
            'x-tt-token': 'xxx',
            'x-khronos': '1234567890'
        },
        'timestamp': '2024-01-01T12:00:00Z'
    }

    # Sample 2: a user-posts request.
    post_capture = {
        'url': 'https://aweme.snssdk.com/aweme/v1/aweme/post/',
        'method': 'GET',
        'params': {
            'sec_uid': 'MS4wLjABAAAA...',
            'count': '20',
            'max_cursor': '0',
            'verify': 'xxx'
        },
        'headers': {
            'user-agent': 'Aweme/23.7.0 (Android; Android 10; Scale/3.00)',
            'x-tt-token': 'xxx'
        },
        'timestamp': '2024-01-01T12:01:00Z'
    }

    print("\n📋 分析抓包数据:")
    for index, captured in enumerate((feed_capture, post_capture), start=1):
        print(f"\n请求 {index}:")
        report = traffic_analyzer.analyze_request(captured)
        print(f"  端点类型: {report['endpoint_type']}")
        print(f"  安全头部: {report['security_headers']}")

        found = traffic_analyzer.detect_signatures({'params': captured['params']})
        if found:
            print(f"  签名参数: {found}")

    print("\n🎯 抖音APP抓包分析要点:")
    takeaways = (
        "1. 注意User-Agent伪装,需要模拟真实设备",
        "2. 关注_signature参数,这是主要的反爬措施",
        "3. x-tt-token、x-khronos等头部是重要的安全验证",
        "4. ts参数通常是时间戳,需要实时生成",
        "5. 频繁请求会被限流,需要控制频率",
    )
    for tip in takeaways:
        print(tip)

    return traffic_analyzer

# 实际抓包分析指导
def douyin_proxy_setup():
    """Print and return the capture-proxy setup guide for the Douyin app.

    The guide is assembled from a title plus four numbered sections, each
    followed by its bullet points.

    Returns:
        The guide text exactly as printed. It starts with a newline and every
        line carries leading indentation.
    """
    indent = "    "
    bullet = "       - "
    sections = (
        ("1. 代理工具配置:", (
            "Fiddler/Charles/Mitmproxy任选其一",
            "确保证书已正确安装到设备",
            "启用HTTPS解密功能",
        )),
        ("2. 设备网络配置:", (
            "WiFi设置 -> 代理 -> 手动",
            "服务器: 电脑IP地址",
            "端口: 8888 (Fiddler) 或 8080 (Charles/Mitmproxy)",
        )),
        ("3. 抖音APP特殊处理:", (
            "部分版本使用SSL Pinning",
            "需要用Frida绕过证书验证",
            "可能需要Root权限安装证书到系统目录",
        )),
        ("4. 数据分析重点:", (
            "关注aweme.snssdk.com域名下的请求",
            "分析不同API端点的数据结构",
            "识别参数加密和签名算法",
        )),
    )

    # The leading empty row reproduces the newline that opens the guide text.
    rows = ["", indent + "=== 抖音APP抓包设置指南 ==="]
    for title, items in sections:
        rows.append(indent)  # indented blank separator line
        rows.append(indent + title)
        rows.extend(bullet + item for item in items)
    rows.append(indent)  # trailing indentation after the last bullet

    setup_guide = "\n".join(rows)
    print(setup_guide)
    return setup_guide

if __name__ == "__main__":
    # Script entry point: run the demo traffic analysis and keep the analyzer
    # bound at module level.
    analyzer = analyze_douyin_traffic()