电商App爬虫综合实战项目指南

重要前置说明:本项目仅供技术学习交流,请严格遵守Robots协议、电商平台规则及相关法律法规,合理控制请求频率,绝不非法获取个人敏感信息或进行商业获利。

在爬取电商App数据时,我们常常会面临几大现实难题:SSL Pinning 导致抓包工具一片空白请求签名逆向成本高且接口说变就变多设备调度混乱IP 被封申诉无门同一商品反复爬取浪费资源……这些痛点有没有一种轻量又皮实的方式一次性解决?

本教程将带你从零搭建一套轻量全链路的电商App爬虫系统。它从绕过SSL Pinning开始,用自动化操作配合抓包辅助补全数据,再结合Redis做任务调度与去重、本地代理池防封、多设备协同工作,一气呵成。即使你只有一台闲置手机,也能快速跑通整个流程;如果能再加几台设备,就能自然扩展成分布式小集群。


核心技术架构概览

整个系统采用轻量分层异步架构,模块之间松耦合,单设备可以独立运行,也支持多设备横向扩展:

flowchart LR
    A[控制终端] -->|提交任务| B[Redis调度中心]
    B -->|优先级下发| C1[爬虫工作节点1]
    B -->|优先级下发| C2[爬虫工作节点2]
    C1 --> D[代理池系统]
    C2 --> D
    C1 -->|解析数据| E[数据处理管道]
    C2 --> E
    E -->|持久化| F[MySQL存储]
  • 控制终端:开发者下达抓取任务(例如抓取哪个分类、多少页)。
  • Redis调度中心:负责优先级任务队列、URL去重、设备状态管理。
  • 爬虫工作节点:跑在安卓设备(或模拟器)上的 Python 脚本,通过 uiautomator2 控制应用,同时可选搭配抓包获取更完整的数据。
  • 代理池系统:为爬虫提供动态更换的代理 IP,降低单 IP 被封风险。
  • 数据处理管道:清洗、去重、格式化爬回来的原始数据。
  • MySQL存储:持久化最终的结构化数据。

下面我们按照从底向上的顺序,逐一实现各个核心模块。


核心模块逐模块实现

1. 反反爬虫基础:SSL Pinning 一键绕过

抓包是分析App数据的第一步,但大多数电商App都会使用 OkHttp3/OkHttp4CertificatePinner 来验证服务器证书,导致 Charles 或 Fiddler 中只能看到一堆乱码或者连接失败。好在这种防护方式相对固定,我们可以用 Frida 动态注入一段脚本,在App启动前就把它“掏空”。

下面是一个封装好的 Python 类,可以方便地在群控场景下指定设备,自动附加到目标App并加载通用绕过脚本。

import frida
import sys

class SSLPinningBypass:
    """一键通用OkHttp3/OkHttp4 SSL Pinning绕过类"""
    
    def __init__(self, device_serial=None):
        """可选指定设备(群控场景常用)"""
        try:
            self.device = frida.get_device(device_serial) if device_serial else frida.get_usb_device(timeout=3)
        except Exception as e:
            print(f"✗ 连接设备失败: {e}")
            sys.exit(1)
        self.session = None
        self.script = None
    
    def attach_and_bypass(self, pkg_name: str):
        """附加App + 加载绕过脚本"""
        try:
            # 优先附加已启动App,否则自动重启绕过
            self.session = self.device.attach(pkg_name)
            print(f"✓ 已附加到 {pkg_name}")
        except frida.ProcessNotFoundError:
            print(f"✓ {pkg_name} 未启动,正在启动并提前注入绕过脚本...")
            pid = self.device.spawn([pkg_name])
            self.session = self.device.attach(pid)
            self.device.resume(pid)
        
        # 加载极简通用绕过脚本(适配80%+场景)
        self._load_bypass_script()
    
    def _load_bypass_script(self):
        """Frida Hook脚本:覆盖OkHttp3/4的check+CertificatePinner构造"""
        bypass_js = """
        console.log("[√] Frida SSL Pinning通用Hook启动...");
        
        // 1. OkHttp3/4 CertificatePinner.check 所有重载直接空实现
        try {
            var CertPinner = Java.use("okhttp3.CertificatePinner");
            CertPinner.check.overload('java.lang.String', 'java.util.List').implementation = function() { return; };
            CertPinner.check.overload('java.lang.String', 'java.security.cert.Certificate').implementation = function() { return; };
            console.log("[√] OkHttp3/4 CertificatePinner.check 已全部禁用");
        } catch (e) {
            console.log("[×] 未找到OkHttp3/4 CertificatePinner");
        }
        
        // 2. 兜底禁用TrustManager
        try {
            var X509TrustManager = Java.use("javax.net.ssl.X509TrustManager");
            var EmptyTrustManager = Java.registerClass({
                name: "com.frida.EmptyTrustManager",
                implements: [X509TrustManager],
                methods: {
                    checkClientTrusted: function() {},
                    checkServerTrusted: function() {},
                    getAcceptedIssuers: function() { return []; }
                }
            });
            console.log("[√] 已注册空TrustManager");
        } catch (e) {
            console.log("[×] 兜底TrustManager注册失败: " + e);
        }
        """
        
        self.script = self.session.create_script(bypass_js)
        self.script.load()
        print("[√] 绕过脚本加载完成,请使用Charles/Fiddler抓包!")

