Python 异步爬虫教程:aiohttp 详解

当你用 requests 同步爬取 10 个响应延迟 1s 的网站时,总耗时至少是 10s——所有请求都在排队等待上一个完成。如果想把效率提上去?今天的主角 aiohttp 就能帮你解决这个问题。

1. 概述

aiohttp 是 Python 官方异步库 asyncio 生态中的核心 HTTP 工具,同时提供客户端和服务器端功能。在爬虫开发中,我们主要用它的客户端部分发起高并发、无阻塞的请求。

1.1 为什么选它?

  • 🚀 完全异步无阻塞:多个请求可以并行处理,10个1s延迟的网站总耗时可能只有1.1s
  • 📡 支持HTTP/HTTPS/WebSocket全链路:既可以做普通爬虫,也能处理实时数据流
  • 🤝 连接池/会话/代理/Cookie 全套配齐:不用自己造轮子管理复杂的请求环境
  • 性能远超同步库 requests:在大规模爬取场景下优势极其明显

2. 基础入门

2.1 安装

直接用 pip 就能安装,注意 Python 版本要 ≥3.7(旧版兼容 3.6,但建议升级):

pip install aiohttp

2.2 第一个异步爬虫

写异步代码前,先记住两个核心关键字:

  • async def:定义异步函数,不能直接调用,要配合事件循环
  • await:等待异步操作完成(比如发送请求、读取响应),只能放在 async def 函数里

下面是最简的例子,用 aiohttp 爬取 example.com 并打印前 200 个字符:

import aiohttp
import asyncio

async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    # 用 async with 确保请求响应后自动释放连接
    async with session.get(url) as response:
        # 响应文本需要 await 读取,不是同步赋值
        return await response.text()

async def main():
    # 用 async with 创建并自动关闭 ClientSession
    # Session 可以复用 cookie、headers、连接池,不要每次请求都新建!
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "https://example.com")
        print(html[:200])

# Python 3.7+ 推荐用 asyncio.run() 一键启动事件循环
if __name__ == "__main__":
    asyncio.run(main())

3. 常见请求配置

3.1 URL 带参数

params 传字典,会自动拼接到 URL 后面,避免手动转义特殊字符:

async def search_baidu(session: aiohttp.ClientSession, keyword: str) -> str:
    params = {"wd": keyword, "ie": "utf-8"}
    async with session.get("https://www.baidu.com/s", params=params) as response:
        return await response.text()

3.2 设置请求头

修改 User-Agent 伪装成浏览器是爬虫的基本操作,直接传 headers 字典即可:

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
async with session.get(url, headers=headers) as response:
    ...

3.3 超时控制

防止响应慢或卡住的请求拖慢整个程序,用 aiohttp.ClientTimeout 可以设置总超时各阶段超时(连接、读取等):

# 总超时 10s,连接阶段最多 2s,读取每块最多 3s
timeout = aiohttp.ClientTimeout(total=10, connect=2, sock_read=3)
async with session.get(url, timeout=timeout) as response:
    ...

4. 支持的请求方法

除了最常用的 GETaiohttp 也覆盖了 POST/PUT/DELETE 等所有 RESTful 方法。

4.1 POST 请求

表单提交(对应 requests 的 data)

# 比如模拟登录,传用户名密码
form_data = {"username": "test", "password": "123456"}
async with session.post("https://example.com/login", data=form_data) as response:
    ...

JSON 提交(对应 requests 的 json)

# 调用接口传 JSON 数据
json_payload = {"name": "Alice", "age": 25, "city": "Beijing"}
async with session.post("https://example.com/api/users", json=json_payload) as response:
    ...

4.2 PUT/DELETE 等

用法和 GET/POST 基本一致,直接替换方法名即可:

# PUT 更新资源
async with session.put("https://example.com/api/users/1", json={"age": 26}) as response:
    ...

# DELETE 删除资源
async with session.delete("https://example.com/api/users/1") as response:
    ...

5. 处理响应数据

5.1 基础响应信息

响应状态码、响应头可以直接获取,不需要 await;但内容必须 await 读取,因为是异步流。

async with session.get(url) as response:
    print(f"状态码: {response.status}")  # 比如 200
    print(f"响应头: {response.headers}")  # dict 格式
    print(f"Content-Type: {response.headers.get('Content-Type')}")

    # 文本内容
    text = await response.text()
    # 二进制内容(比如下载图片、PDF)
    bytes_content = await response.read()
    # JSON 内容(自动解析)
    json_data = await response.json()

5.2 大文件流式下载

如果要下载几百 MB 的文件,直接用 read() 会把整个文件加载到内存里,容易爆内存。这时候用 response.contentread(chunk_size) 分块下载:

