🛠️ Hands-On Python Web Scraping: A Sync and Async Playbook for the Douban Movie Top 250

Preface: In web scraping, efficiency is everything. When you need to collect tens of thousands of records, the wait imposed by single-threaded synchronous crawling is simply unacceptable. Through working code, this article walks you from the most basic synchronous logic up to a fully concurrent coroutine solution.


🛠️ Part 1: The Core Tool Stack

The core logic of this case study is built on a few pragmatism-first libraries:

  1. Data collection: requests (the standard synchronous option) / aiohttp (a high-performance async library)
  2. Parsing: lxml.etree → locate DOM nodes directly with XPath, far more efficient than regular expressions
  3. Storage: DataRecorder → thread/process-safe writes, handles Excel locking and headers automatically
  4. Filling the gaps: itertools.zip_longest → pads missing short quotes so the shorter lists don't silently drop rows (see the sketch after this list)
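
To make item 4 concrete, here is a minimal, self-contained sketch (the sample rows are illustrative): zip() would stop at the shortest list and silently drop the last movie, while zip_longest() pads the missing quote with a placeholder.

from itertools import zip_longest

titles = ['肖申克的救赎', '霸王别姬', '阿甘正传']
scores = ['9.7', '9.6', '9.5']
comments = ['希望让人自由。', '风华绝代。']  # one movie has no short quote

# zip() would yield only 2 rows; zip_longest() keeps all 3 and fills the gap with '无'
for title, score, comment in zip_longest(titles, scores, comments, fillvalue='无'):
    print(title, score, comment)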

🧠 Part 2: How to Split the Task

Concurrent code cannot simply reuse the synchronous for loop: the work has to be "atomized" into independent, schedulable units.

  • Synchronous flow: request a page → parse it → write to file, all bundled into one big block that blocks from start to finish
  • Concurrent/async flow: only "request and parse one page, return its rows" is made atomic, and a scheduler (thread pool / event loop) dispatches those units (see the sketch below)
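
A minimal sketch of what "atomic" means here (crawl_one_page and its placeholder return value are illustrative; the real implementations follow in Part 3): the unit takes a page index, does its own fetching and parsing, returns plain rows, and never touches the output file, so any scheduler can run it.

from concurrent.futures import ThreadPoolExecutor

def crawl_one_page(page_index):
    # Atomic unit: fetch and parse exactly one listing page, return its rows; no file I/O here
    url = f'https://movie.douban.com/top250?start={page_index * 25}'
    # ... request + XPath parsing would go here (see Part 3) ...
    return [{'page': page_index, 'url': url}]  # placeholder rows

# Because the unit is self-contained, the scheduler is interchangeable:
rows_sync = [crawl_one_page(i) for i in range(10)]            # plain synchronous loop
with ThreadPoolExecutor(max_workers=5) as pool:               # thread pool
    rows_threaded = list(pool.map(crawl_one_page, range(10)))
# An async variant awaited via asyncio.gather works the same way.
# Writing to disk happens once, in the caller, after every page has returned.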

🚀 Part 3: All Three Approaches in Code

Prerequisite: one-command environment setup

Copy the command below to install every required dependency:

pip install requests aiohttp lxml DataRecorder openpyxl

1. Synchronous crawling: the foundation

The logic is linear and easy to follow, but it blocks on I/O the whole time; ideal for understanding the full pipeline.

import os, time
from itertools import zip_longest
from lxml import etree
import requests
from DataRecorder import Recorder


def get_excel(mode):
    # Fresh output file for every run: delete any leftover from a previous run
    filename = f'top250_{mode}.xlsx'
    if os.path.exists(filename): os.remove(filename)
    recorder = Recorder(filename)
    recorder.show_msg = False  # suppress DataRecorder's console messages
    return recorder


def run_sync():
    recorder = get_excel('同步')
    session = requests.Session()
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36'}
    for j in range(10):  # 10 listing pages, 25 movies each
        url = f'https://movie.douban.com/top250?start={j * 25}'
        res = session.get(url, headers=headers).text
        tree = etree.HTML(res)
        titles = tree.xpath('//ol[@class="grid_view"]//span[@class="title"][1]/text()')
        scores = tree.xpath('//span[@class="rating_num"]/text()')
        comments = tree.xpath('//span[@class="inq"]/text()')
        for title, score, comment in zip_longest(titles, scores, comments, fillvalue='无'):
            recorder.add_data({
                '电影名': title,
                '评分': score,
                '短评': comment
            })
        recorder.record()  # flush this page to disk (Part 4 suggests batching writes instead)
        print(f"已完成第 {j + 1} 页采集")


if __name__ == '__main__':
    start = time.time()
    run_sync()
    print(f'同步爬取耗时: {time.time() - start:.2f}秒')

2. Multithreading: the first choice when staying with older libraries

ThreadPoolExecutor gives a quick speed-up, uses fewer resources than multiprocessing, and works with requests out of the box.

from concurrent.futures import ThreadPoolExecutor
import os, time
from itertools import zip_longest
from lxml import etree
import requests
from DataRecorder import Recorder

def get_excel(mode):
    filename = f'top250_{mode}.xlsx'
    if os.path.exists(filename): os.remove(filename)
    recorder = Recorder(filename)
    recorder.show_msg = False
    return recorder

def fetch_page(page_index):
    url = f'https://movie.douban.com/top250?start={page_index*25}'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
        'Referer': 'https://movie.douban.com/top250'
    }
    try:
        res = requests.get(url, headers=headers, timeout=10).text
        tree = etree.HTML(res)
        titles = tree.xpath('//ol[@class="grid_view"]//li//div[@class="hd"]/a/span[1]/text()')
        scores = tree.xpath('//span[@class="rating_num"]/text()')
        comments = tree.xpath('//span[@class="inq"]/text()')
        
        page_data = []
        for t, s, c in zip_longest(titles, scores, comments, fillvalue='无'):
            page_data.append({'电影名': t, '评分': s, '短评': c})
        
        print(f"线程已完成第 {page_index + 1} 页抓取")
        return page_data
    except Exception as e:
        print(f"抓取第 {page_index + 1} 页失败: {e}")
        return []

if __name__ == '__main__':
    recorder = get_excel('多线程')
    start = time.time()
    
    # Thread pool: fetch pages concurrently (map returns results in task-submission order)
    with ThreadPoolExecutor(max_workers=5) as executor:
        all_results = list(executor.map(fetch_page, range(10)))
    
    # Merge in the main thread and write to disk in one pass (avoids fragmented disk I/O)
    for page_data in all_results:
        for item in page_data:
            recorder.add_data(item)
    recorder.record()
    
    print(f'\n全部完成!')
    print(f'多线程耗时: {time.time() - start:.2f}秒')
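
executor.map returns results in submission order, which keeps the merge step trivial. If you would rather write each page as soon as it finishes, concurrent.futures.as_completed is the usual alternative; a minimal sketch reusing fetch_page and get_excel from the script above (the '多线程_乱序' filename is illustrative):

from concurrent.futures import ThreadPoolExecutor, as_completed

recorder = get_excel('多线程_乱序')
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(fetch_page, i) for i in range(10)]
    for future in as_completed(futures):   # yields each future as soon as it finishes
        for item in future.result():       # fetch_page already returns [] on failure
            recorder.add_data(item)
recorder.record()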

3. Coroutines: maximum throughput on a single thread

Requires the async HTTP library aiohttp; lowest resource footprint, strongest concurrency.

import asyncio
import aiohttp
import os, time
from itertools import zip_longest
from lxml import etree
from DataRecorder import Recorder

def get_excel(mode):
    filename = f'top250_{mode}.xlsx'
    if os.path.exists(filename): os.remove(filename)
    recorder = Recorder(filename)
    recorder.show_msg = False
    return recorder

async def fetch_async(page_index, session):
    url = f'https://movie.douban.com/top250?start={page_index*25}'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
        'Referer': 'https://movie.douban.com/top250'
    }
    try:
        async with session.get(url, headers=headers) as resp:
            html = await resp.text()
        tree = etree.HTML(html)
        titles = tree.xpath('//ol[@class="grid_view"]//li//div[@class="hd"]/a/span[1]/text()')
        scores = tree.xpath('//span[@class="rating_num"]/text()')
        comments = tree.xpath('//span[@class="inq"]/text()')
        
        page_data = []
        for t, s, c in zip_longest(titles, scores, comments, fillvalue='无'):
            page_data.append({'电影名': t, '评分': s, '短评': c})
        return page_data
    except Exception as e:
        print(f"协程抓取第 {page_index + 1} 页失败: {e}")
        return []

async def main_async():
    recorder = get_excel('协程')
    start = time.time()
    # Reuse a single ClientSession so all requests share one connection pool
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(i, session) for i in range(10)]
        all_results = await asyncio.gather(*tasks)
    
    # Write everything in one batch
    for page_data in all_results:
        for item in page_data:
            recorder.add_data(item)
    recorder.record()
    print(f'协程爬取耗时: {time.time() - start:.2f}秒')

if __name__ == '__main__':
    asyncio.run(main_async())

📊 Part 4: Choosing an Approach and Tuning It

1. Which approach to pick

Scenario → recommended approach, and why:

  • Beginner practice / under 1,000 records → synchronous: simple logic, no concurrency-safety concerns
  • Medium-to-large crawls (pure network I/O) → coroutines: lowest resource footprint, a single thread can sustain thousands of concurrent requests
  • Crawls that include computation/decryption work → multiprocessing: bypasses the GIL and genuinely uses multiple CPU cores (see the sketch below)
  • Maintaining old code / unwilling to switch libraries → multithreading: compatible with requests, minimal changes, quick payoff
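
The multiprocessing row is the only one not demonstrated in Part 3, because this crawl is pure network I/O. When a crawl does include CPU-bound work (decryption, heavy re-encoding), the thread-pool pattern carries over almost verbatim to ProcessPoolExecutor. A minimal sketch under that assumption (heavy_task is an illustrative stand-in that just burns CPU):

from concurrent.futures import ProcessPoolExecutor

def heavy_task(page_index):
    # Stand-in for CPU-bound work such as decrypting a downloaded payload
    return sum(i * i for i in range(10 ** 6)) + page_index

if __name__ == '__main__':  # required because worker processes re-import this module
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(heavy_task, range(10)))
    print(results[:3])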

2. Pitfalls and optimizations

  1. Limit concurrency: Douban rate-limits requests; keep max_workers / the number of in-flight coroutines around 5-10 (see the Semaphore sketch after this list)
  2. Batch the disk writes: don't call recorder.record() once per page inside the for loop; writing everything at the end can cut disk I/O by 90%+
  3. Reuse the session: requests.Session() for sync code, aiohttp.ClientSession() for async; reusing TCP connections gives roughly a 30% speed-up
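
For the coroutine version, the simplest way to hold concurrency in that 5-10 range is an asyncio.Semaphore wrapped around each request. A minimal sketch, assuming fetch_async and get_excel from the Part 3 coroutine script are in scope (the limit of 5 and the '协程_限流' filename are illustrative):

import asyncio
import aiohttp

async def main_async_limited():
    recorder = get_excel('协程_限流')
    sem = asyncio.Semaphore(5)  # at most 5 requests in flight at any moment

    async def fetch_limited(page_index, session):
        async with sem:  # extra tasks wait here until a slot frees up
            return await fetch_async(page_index, session)

    async with aiohttp.ClientSession() as session:
        all_results = await asyncio.gather(*[fetch_limited(i, session) for i in range(10)])
    for page_data in all_results:
        for item in page_data:
            recorder.add_data(item)
    recorder.record()

# Run it the same way as before: asyncio.run(main_async_limited())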