使用时只需要传入包名即可:

bypass = SSLPinningBypass()
bypass.attach_and_bypass("com.example.shopping")

Frida 会注入到 App 进程,之后你在抓包工具中就能看到明文请求了。这套脚本对大部分基于 OkHttp 的 SSL Pinning 实现具有通用性。


2. 轻量稳定的爬取层:uiautomator2 自动化

如果你没有足够精力去逆向加密的请求签名,或者目标 App 频繁更新导致签名算法三天两头变,UI 自动化 + 抓包辅助是一个既稳定又省心的折衷方案。

这里我们使用 uiautomator2 库来控制设备,它可以模拟点击、滑动、读取界面元素,基本能满足商品列表、详情页的数据提取需求。下面的 ProductCrawler 类实现了一个基础的商品爬取器:

  • 随机延迟:模拟人类操作的停顿,避免被反自动化机制识别。
  • 元素定位:通过常见的 resource-id 模式匹配来获取商品标题和价格。
  • 滚动翻页:用 swipe 模拟下滑,每次提取前进行去重,防止重复抓取。
import uiautomator2 as u2
import time
import random

class ProductCrawler:
    """单设备商品列表+详情基础爬取器"""
    
    def __init__(self, device_serial=None, wait_base=2, wait_range=3):
        """设置随机延迟,模拟人类操作"""
        self.d = u2.connect(device_serial) if device_serial else u2.connect()
        self.wait_base = wait_base
        self.wait_range = wait_range
    
    def _human_wait(self):
        time.sleep(random.uniform(self.wait_base, self.wait_base + self.wait_range))
    
    def _extract_list_page(self) -> list[dict]:
        """提取当前可见商品列表,适配常见的resourceId格式"""
        products = []
        # 尝试匹配常见的商品容器+标题价格id
        try:
            containers = self.d(resourceIdMatches=r".*product.*item|.*item.*product")
            if not containers.exists:
                containers = self.d(scrollable=True).child(className="android.view.ViewGroup")[:10]
            for c in containers[:10]:  # 避免重复滚动提取同一元素
                title = c.child(resourceIdMatches=r".*title|.*name").get_text() if c.child(resourceIdMatches=r".*title|.*name").exists else ""
                price = c.child(resourceIdMatches=r".*price").get_text() if c.child(resourceIdMatches=r".*price").exists else ""
                if title and price:
                    products.append({"title": title.strip(), "price": price.strip(), "extracted_at": time.strftime("%Y-%m-%d %H:%M:%S")})
        except Exception as e:
            print(f"[×] 提取列表页失败: {e}")
        return products
    
    def crawl_category(self, category_name: str, max_scrolls=5) -> list[dict]:
        """根据分类名称进入并爬取(需要提前配置入口坐标或文本)"""
        # 这里简化为:通过文本定位分类入口并点击
        try:
            self.d(text=category_name).click(timeout=10)
            self._human_wait()
        except Exception as e:
            print(f"[×] 找不到分类 {category_name}: {e}")
            return []
        
        all_products = []
        seen_titles = set()
        for _ in range(max_scrolls):
            # 提取当前页+去重
            current = self._extract_list_page()
            for p in current:
                if p["title"] not in seen_titles:
                    seen_titles.add(p["title"])
                    all_products.append(p)
            # 模拟人类滑动
            self.d.swipe(500, 1800, 500, 500, duration=0.3)
            self._human_wait()
        print(f"[√] 分类 {category_name} 共爬取 {len(all_products)} 个不重复商品")
        return all_products

注意:UI 自动化受设备性能、网络波动影响较大,建议搭配重试机制和异常恢复逻辑。另外,如果App页面结构发生较大变化,你可能需要手动调整 resourceIdMatches 的正则表达式。


3. 任务调度 + IP 管理核心模块

当爬取任务变多,或者需要多台设备并行工作时,就需要一套调度中心来分配任务、去重,并且为爬虫提供稳定的代理 IP,防止被封禁。

3.1 轻量 Redis 优先级任务队列

Redis 天生适合做队列,我们利用它的 List 实现优先级队列,Set 完成去重标记,Hash 存储任务详情。这里定义了一个 RedisTaskQueue 类,支持:

  • 优先级:分为高、普通、低三个等级,高优先级任务会被优先消费。
  • 去重:可传入可选的 unique_key(例如商品 ID),如果发现该 key 已经在 seen_set 中,则会跳过该任务。
  • 任务详情过期:任务数据设置 7 天有效期,避免占用过多内存。
import redis
import json
import time
from enum import IntEnum

class TaskPriority(IntEnum):
    LOW = 1
    NORMAL = 2
    HIGH = 3