async def download_large_file(session: aiohttp.ClientSession, url: str, save_path: str) -> None:
    async with session.get(url) as response:
        # 检查状态码是否成功
        response.raise_for_status()
        with open(save_path, "wb") as f:
            # 每次读 1MB
            while chunk := await response.content.read(1024 * 1024):
                f.write(chunk)

6. 进阶实用功能

6.1 控制并发数

如果同时发起几百上千个请求,目标服务器可能会直接封你 IP,或者本地带宽被占满。这时候用 asyncio.Semaphore 限制最大并发数:

# 最大并发 10
sem = asyncio.Semaphore(10)

async def safe_fetch(session: aiohttp.ClientSession, url: str) -> str:
    # 用 async with 自动获取/释放信号量
    async with sem:
        async with session.get(url) as response:
            return await response.text()

6.2 会话持久化

ClientSession 会自动维护 cookie、连接池、默认 headers,所以可以在创建 Session 时统一设置,所有后续请求都能用:

async def persistent_session_demo() -> None:
    # 统一设置默认 cookie、UA、超时
    session = aiohttp.ClientSession(
        cookies={"session_id": "abc123"},
        headers={"User-Agent": "MyCrawler/1.0"},
        timeout=aiohttp.ClientTimeout(total=20)
    )
    try:
        # 两次请求会自动复用 cookie 和连接
        async with session.get("https://example.com/page1") as r1:
            ...
        async with session.get("https://example.com/page2") as r2:
            ...
    finally:
        # 如果不用 async with 创建 Session,记得手动 close
        await session.close()

6.3 代理设置

# HTTP/HTTPS 代理
async with session.get(url, proxy="http://127.0.0.1:7890") as response:
    ...

# 带认证的代理
async with session.get(url, proxy="http://user:pass@proxy.example.com:8080") as response:
    ...

7. 错误处理

网络请求很容易出问题,一定要加异常捕获!常见的异常类在 aiohttp.ClientError 下面:

async def fetch_with_error_handling(session: aiohttp.ClientSession, url: str) -> str | None:
    try:
        async with session.get(url, timeout=10) as response:
            # 自动抛出 HTTP 错误(比如 404、500)
            response.raise_for_status()
            return await response.text()
    except aiohttp.ClientConnectorError:
        print(f"无法连接到服务器: {url}")
    except aiohttp.ClientTimeoutError:
        print(f"请求超时: {url}")
    except aiohttp.HTTPError as e:
        print(f"HTTP 错误: {e.status} - {url}")
    except aiohttp.ClientError as e:
        print(f"其他请求错误: {e} - {url}")
    return None

8. 完整实战:批量爬取网站

把上面的知识点整合起来,写一个批量爬取 3 个网站、限制并发 2、自动处理错误的完整爬虫:

import aiohttp
import asyncio

# 批量爬取的目标
URLS = [
    "https://example.com",
    "https://httpbin.org/status/404",  # 测试 404 错误
    "https://httpbin.org/delay/2",      # 测试 2s 延迟
]

# 最大并发 2
MAX_CONCURRENCY = 2
# 总超时 5s
TIMEOUT = aiohttp.ClientTimeout(total=5)

async def fetch(session: aiohttp.ClientSession, url: str, sem: asyncio.Semaphore) -> tuple[str, int | None]:
    async with sem:
        try:
            async with session.get(url, timeout=TIMEOUT) as response:
                response.raise_for_status()
                content = await response.text()
                return (url, len(content))
        except Exception as e:
            print(f"❌ 处理 {url} 失败: {str(e)}")
            return (url, None)

async def main():
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        # 创建所有任务
        tasks = [fetch(session, url, sem) for url in URLS]
        # 并发执行所有任务,不管失败与否都会返回结果
        results = await asyncio.gather(*tasks)
        
        # 打印统计结果
        print("\n📊 爬取结果统计:")
        for url, length in results:
            if length:
                print(f"✅ {url}: 成功获取 {length} 字符")
            else:
                print(f"❌ {url}: 爬取失败")

if __name__ == "__main__":
    asyncio.run(main())

9. 最佳实践总结

  1. 必须复用 ClientSession:不要在每个请求里都新建,新建一次成本很高
  2. 一定要控制并发数:根据目标服务器的承受能力调整,一般建议 5-20
  3. 合理设置超时:避免单个慢请求拖垮整个程序
  4. 分块下载大文件:防止内存溢出
  5. 完整的错误处理:网络请求不可控,所有可能的异常都要覆盖
  6. 手动管理 Session 时记得 close:或者用 async with 自动管理

10. 延伸学习

如果想深入了解 aiohttp,可以看官方文档:

  • aiohttp 官方文档
  • 如果想处理更复杂的异步任务调度,可以结合 asyncioaiomultiprocessing 使用