Python 爬虫实战:PyQuery + MongoDB 指南
1. 前言
刚入门爬虫时,最让人崩溃的三件事往往是:
- 列表翻页一抓就卡,10 页数据能跑半小时还频频报错;
- 重复运行一次,MongoDB 里就多出几百条重复电影,数据脏得没法用;
- HTML 中想提取“上映时间”,结果标签既没 id 也没 class,定位全靠猜,改版一次代码全废。
本文将用一个轻量但覆盖工业级基础逻辑的电影数据爬虫原型,一次性解决这三个问题:
- 用 PyQuery 像写 jQuery 一样快速解析 HTML;
- 用 MongoDB 的 upsert 机制自动去重与更新;
- 用 Python 内置多进程池绕过 GIL,实现页码级并发采集。
整份代码不超过 150 行,但套路可直接复用到自己的项目里。
2. 前置准备
先把环境搭好,别等写了一半才到处查报错。
依赖库安装
# 三个核心库:网络请求、HTML 解析、NoSQL 存储
pip install requests pyquery pymongo
MongoDB 启动
本地安装 MongoDB 后,通过命令行或可视化工具(如 MongoDB Compass)启动服务,默认监听端口 27017,我们后面直接连 mongodb://localhost:27017。
3. 完整代码 + 模块拆解
下面是带超详细中文注释的完整代码,按照“配置 → 通用请求 → 索引页处理 → 详情页处理 → 数据存储 → 并发调度”的顺序一路贯通。建议先整体扫一遍,再重点看后面拆解的模块。
import requests
import logging
import re
import pymongo
from pyquery import PyQuery as pq
from urllib.parse import urljoin
import multiprocessing
# ======================== 1. 全局配置 ========================
# 日志配置:带时间戳、级别,方便定位爬取/解析中的问题
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
# 常量配置(所有可修改的都集中在这里,后续换目标网站只需改这些)
BASE_URL = 'https://ssr1.scrape.center' # 崔老师的公开爬虫测试站
TOTAL_PAGE = 10 # 要爬的总页数
MONGO_URI = 'mongodb://localhost:27017' # MongoDB 连接地址
MONGO_DB = 'scrape_movies' # 数据库名
MONGO_COL = 'movies' # 集合名
# ======================== 2. MongoDB 初始化 ========================
client = pymongo.MongoClient(MONGO_URI)
db = client[MONGO_DB]
collection = db[MONGO_COL]
# ======================== 3. 通用请求模块 ========================
def scrape_page(url):
"""
通用的单页抓取函数:设置超时、捕获异常,
成功返回 HTML 文本,否则返回 None。
"""
logging.info('🚀 开始抓取: %s', url)
try:
# 超时必须设置,防止某个网页卡住整个进程
resp = requests.get(url, timeout=10)
if resp.status_code == 200:
return resp.text
logging.error('❌ 状态码错误: %d, URL: %s', resp.status_code, url)
except requests.RequestException as e:
logging.error('❌ 请求异常: %s, URL: %s', str(e), url, exc_info=False)
return None
# ======================== 4. 列表页(索引页)处理 ========================
def scrape_index(page_num):
"""抓取指定页码的列表页,复用通用请求函数"""
index_url = f"{BASE_URL}/page/{page_num}"
return scrape_page(index_url)
def parse_index(html):
"""
解析列表页:用 PyQuery 的 CSS 选择器提取所有详情页 URL。
使用 yield 返回生成器,避免一次性存储大量 URL 撑爆内存。
"""
doc = pq(html)
# 定位电影卡片里的标题链接(测试站结构很清晰)
link_nodes = doc('.el-card .name')
for node in link_nodes.items():
# 将相对路径转为绝对路径
detail_url = urljoin(BASE_URL, node.attr('href'))
logging.info('📍 发现详情页: %s', detail_url)
yield detail_url
# ======================== 5. 详情页处理 ========================
def scrape_detail(url):
"""抓取详情页,直接复用通用请求函数"""
return scrape_page(url)
def parse_detail(html):
"""
解析详情页:清洗并提取成结构化的字典。
重点:处理缺失值、利用 :contains() 定位无 class/id 的元素。
"""
doc = pq(html)
# 1. 封面图 URL
cover = doc('img.cover').attr('src')
# 2. 电影名
name = doc('a > h2').text()
# 3. 分类列表
categories = [item.text() for item in doc('.categories button span').items()]
# 4. 上映时间(有些电影可能没有,且位置不固定)
published_info = doc('.info:contains(上映)').text() # PyQuery 独有的 :contains 选择器
published_at = None
if published_info:
match = re.search(r'\d{4}-\d{2}-\d{2}', published_info)
published_at = match.group(1) if match else None
# 5. 剧情简介
drama = doc('.drama p').text()
# 6. 评分(转为浮点数,方便后续排序/分析)
score = doc('p.score').text()
score = float(score.strip()) if score and score.strip() else None
return {
'cover': cover,
'name': name,
'categories': categories,
'published_at': published_at,
'drama': drama,
'score': score
}
# ======================== 6. 数据存储模块 ========================
def save_movie(data):
"""
存储电影数据,MongoDB upsert 模式:
- 以“电影名”作为唯一标识;
- 存在则更新,不存在则插入,彻底告别重复数据。
"""
if not data:
return
result = collection.update_one(
{'name': data['name']},
{'$set': data},
upsert=True
)
if result.upserted_id:
logging.info('✅ 新增电影: %s', data['name'])
else:
logging.info('🔄 更新电影: %s', data['name'])
# ======================== 7. 单页完整逻辑 + 多进程调度 ========================
def process_page(page_num):
"""
单页的完整流程:
抓列表页 → 解析出详情页 URL → 逐个抓取详情页 → 解析 → 存储
"""
index_html = scrape_index(page_num)
if not index_html:
return
detail_urls = parse_index(index_html)
for url in detail_urls:
detail_html = scrape_detail(url)
if not detail_html:
continue
movie_data = parse_detail(detail_html)
save_movie(movie_data)
if __name__ == '__main__':
"""
多进程启动入口,必须写在 if __name__ == '__main__' 里!
Pool() 默认开启 CPU 核心数个进程,爬虫属于 I/O 密集型,可以多开一些。
"""
logging.info('🎬 电影爬虫正式启动!')
pool = multiprocessing.Pool(processes=5) # 测试站较小,开 5 个进程足够
pages = range(1, TOTAL_PAGE + 1)
pool.map(process_page, pages) # 自动将每个页码传给 process_page
pool.close() # 关闭进程池,不再接受新任务
pool.join() # 等待所有子进程执行完毕
logging.info('🎉 所有任务完成!')
代码阅读提示:以上代码直接复制即可运行(前提是 MongoDB 已启动,并且目标测试站可访问)。下面我们会挑出几个关键模块,深入分析其设计思路和常见陷阱。
4. 核心技术亮点
这套代码看似简单,但每个模块都藏着爬虫开发的最佳实践,挑 3 个最重要的详聊。
🎨 PyQuery 的 :contains() 选择器
解析 HTML 时,最头疼的就是 没有 id/class,只能靠文本内容定位的元素(比如本例中的“上映”信息)。传统做法是数标签顺序,或者写复杂的 XPath,一旦页面结构微调,代码全废。
PyQuery 直接搬来了 jQuery 的 :contains() 语法,一行代码解决问题:
# ❌ 不推荐:依赖固定位置,页面一改就错
# doc('.info').eq(3).text()
# ✅ 推荐:根据文本内容动态定位
published_info = doc('.info:contains(上映)').text()
使用建议::contains() 不仅能定位,还能配合正则最终提取,非常适合处理半结构化字段。
🛡️ MongoDB 的 upsert 去重/更新
新手常用的 insert_one() 有两个大坑:
- 如果集合建了唯一索引,重复插入相同键直接报错;
- 如果没有唯一索引,每次运行都往里塞重复数据,几天后数据库里全是垃圾。
update_one(..., upsert=True) 则完美规避:
- 先用
{'name': data['name']} 当作查找条件(天然唯一键);
- 找到就执行
$set 更新所有字段(比如评分变了会自动同步);
- 找不到就插入一条新文档。
无论是首次爬取还是增量更新,同一个函数通杀。
⚡ 多进程池调度,突破 GIL 限制
Python 因为 GIL 的存在,单线程下同一时刻只有一个 CPU 核心在工作。但爬虫 90% 的时间都在等待网络响应,属于 I/O 阻塞,CPU 其实是闲着的。
这时候 multiprocessing.Pool() 多进程就派上用场:
- 每个进程独立拥有 GIL,能真正利用多核 CPU;
- 进程 A 在等列表页返回时,CPU 可以切换到进程 B 去解析详情页;
- 进程 B 在等详情页返回时,进程 C 可以去存数据、发起新请求。
将 process_page 丢进进程池,用 pool.map() 并行处理所有页码,总耗时可以降为原来的 1/4 甚至更低(具体看目标网站的响应速度和反爬策略)。
5. 可扩展方向(进阶)
上面这个原型已经够跑通基本数据采集,但实际生产环境中,你还可以继续往上加模块:
- 伪装身份:维护 User-Agent 池,随机切换 Referer,降低被识别为爬虫的概率。
- 反反爬:接入代理 IP 池,处理验证码(如图形验证码、滑块验证,用 Selenium 或 Playwright)。
- 增量爬取:记录最后一次爬取的页码或时间戳,只爬新增/更新的内容,避免从第一页重新扫。
- 错误重试:用
tenacity 库给请求函数加上自动重试机制,网络波动时不再手动重跑。
- 数据校验:用
pydantic 定义数据模型,入库前自动校验每个字段的类型和格式,脏数据无处藏身。
小提醒:爬取公开数据是合法的,但请务必遵守目标网站的 robots.txt 规则,控制并发量,不要给服务器带来过大压力。不要爬取隐私或商业敏感数据。