class RedisTaskQueue:
    """Redis List+Set实现的优先级+去重任务队列"""
    
    def __init__(self, r_host="localhost", r_port=6379, r_db=0):
        self.r = redis.StrictRedis(host=r_host, port=r_port, db=r_db, decode_responses=True)
        self.prefix = "app_crawler:"
        self.task_key = f"{self.prefix}tasks:"
        self.priority_queue = {
            TaskPriority.HIGH: f"{self.prefix}queue:high",
            TaskPriority.NORMAL: f"{self.prefix}queue:normal",
            TaskPriority.LOW: f"{self.prefix}queue:low"
        }
        self.seen_set = f"{self.prefix}seen_urls"
    
    def add_task(self, task_id: str, task_data: dict, priority=TaskPriority.NORMAL, unique_key=None) -> bool:
        """添加任务,可选unique_key去重"""
        if unique_key and self.r.sismember(self.seen_set, unique_key):
            return False
        # 存储任务详情(7天过期)
        self.r.setex(f"{self.task_key}{task_id}", 7*24*3600, json.dumps(task_data))
        # 加入优先级队列
        self.r.lpush(self.priority_queue[priority], task_id)
        if unique_key:
            self.r.sadd(self.seen_set, unique_key)
        return True
    
    def get_task(self, timeout=5) -> tuple[str, dict] | None:
        """按优先级获取任务,超时5秒"""
        for q in self.priority_queue.values():
            task_id = self.r.rpoplpush(q, f"{q}:processing", timeout=timeout)
            if task_id:
                task_data = json.loads(self.r.get(f"{self.task_key}{task_id}"))
                return task_id, task_data
        return None

使用示例:

queue = RedisTaskQueue()
# 添加一个高优先级任务,并指定 unique_key 为商品ID
queue.add_task("task_001", {"url": "https://example.com/product/123"}, 
               priority=TaskPriority.HIGH, unique_key="product:123")
# 获取任务
task_id, task_data = queue.get_task()

爬虫工作节点在空闲时会调用 get_task() 阻塞等待新任务,拿到任务后执行抓取逻辑。任务完成后可以通过删除 {prefix}queue:high:processing 中的 task_id 来确认完成,或者配合异常重试机制重新放回队列。

3.2 极简本地代理池(基于 HTTPbin 验证)

IP 代理是防封的关键。对于小规模项目,维护一个动态代理池成本较高,我们可以先用一个预定义的代理列表,通过周期性验证来筛选可用代理。下面的 SimpleProxyPool 会在初始化时验证列表中的所有代理,并将活着的代理存入列表,后续每次调用 get_random() 随机返回一个。

import requests
import random
import time

class SimpleProxyPool:
    """基于预定义代理列表+HTTPbin验证的本地代理池"""
    
    def __init__(self, proxy_list: list[str], check_url="http://httpbin.org/ip", timeout=3):
        """proxy_list格式: ['http://user:pass@ip:port', 'socks5://ip:port']"""
        self.raw_list = proxy_list
        self.check_url = check_url
        self.timeout = timeout
        self.alive_proxies = []
        self._init_proxies()
    
    def _check_one(self, proxy: str) -> bool:
        try:
            resp = requests.get(self.check_url, proxies={"http": proxy, "https": proxy}, timeout=self.timeout)
            return resp.status_code == 200
        except:
            return False
    
    def _init_proxies(self):
        for p in self.raw_list:
            if self._check_one(p):
                self.alive_proxies.append(p)
                print(f"[√] 代理 {p.split('@')[-1]} 验证通过")
        print(f"[√] 代理池初始化完成,共 {len(self.alive_proxies)} 个可用代理")
    
    def get_random(self) -> str | None:
        if not self.alive_proxies:
            return None
        return random.choice(self.alive_proxies)

在实际爬虫脚本中,我们可以在 requests 或者 uiautomator2 发出的 HTTP 请求中,通过设置环境变量 HTTP_PROXY / HTTPS_PROXY 或者直接在代码中指定代理来使用这个池子。为了提高代理利用率,你还可以每隔几分钟重新验证一次代理列表,剔除失效的,补充新的。


项目落地注意事项

  1. 法律合规永远第一
    不要爬取用户隐私数据,不要对平台服务造成过大压力,不要将抓取数据用于商业转售。技术无罪,但用错了地方就会触犯红线。

  2. 性能与稳定性平衡
    单设备 UI 自动化建议并发线程数 ≤ 2,否则设备可能出现卡顿甚至崩溃。代理需要定期刷新,Redis 中的任务状态要配合超时重试机制,避免任务卡死。

  3. 去重机制优化
    除了基于 unique_key 的 Redis Set 去重,你还可以为商品标题、价格、图片链接等计算 MD5 值作为辅助去重标识,进一步提高数据质量。

  4. 日志与监控
    强烈建议使用 Python 内置的 logging 模块全面记录运行信息,并将设备状态(空闲、工作中、异常)存入 Redis Hash,方便通过简单的命令行脚本或 Web 面板查看整体运行状况。


你现在已经掌握了一套完整的电商App爬虫系统的核心骨架。接下来你可以根据实际目标 App 的特点,调整元素定位规则、完善exception-handling、接入更多代理源,甚至加入手机群控平台(如 STF 或 minicap/minitouch)来统一管理数十台设备。祝你爬虫之路顺利,但切记——技术为善,合规前行!