Python 异步爬虫教程:aiohttp 详解
试想你要从 10 个网站抓取数据,每个网站响应恰好要 1 秒钟。如果用最熟悉的 requests 库,你会发现程序至少需要 10 秒——请求一个接一个地“排队”,后面的只能干等。而今天的主角 aiohttp,可以让你同时发起所有请求,总耗时可能还不到 1.1 秒。这就是异步爬虫的魅力。
1. 概述
aiohttp 是 Python 异步库 asyncio 生态中的核心 HTTP 工具,同时提供客户端和服务器端的能力。在爬虫开发中,我们主要用到它的客户端部分,用来发起高并发、非阻塞的 HTTP 请求。
简单来说,它让你的爬虫像一个高效的餐厅服务员:不等上一道菜上桌,就跑去招呼下一桌客人。
1.1 为什么选它?
- 🚀 完全异步无阻塞:多个请求并行处理,10 个 1 秒延迟的网站总耗时可能仅需 1 秒多
- 📡 支持 HTTP/HTTPS/WebSocket:无论是普通爬虫还是实时数据流,都可以应对
- 🤝 内置连接池、会话、代理、Cookie 管理:不用自己再造轮子
- ⚡ 远超同步库
requests 的性能:在大规模采集场景下优势极为明显
2. 基础入门
2.1 安装
一行命令即可安装,注意 Python 版本应 ≥ 3.7(低版本兼容 3.6,但强烈建议升级):
2.2 第一个异步爬虫
编写异步代码前,先记住两个核心关键字:
async def:定义一个异步函数,它不能直接调用,需要交给事件循环去调度
await:等待一个异步操作完成(比如发送请求、读取响应数据),只能出现在 async def 函数内部
下面就用它们来抓取 example.com,并打印前 200 个字符:
import aiohttp
import asyncio
async def fetch(session: aiohttp.ClientSession, url: str) -> str:
# 用 async with 发起请求,确保响应结束后自动释放连接
async with session.get(url) as response:
# 读取响应文本也需要 await,因为数据是异步传输的
return await response.text()
async def main():
# 使用 async with 创建并自动关闭 ClientSession
# Session 内部会复用 Cookie、请求头、连接池,不要为每个请求都新建一个!
async with aiohttp.ClientSession() as session:
html = await fetch(session, "https://example.com")
print(html[:200])
# Python 3.7+ 推荐直接用 asyncio.run() 启动事件循环
if __name__ == "__main__":
asyncio.run(main())
💡 要点: ClientSession 类似于浏览器的“无痕窗口”,在里面打开的所有页面都会共享环境,并且用完关上即可,不要反复创建。
3. 常见请求配置
3.1 URL 中带参数
避免手动拼接和转义,直接将参数以字典形式传给 params,aiohttp 会自动处理:
async def search_baidu(session: aiohttp.ClientSession, keyword: str) -> str:
params = {"wd": keyword, "ie": "utf-8"}
async with session.get("https://www.baidu.com/s", params=params) as response:
return await response.text()
3.2 设置请求头
爬虫最常见的伪装就是修改 User-Agent。直接把需要定制的头部放在 headers 字典里即可:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
async with session.get(url, headers=headers) as response:
...
3.3 超时控制
一个慢请求可能拖住整个程序。aiohttp.ClientTimeout 可以精细控制总超时以及连接、读取等各阶段的时间:
# 总超时 10s,连接阶段最多 2s,逐块读取最多 3s
timeout = aiohttp.ClientTimeout(total=10, connect=2, sock_read=3)
async with session.get(url, timeout=timeout) as response:
...
4. 支持的请求方法
除了最常用的 GET,aiohttp 同样支持 POST、PUT、DELETE 等所有标准 HTTP 方法。
4.1 POST 请求
表单提交(对应 requests 中的 data)
常用于模拟登录或普通表单:
form_data = {"username": "test", "password": "123456"}
async with session.post("https://example.com/login", data=form_data) as response:
...
JSON 提交(对应 requests 中的 json)
与 RESTful API 交互时使用:
json_payload = {"name": "Alice", "age": 25, "city": "Beijing"}
async with session.post("https://example.com/api/users", json=json_payload) as response:
...
4.2 PUT / DELETE 等
用法和 GET、POST 几乎完全一致,根据接口需要替换方法名就行:
# PUT 更新资源
async with session.put("https://example.com/api/users/1", json={"age": 26}) as response:
...
# DELETE 删除资源
async with session.delete("https://example.com/api/users/1") as response:
...
5. 处理响应数据
5.1 基础响应信息
状态码、响应头等属性可以直接获取,但响应体必须用 await 读取,因为它是异步数据流。
async with session.get(url) as response:
print(f"状态码: {response.status}") # 例如 200
print(f"响应头: {response.headers}") # dict 类型
print(f"Content-Type: {response.headers.get('Content-Type')}")
# 三种最常用的读取方式
text = await response.text() # 文本内容
bytes_data = await response.read() # 二进制内容(适合图片、PDF)
json_data = await response.json() # 自动解析 JSON
5.2 大文件流式下载
如果要下载几百 MB 的文件,直接调用 read() 会把整个文件搬进内存,很可能导致程序崩溃。正确的做法是分块读取:
async def download_large_file(session: aiohttp.ClientSession, url: str, save_path: str) -> None:
async with session.get(url) as response:
response.raise_for_status() # 检查状态码,不是 2xx 就抛异常
with open(save_path, "wb") as f:
# 每次读取 1 MB,避免一次性占用大量内存
while chunk := await response.content.read(1024 * 1024):
f.write(chunk)
6. 进阶实用功能
6.1 控制并发数
无限制地开启成百上千个并发,不仅可能被服务器封 IP,还会占满本机带宽。asyncio.Semaphore 可以帮助你限制同时进行的请求数量:
# 最多允许 10 个请求同时运行
sem = asyncio.Semaphore(10)
async def safe_fetch(session: aiohttp.ClientSession, url: str) -> str:
# 用 async with 自动获取和释放信号量
async with sem:
async with session.get(url) as response:
return await response.text()
6.2 会话持久化
ClientSession 内部会维护 Cookie、连接池和默认请求头。在创建时统一配置,后续所有请求都会自动使用,非常方便:
async def persistent_session_demo() -> None:
# 统一设置默认 cookie、UA 和超时
session = aiohttp.ClientSession(
cookies={"session_id": "abc123"},
headers={"User-Agent": "MyCrawler/1.0"},
timeout=aiohttp.ClientTimeout(total=20)
)
try:
# 两次请求会自动复用 cookie 和已有连接
async with session.get("https://example.com/page1") as r1:
...
async with session.get("https://example.com/page2") as r2:
...
finally:
# 如果没用 async with 创建 session,必须手动关闭
await session.close()
6.3 代理设置
直接通过 proxy 参数配置 HTTP/HTTPS 代理,也支持带认证的代理:
# 普通代理
async with session.get(url, proxy="http://127.0.0.1:7890") as response:
...
# 带用户名密码的代理
async with session.get(url, proxy="http://user:pass@proxy.example.com:8080") as response:
...
7. 错误处理
网络请求异常非常频繁,一定要主动捕获!aiohttp 常见的异常都继承自 aiohttp.ClientError,建议针对不同类型分别处理:
async def fetch_with_error_handling(session: aiohttp.ClientSession, url: str) -> str | None:
try:
async with session.get(url, timeout=10) as response:
response.raise_for_status() # 自动抛出 4xx/5xx 错误
return await response.text()
except aiohttp.ClientConnectorError:
print(f"无法连接到服务器: {url}")
except aiohttp.ClientTimeoutError:
print(f"请求超时: {url}")
except aiohttp.HTTPError as e:
print(f"HTTP 错误: {e.status} - {url}")
except aiohttp.ClientError as e:
print(f"其他请求错误: {e} - {url}")
return None
8. 完整实战:批量爬取网站
现在把上面学到的知识点串起来,写一个真实的爬虫:同时抓取 3 个网站,并发上限设为 2,并带上完整的错误处理。
import aiohttp
import asyncio
# 批量爬取的目标 URL
URLS = [
"https://example.com",
"https://httpbin.org/status/404", # 故意测试 404 错误
"https://httpbin.org/delay/2", # 模拟 2 秒延迟
]
MAX_CONCURRENCY = 2 # 最大并发数
TIMEOUT = aiohttp.ClientTimeout(total=5) # 单次请求总超时 5 秒
async def fetch(session: aiohttp.ClientSession, url: str, sem: asyncio.Semaphore) -> tuple[str, int | None]:
async with sem:
try:
async with session.get(url, timeout=TIMEOUT) as response:
response.raise_for_status()
content = await response.text()
return (url, len(content))
except Exception as e:
print(f"❌ 处理 {url} 失败: {str(e)}")
return (url, None)
async def main():
sem = asyncio.Semaphore(MAX_CONCURRENCY)
async with aiohttp.ClientSession() as session:
# 创建所有任务
tasks = [fetch(session, url, sem) for url in URLS]
# 并发执行,无论成败都会返回结果
results = await asyncio.gather(*tasks)
# 统计结果
print("\n📊 爬取结果统计:")
for url, length in results:
if length:
print(f"✅ {url}: 成功获取 {length} 字符")
else:
print(f"❌ {url}: 爬取失败")
if __name__ == "__main__":
asyncio.run(main())
运行这段代码,你会发现尽管有一个 2 秒延迟的页面,另两个页面的结果几乎同时返回,总耗时远小于三个页面各自耗时的累加。
9. 最佳实践总结
- 必须复用
ClientSession:不要为每个请求都新建,初始化的开销比较大
- 务必控制并发数:根据服务器承受能力和自身网络状况灵活调整,一般建议 5~20
- 合理设置超时:避免被个别慢请求拖累整个爬虫
- 大文件分块下载:防止内存溢出
- 完整的错误处理:网络环境不可控,所有可能的异常都要覆盖
- 善用
async with 管理生命周期:自动关闭 Session 和信号量,不易出错
10. 延伸学习
如果想更深入地掌握 aiohttp,建议阅读它的官方文档,并结合 asyncio 的学习,彻底搞懂事件循环和异步编程模型:
- aiohttp 官方文档
- 处理更复杂的并发任务调度时,还可以搭配
aiomultiprocessing 等库,进一步提升大批量任务的执行效率
希望这篇教程能帮你快速上手 aiohttp,写出高效稳定的异步爬虫!