Python 爬虫实战:PyQuery + MongoDB 指南

1. 前言

刚入门爬虫时,最让人崩溃的三件事往往是:

  • 列表翻页一抓就卡,10 页数据能跑半小时还频频报错;
  • 重复运行一次,MongoDB 里就多出几百条重复电影,数据脏得没法用;
  • HTML 中想提取“上映时间”,结果标签既没 id 也没 class,定位全靠猜,改版一次代码全废。

本文将用一个轻量但覆盖工业级基础逻辑的电影数据爬虫原型,一次性解决这三个问题:

  • PyQuery 像写 jQuery 一样快速解析 HTML;
  • MongoDB 的 upsert 机制自动去重与更新;
  • Python 内置多进程池绕过 GIL,实现页码级并发采集。

整份代码不超过 150 行,但套路可直接复用到自己的项目里。


2. 前置准备

先把环境搭好,别等写了一半才到处查报错。

依赖库安装

# 三个核心库:网络请求、HTML 解析、NoSQL 存储
pip install requests pyquery pymongo

MongoDB 启动

本地安装 MongoDB 后,通过命令行或可视化工具(如 MongoDB Compass)启动服务,默认监听端口 27017,我们后面直接连 mongodb://localhost:27017


3. 完整代码 + 模块拆解

下面是带超详细中文注释的完整代码,按照“配置 → 通用请求 → 索引页处理 → 详情页处理 → 数据存储 → 并发调度”的顺序一路贯通。建议先整体扫一遍,再重点看后面拆解的模块。

import requests
import logging
import re
import pymongo
from pyquery import PyQuery as pq
from urllib.parse import urljoin
import multiprocessing

# ======================== 1. 全局配置 ========================
# 日志配置:带时间戳、级别,方便定位爬取/解析中的问题
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

# 常量配置(所有可修改的都集中在这里,后续换目标网站只需改这些)
BASE_URL = 'https://ssr1.scrape.center'  # 崔老师的公开爬虫测试站
TOTAL_PAGE = 10                             # 要爬的总页数
MONGO_URI = 'mongodb://localhost:27017'    # MongoDB 连接地址
MONGO_DB = 'scrape_movies'                  # 数据库名
MONGO_COL = 'movies'                         # 集合名

# ======================== 2. MongoDB 初始化 ========================
client = pymongo.MongoClient(MONGO_URI)
db = client[MONGO_DB]
collection = db[MONGO_COL]

# ======================== 3. 通用请求模块 ========================
def scrape_page(url):
    """
    通用的单页抓取函数:设置超时、捕获异常,
    成功返回 HTML 文本,否则返回 None。
    """
    logging.info('🚀 开始抓取: %s', url)
    try:
        # 超时必须设置,防止某个网页卡住整个进程
        resp = requests.get(url, timeout=10)
        if resp.status_code == 200:
            return resp.text
        logging.error('❌ 状态码错误: %d, URL: %s', resp.status_code, url)
    except requests.RequestException as e:
        logging.error('❌ 请求异常: %s, URL: %s', str(e), url, exc_info=False)
    return None

# ======================== 4. 列表页(索引页)处理 ========================
def scrape_index(page_num):
    """抓取指定页码的列表页,复用通用请求函数"""
    index_url = f"{BASE_URL}/page/{page_num}"
    return scrape_page(index_url)

def parse_index(html):
    """
    解析列表页:用 PyQuery 的 CSS 选择器提取所有详情页 URL。
    使用 yield 返回生成器,避免一次性存储大量 URL 撑爆内存。
    """
    doc = pq(html)
    # 定位电影卡片里的标题链接(测试站结构很清晰)
    link_nodes = doc('.el-card .name')
    for node in link_nodes.items():
        # 将相对路径转为绝对路径
        detail_url = urljoin(BASE_URL, node.attr('href'))
        logging.info('📍 发现详情页: %s', detail_url)
        yield detail_url

# ======================== 5. 详情页处理 ========================
def scrape_detail(url):
    """抓取详情页,直接复用通用请求函数"""
    return scrape_page(url)

def parse_detail(html):
    """
    解析详情页:清洗并提取成结构化的字典。
    重点:处理缺失值、利用 :contains() 定位无 class/id 的元素。
    """
    doc = pq(html)
    # 1. 封面图 URL
    cover = doc('img.cover').attr('src')
    # 2. 电影名
    name = doc('a > h2').text()
    # 3. 分类列表
    categories = [item.text() for item in doc('.categories button span').items()]
    # 4. 上映时间(有些电影可能没有,且位置不固定)
    published_info = doc('.info:contains(上映)').text()  # PyQuery 独有的 :contains 选择器
    published_at = None
    if published_info:
        match = re.search(r'\d{4}-\d{2}-\d{2}', published_info)
        published_at = match.group(1) if match else None
    # 5. 剧情简介
    drama = doc('.drama p').text()
    # 6. 评分(转为浮点数,方便后续排序/分析)
    score = doc('p.score').text()
    score = float(score.strip()) if score and score.strip() else None

    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }

# ======================== 6. 数据存储模块 ========================
def save_movie(data):
    """
    存储电影数据,MongoDB upsert 模式:
    - 以“电影名”作为唯一标识;
    - 存在则更新,不存在则插入,彻底告别重复数据。
    """
    if not data:
        return
    result = collection.update_one(
        {'name': data['name']},
        {'$set': data},
        upsert=True
    )
    if result.upserted_id:
        logging.info('✅ 新增电影: %s', data['name'])
    else:
        logging.info('🔄 更新电影: %s', data['name'])

# ======================== 7. 单页完整逻辑 + 多进程调度 ========================
def process_page(page_num):
    """
    单页的完整流程:
    抓列表页 → 解析出详情页 URL → 逐个抓取详情页 → 解析 → 存储
    """
    index_html = scrape_index(page_num)
    if not index_html:
        return
    detail_urls = parse_index(index_html)
    for url in detail_urls:
        detail_html = scrape_detail(url)
        if not detail_html:
            continue
        movie_data = parse_detail(detail_html)
        save_movie(movie_data)

if __name__ == '__main__':
    """
    多进程启动入口,必须写在 if __name__ == '__main__' 里!
    Pool() 默认开启 CPU 核心数个进程,爬虫属于 I/O 密集型,可以多开一些。
    """
    logging.info('🎬 电影爬虫正式启动!')
    pool = multiprocessing.Pool(processes=5)  # 测试站较小,开 5 个进程足够
    pages = range(1, TOTAL_PAGE + 1)
    pool.map(process_page, pages)  # 自动将每个页码传给 process_page
    pool.close()  # 关闭进程池,不再接受新任务
    pool.join()   # 等待所有子进程执行完毕
    logging.info('🎉 所有任务完成!')

代码阅读提示:以上代码直接复制即可运行(前提是 MongoDB 已启动,并且目标测试站可访问)。下面我们会挑出几个关键模块,深入分析其设计思路和常见陷阱。


4. 核心技术亮点

这套代码看似简单,但每个模块都藏着爬虫开发的最佳实践,挑 3 个最重要的详聊。

🎨 PyQuery 的 :contains() 选择器

解析 HTML 时,最头疼的就是 没有 id/class,只能靠文本内容定位的元素(比如本例中的“上映”信息)。传统做法是数标签顺序,或者写复杂的 XPath,一旦页面结构微调,代码全废。

PyQuery 直接搬来了 jQuery 的 :contains() 语法,一行代码解决问题:

# ❌ 不推荐:依赖固定位置,页面一改就错
# doc('.info').eq(3).text()

# ✅ 推荐:根据文本内容动态定位
published_info = doc('.info:contains(上映)').text()

使用建议:contains() 不仅能定位,还能配合正则最终提取,非常适合处理半结构化字段。

🛡️ MongoDB 的 upsert 去重/更新

新手常用的 insert_one() 有两个大坑:

  1. 如果集合建了唯一索引,重复插入相同键直接报错;
  2. 如果没有唯一索引,每次运行都往里塞重复数据,几天后数据库里全是垃圾。

update_one(..., upsert=True) 则完美规避:

  • 先用 {'name': data['name']} 当作查找条件(天然唯一键);
  • 找到就执行 $set 更新所有字段(比如评分变了会自动同步);
  • 找不到就插入一条新文档。

无论是首次爬取还是增量更新,同一个函数通杀。

⚡ 多进程池调度,突破 GIL 限制

Python 因为 GIL 的存在,单线程下同一时刻只有一个 CPU 核心在工作。但爬虫 90% 的时间都在等待网络响应,属于 I/O 阻塞,CPU 其实是闲着的。

这时候 multiprocessing.Pool() 多进程就派上用场:

  • 每个进程独立拥有 GIL,能真正利用多核 CPU;
  • 进程 A 在等列表页返回时,CPU 可以切换到进程 B 去解析详情页;
  • 进程 B 在等详情页返回时,进程 C 可以去存数据、发起新请求。

process_page 丢进进程池,用 pool.map() 并行处理所有页码,总耗时可以降为原来的 1/4 甚至更低(具体看目标网站的响应速度和反爬策略)。


5. 可扩展方向(进阶)

上面这个原型已经够跑通基本数据采集,但实际生产环境中,你还可以继续往上加模块:

  1. 伪装身份:维护 User-Agent 池,随机切换 Referer,降低被识别为爬虫的概率。
  2. 反反爬:接入代理 IP 池,处理验证码(如图形验证码、滑块验证,用 Selenium 或 Playwright)。
  3. 增量爬取:记录最后一次爬取的页码或时间戳,只爬新增/更新的内容,避免从第一页重新扫。
  4. 错误重试:用 tenacity 库给请求函数加上自动重试机制,网络波动时不再手动重跑。
  5. 数据校验:用 pydantic 定义数据模型,入库前自动校验每个字段的类型和格式,脏数据无处藏身。

小提醒:爬取公开数据是合法的,但请务必遵守目标网站的 robots.txt 规则,控制并发量,不要给服务器带来过大压力。不要爬取隐私或商业敏感数据。