Python 异步爬虫教程:aiohttp 详解

试想你要从 10 个网站抓取数据,每个网站响应恰好要 1 秒钟。如果用最熟悉的 requests 库,你会发现程序至少需要 10 秒——请求一个接一个地“排队”,后面的只能干等。而今天的主角 aiohttp,可以让你同时发起所有请求,总耗时可能还不到 1.1 秒。这就是异步爬虫的魅力。

1. 概述

aiohttp 是 Python 异步库 asyncio 生态中的核心 HTTP 工具,同时提供客户端和服务器端的能力。在爬虫开发中,我们主要用到它的客户端部分,用来发起高并发、非阻塞的 HTTP 请求。

简单来说,它让你的爬虫像一个高效的餐厅服务员:不等上一道菜上桌,就跑去招呼下一桌客人。

1.1 为什么选它?

  • 🚀 完全异步无阻塞:多个请求并行处理,10 个 1 秒延迟的网站总耗时可能仅需 1 秒多
  • 📡 支持 HTTP/HTTPS/WebSocket:无论是普通爬虫还是实时数据流,都可以应对
  • 🤝 内置连接池、会话、代理、Cookie 管理:不用自己再造轮子
  • 远超同步库 requests 的性能:在大规模采集场景下优势极为明显

2. 基础入门

2.1 安装

一行命令即可安装,注意 Python 版本应 ≥ 3.7(低版本兼容 3.6,但强烈建议升级):

pip install aiohttp

2.2 第一个异步爬虫

编写异步代码前,先记住两个核心关键字:

  • async def:定义一个异步函数,它不能直接调用,需要交给事件循环去调度
  • await:等待一个异步操作完成(比如发送请求、读取响应数据),只能出现在 async def 函数内部

下面就用它们来抓取 example.com,并打印前 200 个字符:

import aiohttp
import asyncio

async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    # 用 async with 发起请求,确保响应结束后自动释放连接
    async with session.get(url) as response:
        # 读取响应文本也需要 await,因为数据是异步传输的
        return await response.text()

async def main():
    # 使用 async with 创建并自动关闭 ClientSession
    # Session 内部会复用 Cookie、请求头、连接池,不要为每个请求都新建一个!
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "https://example.com")
        print(html[:200])

# Python 3.7+ 推荐直接用 asyncio.run() 启动事件循环
if __name__ == "__main__":
    asyncio.run(main())

💡 要点: ClientSession 类似于浏览器的“无痕窗口”,在里面打开的所有页面都会共享环境,并且用完关上即可,不要反复创建。


3. 常见请求配置

3.1 URL 中带参数

避免手动拼接和转义,直接将参数以字典形式传给 params,aiohttp 会自动处理:

async def search_baidu(session: aiohttp.ClientSession, keyword: str) -> str:
    params = {"wd": keyword, "ie": "utf-8"}
    async with session.get("https://www.baidu.com/s", params=params) as response:
        return await response.text()

3.2 设置请求头

爬虫最常见的伪装就是修改 User-Agent。直接把需要定制的头部放在 headers 字典里即可:

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
async with session.get(url, headers=headers) as response:
    ...

3.3 超时控制

一个慢请求可能拖住整个程序。aiohttp.ClientTimeout 可以精细控制总超时以及连接、读取等各阶段的时间:

# 总超时 10s,连接阶段最多 2s,逐块读取最多 3s
timeout = aiohttp.ClientTimeout(total=10, connect=2, sock_read=3)
async with session.get(url, timeout=timeout) as response:
    ...

4. 支持的请求方法

除了最常用的 GETaiohttp 同样支持 POSTPUTDELETE 等所有标准 HTTP 方法。

4.1 POST 请求

