Python 爬虫实战:PyQuery + MongoDB 指南

1. 前言

在数据爬取实战中,面对上千条详情页数据,单线程的线性爬取往往效率低下。本文将演示如何构建一个工业级的爬虫原型,通过多进程并发提速,并解决以下核心痛点:

  • 非结构化数据清洗:利用 PyQuery 像操作 jQuery 一样解析 HTML。
  • 增量更新与去重:利用 MongoDB 的 upsert 操作防止重复数据。
  • 并发瓶颈:利用进程池实现页码级的并行调度。

2. 技术准备

在开始编写代码前,请确保安装了以下依赖库,并启动了本地 MongoDB 服务:

# 安装请求库、解析库、数据库驱动
pip install requests pyquery pymongo
  • PyQuery:基于 lxml 的 Python 库,支持 CSS 选择器。
  • MongoDB:NoSQL 数据库,非常适合存储爬虫抓取的 JSON 风格数据。
  • multiprocessing:Python 内置的多进程库,用于绕过 GIL 限制。

3. 爬虫实现

以下是完整代码实现,包含详细的注释说明:

import requests
import logging
import re
import pymongo
from pyquery import PyQuery as pq
from urllib.parse import urljoin
import multiprocessing

# 1. 配置日志:输出时间、级别和内容,方便调试
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

# 常量配置
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
MONGO_CONNECTION_STRING = 'mongodb://localhost:27017'
MONGO_DB_NAME = 'movies'
MONGO_COLLECTION_NAME = 'movies'

# 2. 初始化 MongoDB 连接
client = pymongo.MongoClient(MONGO_CONNECTION_STRING)
db = client[MONGO_DB_NAME]
collection = db[MONGO_COLLECTION_NAME]

def scrape_page(url):
    """通用抓取函数,包含异常处理"""
    logging.info('正在爬取 %s...', url)
    try:
        response = requests.get(url, timeout=10) # 建议增加超时设置
        if response.status_code == 200:
            return response.text
        logging.error('抓取失败,状态码: %s, URL: %s', response.status_code, url)
    except requests.RequestException:
        logging.error('抓取 %s 时发生错误', url, exc_info=True)

def scrape_index(page):
    """抓取列表索引页"""
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(index_url)

def parse_index(html):
    """解析索引页,利用 PyQuery 提取所有详情页 URL"""
    doc = pq(html)
    links = doc('.el-card .name') # 定位 CSS 选择器
    for link in links.items():
        href = link.attr('href')
        detail_url = urljoin(BASE_URL, href)
        logging.info('获取到详情页地址: %s', detail_url)
        yield detail_url

def scrape_detail(url):
    """抓取详情页"""
    return scrape_page(url)

def parse_detail(html):
    """解析详情页数据,清洗并转换为字段"""
    doc = pq(html)
    cover = doc('img.cover').attr('src')
    name = doc('a > h2').text()
    categories = [item.text() for item in doc('.categories button span').items()]
    
    # 使用正则提取上映时间,注意处理 Python 3.12+ 的转义字符
    published_at = doc('.info:contains(上映)').text()
    published_at = re.search(r'(\d{4}-\d{2}-\d{2})', published_at).group(1) \
        if published_at and re.search(r'\d{4}-\d{2}-\d{2}', published_at) else None
        
    drama = doc('.drama p').text()
    score = doc('p.score').text()
    score = float(score) if score else None
    
    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }

def save_data(data):
    """保存数据到 MongoDB,使用 upsert 模式"""
    if data:
        collection.update_one({
            'name': data.get('name') # 根据电影名唯一标识
        }, {
            '$set': data
        }, upsert=True) # 如果存在则更新,不存在则插入

def main(page):
    """单页爬取主逻辑"""
    index_html = scrape_index(page)
    if not index_html:
        return
    detail_urls = parse_index(index_html)
    for detail_url in detail_urls:
        detail_html = scrape_detail(detail_url)
        data = parse_detail(detail_html)
        logging.info('获取详情数据: %s', data.get('name'))
        save_data(data)
        logging.info('数据已存入 MongoDB')

if __name__ == '__main__':
    # 3. 多进程并发调度
    pool = multiprocessing.Pool() # 默认开启 CPU 核心数个进程
    pages = range(1, TOTAL_PAGE + 1)
    pool.map(main, pages)
    pool.close()
    pool.join() # 等待所有任务完成

4. 技术要点深度解析

🎯 PyQuery 选择器艺术

在代码中我们使用了 doc('.el-card .name')doc('.info:contains(上映)')。PyQuery 的强大之处在于它完美继承了 jQuery 的 contains 选择器,可以非常方便地定位那些没有独立 ID 或 Class、仅包含特定文本的 HTML 元素。

🛡️ MongoDB 的 Upsert 策略

传统的 insert_one 在重复运行时会报错或导致数据重复。我们使用的 collection.update_one(..., upsert=True) 是爬虫开发的最佳实践:

  • 防重:以“电影名”为 Key,即便程序中断重启,也不会产生冗余数据。
  • 更新:如果目标站点的评分(score)发生了变化,下次运行会自动更新数据库中的旧值。

⚡ 多进程加速原理

通过 multiprocessing.Pool().map(main, pages),我们将 10 个页码的任务分配给多个进程。由于爬虫大部分时间处于等待网络响应的 I/O 阻塞状态,多进程可以利用系统调度,在某个进程等待网络时,让其他进程继续解析,从而使抓取效率成倍增长。


结语:这套模板涵盖了现代爬虫的“抓取-解析-存储-提速”全流程。在实际应用中,你还可以继续增加 User-Agent 池、代理 IP 转发等模块以增强爬虫的隐蔽性。