表单提交(对应 requests 中的 data

常用于模拟登录或普通表单:

form_data = {"username": "test", "password": "123456"}
async with session.post("https://example.com/login", data=form_data) as response:
    ...

JSON 提交(对应 requests 中的 json

与 RESTful API 交互时使用:

json_payload = {"name": "Alice", "age": 25, "city": "Beijing"}
async with session.post("https://example.com/api/users", json=json_payload) as response:
    ...

4.2 PUT / DELETE 等

用法和 GETPOST 几乎完全一致,根据接口需要替换方法名就行:

# PUT 更新资源
async with session.put("https://example.com/api/users/1", json={"age": 26}) as response:
    ...

# DELETE 删除资源
async with session.delete("https://example.com/api/users/1") as response:
    ...

5. 处理响应数据

5.1 基础响应信息

状态码、响应头等属性可以直接获取,但响应体必须用 await 读取,因为它是异步数据流。

async with session.get(url) as response:
    print(f"状态码: {response.status}")                      # 例如 200
    print(f"响应头: {response.headers}")                     # dict 类型
    print(f"Content-Type: {response.headers.get('Content-Type')}")

    # 三种最常用的读取方式
    text = await response.text()          # 文本内容
    bytes_data = await response.read()    # 二进制内容(适合图片、PDF)
    json_data = await response.json()     # 自动解析 JSON

5.2 大文件流式下载

如果要下载几百 MB 的文件,直接调用 read() 会把整个文件搬进内存,很可能导致程序崩溃。正确的做法是分块读取:

async def download_large_file(session: aiohttp.ClientSession, url: str, save_path: str) -> None:
    async with session.get(url) as response:
        response.raise_for_status()   # 检查状态码,不是 2xx 就抛异常
        with open(save_path, "wb") as f:
            # 每次读取 1 MB,避免一次性占用大量内存
            while chunk := await response.content.read(1024 * 1024):
                f.write(chunk)

6. 进阶实用功能

6.1 控制并发数

无限制地开启成百上千个并发,不仅可能被服务器封 IP,还会占满本机带宽。asyncio.Semaphore 可以帮助你限制同时进行的请求数量:

# 最多允许 10 个请求同时运行
sem = asyncio.Semaphore(10)

async def safe_fetch(session: aiohttp.ClientSession, url: str) -> str:
    # 用 async with 自动获取和释放信号量
    async with sem:
        async with session.get(url) as response:
            return await response.text()

6.2 会话持久化

ClientSession 内部会维护 Cookie、连接池和默认请求头。在创建时统一配置,后续所有请求都会自动使用,非常方便:

async def persistent_session_demo() -> None:
    # 统一设置默认 cookie、UA 和超时
    session = aiohttp.ClientSession(
        cookies={"session_id": "abc123"},
        headers={"User-Agent": "MyCrawler/1.0"},
        timeout=aiohttp.ClientTimeout(total=20)
    )
    try:
        # 两次请求会自动复用 cookie 和已有连接
        async with session.get("https://example.com/page1") as r1:
            ...
        async with session.get("https://example.com/page2") as r2:
            ...
    finally:
        # 如果没用 async with 创建 session,必须手动关闭
        await session.close()

6.3 代理设置

直接通过 proxy 参数配置 HTTP/HTTPS 代理,也支持带认证的代理:

# 普通代理
async with session.get(url, proxy="http://127.0.0.1:7890") as response:
    ...

# 带用户名密码的代理
async with session.get(url, proxy="http://user:pass@proxy.example.com:8080") as response:
    ...

7. 错误处理

网络请求异常非常频繁,一定要主动捕获!aiohttp 常见的异常都继承自 aiohttp.ClientError,建议针对不同类型分别处理:

async def fetch_with_error_handling(session: aiohttp.ClientSession, url: str) -> str | None:
    try:
        async with session.get(url, timeout=10) as response:
            response.raise_for_status()   # 自动抛出 4xx/5xx 错误
            return await response.text()
    except aiohttp.ClientConnectorError:
        print(f"无法连接到服务器: {url}")
    except aiohttp.ClientTimeoutError:
        print(f"请求超时: {url}")
    except aiohttp.HTTPError as e:
        print(f"HTTP 错误: {e.status} - {url}")
    except aiohttp.ClientError as e:
        print(f"其他请求错误: {e} - {url}")
    return None

8. 完整实战:批量爬取网站

现在把上面学到的知识点串起来,写一个真实的爬虫:同时抓取 3 个网站,并发上限设为 2,并带上完整的错误处理。

import aiohttp
import asyncio

# 批量爬取的目标 URL
URLS = [
    "https://example.com",
    "https://httpbin.org/status/404",  # 故意测试 404 错误
    "https://httpbin.org/delay/2",     # 模拟 2 秒延迟
]

MAX_CONCURRENCY = 2   # 最大并发数
TIMEOUT = aiohttp.ClientTimeout(total=5)   # 单次请求总超时 5 秒

async def fetch(session: aiohttp.ClientSession, url: str, sem: asyncio.Semaphore) -> tuple[str, int | None]:
    async with sem:
        try:
            async with session.get(url, timeout=TIMEOUT) as response:
                response.raise_for_status()
                content = await response.text()
                return (url, len(content))
        except Exception as e:
            print(f"❌ 处理 {url} 失败: {str(e)}")
            return (url, None)

async def main():
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        # 创建所有任务
        tasks = [fetch(session, url, sem) for url in URLS]
        # 并发执行,无论成败都会返回结果
        results = await asyncio.gather(*tasks)
        
        # 统计结果
        print("\n📊 爬取结果统计:")
        for url, length in results:
            if length:
                print(f"✅ {url}: 成功获取 {length} 字符")
            else:
                print(f"❌ {url}: 爬取失败")

if __name__ == "__main__":
    asyncio.run(main())

运行这段代码,你会发现尽管有一个 2 秒延迟的页面,另两个页面的结果几乎同时返回,总耗时远小于三个页面各自耗时的累加。


9. 最佳实践总结

  1. 必须复用 ClientSession:不要为每个请求都新建,初始化的开销比较大
  2. 务必控制并发数:根据服务器承受能力和自身网络状况灵活调整,一般建议 5~20
  3. 合理设置超时:避免被个别慢请求拖累整个爬虫
  4. 大文件分块下载:防止内存溢出
  5. 完整的错误处理:网络环境不可控,所有可能的异常都要覆盖
  6. 善用 async with 管理生命周期:自动关闭 Session 和信号量,不易出错

10. 延伸学习

如果想更深入地掌握 aiohttp,建议阅读它的官方文档,并结合 asyncio 的学习,彻底搞懂事件循环和异步编程模型:

  • aiohttp 官方文档
  • 处理更复杂的并发任务调度时,还可以搭配 aiomultiprocessing 等库,进一步提升大批量任务的执行效率

希望这篇教程能帮你快速上手 aiohttp,写出高效稳定的异步爬虫!