Python 异步爬虫教程:aiohttp 详解
当你用 requests 同步爬取 10 个响应延迟 1s 的网站时,总耗时至少是 10s——所有请求都在排队等待上一个完成。如果想把效率提上去?今天的主角 aiohttp 就能帮你解决这个问题。
1. 概述
aiohttp 是 Python 官方异步库 asyncio 生态中的核心 HTTP 工具,同时提供客户端和服务器端功能。在爬虫开发中,我们主要用它的客户端部分发起高并发、无阻塞的请求。
1.1 为什么选它?
- 🚀 完全异步无阻塞:多个请求可以并行处理,10个1s延迟的网站总耗时可能只有1.1s
- 📡 支持HTTP/HTTPS/WebSocket全链路:既可以做普通爬虫,也能处理实时数据流
- 🤝 连接池/会话/代理/Cookie 全套配齐:不用自己造轮子管理复杂的请求环境
- ⚡ 性能远超同步库 requests:在大规模爬取场景下优势极其明显
2. 基础入门
2.1 安装
直接用 pip 就能安装,注意 Python 版本要 ≥3.7(旧版兼容 3.6,但建议升级):
2.2 第一个异步爬虫
写异步代码前,先记住两个核心关键字:
async def:定义异步函数,不能直接调用,要配合事件循环
await:等待异步操作完成(比如发送请求、读取响应),只能放在 async def 函数里
下面是最简的例子,用 aiohttp 爬取 example.com 并打印前 200 个字符:
import aiohttp
import asyncio
async def fetch(session: aiohttp.ClientSession, url: str) -> str:
# 用 async with 确保请求响应后自动释放连接
async with session.get(url) as response:
# 响应文本需要 await 读取,不是同步赋值
return await response.text()
async def main():
# 用 async with 创建并自动关闭 ClientSession
# Session 可以复用 cookie、headers、连接池,不要每次请求都新建!
async with aiohttp.ClientSession() as session:
html = await fetch(session, "https://example.com")
print(html[:200])
# Python 3.7+ 推荐用 asyncio.run() 一键启动事件循环
if __name__ == "__main__":
asyncio.run(main())
3. 常见请求配置
3.1 URL 带参数
用 params 传字典,会自动拼接到 URL 后面,避免手动转义特殊字符:
async def search_baidu(session: aiohttp.ClientSession, keyword: str) -> str:
params = {"wd": keyword, "ie": "utf-8"}
async with session.get("https://www.baidu.com/s", params=params) as response:
return await response.text()
3.2 设置请求头
修改 User-Agent 伪装成浏览器是爬虫的基本操作,直接传 headers 字典即可:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
async with session.get(url, headers=headers) as response:
...
3.3 超时控制
防止响应慢或卡住的请求拖慢整个程序,用 aiohttp.ClientTimeout 可以设置总超时或各阶段超时(连接、读取等):
# 总超时 10s,连接阶段最多 2s,读取每块最多 3s
timeout = aiohttp.ClientTimeout(total=10, connect=2, sock_read=3)
async with session.get(url, timeout=timeout) as response:
...
4. 支持的请求方法
除了最常用的 GET,aiohttp 也覆盖了 POST/PUT/DELETE 等所有 RESTful 方法。
4.1 POST 请求
表单提交(对应 requests 的 data)
# 比如模拟登录,传用户名密码
form_data = {"username": "test", "password": "123456"}
async with session.post("https://example.com/login", data=form_data) as response:
...
JSON 提交(对应 requests 的 json)
# 调用接口传 JSON 数据
json_payload = {"name": "Alice", "age": 25, "city": "Beijing"}
async with session.post("https://example.com/api/users", json=json_payload) as response:
...
4.2 PUT/DELETE 等
用法和 GET/POST 基本一致,直接替换方法名即可:
# PUT 更新资源
async with session.put("https://example.com/api/users/1", json={"age": 26}) as response:
...
# DELETE 删除资源
async with session.delete("https://example.com/api/users/1") as response:
...
5. 处理响应数据
5.1 基础响应信息
响应状态码、响应头可以直接获取,不需要 await;但内容必须 await 读取,因为是异步流。
async with session.get(url) as response:
print(f"状态码: {response.status}") # 比如 200
print(f"响应头: {response.headers}") # dict 格式
print(f"Content-Type: {response.headers.get('Content-Type')}")
# 文本内容
text = await response.text()
# 二进制内容(比如下载图片、PDF)
bytes_content = await response.read()
# JSON 内容(自动解析)
json_data = await response.json()
5.2 大文件流式下载
如果要下载几百 MB 的文件,直接用 read() 会把整个文件加载到内存里,容易爆内存。这时候用 response.content 的 read(chunk_size) 分块下载:
async def download_large_file(session: aiohttp.ClientSession, url: str, save_path: str) -> None:
async with session.get(url) as response:
# 检查状态码是否成功
response.raise_for_status()
with open(save_path, "wb") as f:
# 每次读 1MB
while chunk := await response.content.read(1024 * 1024):
f.write(chunk)
6. 进阶实用功能
6.1 控制并发数
如果同时发起几百上千个请求,目标服务器可能会直接封你 IP,或者本地带宽被占满。这时候用 asyncio.Semaphore 限制最大并发数:
# 最大并发 10
sem = asyncio.Semaphore(10)
async def safe_fetch(session: aiohttp.ClientSession, url: str) -> str:
# 用 async with 自动获取/释放信号量
async with sem:
async with session.get(url) as response:
return await response.text()
6.2 会话持久化
ClientSession 会自动维护 cookie、连接池、默认 headers,所以可以在创建 Session 时统一设置,所有后续请求都能用:
async def persistent_session_demo() -> None:
# 统一设置默认 cookie、UA、超时
session = aiohttp.ClientSession(
cookies={"session_id": "abc123"},
headers={"User-Agent": "MyCrawler/1.0"},
timeout=aiohttp.ClientTimeout(total=20)
)
try:
# 两次请求会自动复用 cookie 和连接
async with session.get("https://example.com/page1") as r1:
...
async with session.get("https://example.com/page2") as r2:
...
finally:
# 如果不用 async with 创建 Session,记得手动 close
await session.close()
6.3 代理设置
# HTTP/HTTPS 代理
async with session.get(url, proxy="http://127.0.0.1:7890") as response:
...
# 带认证的代理
async with session.get(url, proxy="http://user:pass@proxy.example.com:8080") as response:
...
7. 错误处理
网络请求很容易出问题,一定要加异常捕获!常见的异常类在 aiohttp.ClientError 下面:
async def fetch_with_error_handling(session: aiohttp.ClientSession, url: str) -> str | None:
try:
async with session.get(url, timeout=10) as response:
# 自动抛出 HTTP 错误(比如 404、500)
response.raise_for_status()
return await response.text()
except aiohttp.ClientConnectorError:
print(f"无法连接到服务器: {url}")
except aiohttp.ClientTimeoutError:
print(f"请求超时: {url}")
except aiohttp.HTTPError as e:
print(f"HTTP 错误: {e.status} - {url}")
except aiohttp.ClientError as e:
print(f"其他请求错误: {e} - {url}")
return None
8. 完整实战:批量爬取网站
把上面的知识点整合起来,写一个批量爬取 3 个网站、限制并发 2、自动处理错误的完整爬虫:
import aiohttp
import asyncio
# 批量爬取的目标
URLS = [
"https://example.com",
"https://httpbin.org/status/404", # 测试 404 错误
"https://httpbin.org/delay/2", # 测试 2s 延迟
]
# 最大并发 2
MAX_CONCURRENCY = 2
# 总超时 5s
TIMEOUT = aiohttp.ClientTimeout(total=5)
async def fetch(session: aiohttp.ClientSession, url: str, sem: asyncio.Semaphore) -> tuple[str, int | None]:
async with sem:
try:
async with session.get(url, timeout=TIMEOUT) as response:
response.raise_for_status()
content = await response.text()
return (url, len(content))
except Exception as e:
print(f"❌ 处理 {url} 失败: {str(e)}")
return (url, None)
async def main():
sem = asyncio.Semaphore(MAX_CONCURRENCY)
async with aiohttp.ClientSession() as session:
# 创建所有任务
tasks = [fetch(session, url, sem) for url in URLS]
# 并发执行所有任务,不管失败与否都会返回结果
results = await asyncio.gather(*tasks)
# 打印统计结果
print("\n📊 爬取结果统计:")
for url, length in results:
if length:
print(f"✅ {url}: 成功获取 {length} 字符")
else:
print(f"❌ {url}: 爬取失败")
if __name__ == "__main__":
asyncio.run(main())
9. 最佳实践总结
- 必须复用 ClientSession:不要在每个请求里都新建,新建一次成本很高
- 一定要控制并发数:根据目标服务器的承受能力调整,一般建议 5-20
- 合理设置超时:避免单个慢请求拖垮整个程序
- 分块下载大文件:防止内存溢出
- 完整的错误处理:网络请求不可控,所有可能的异常都要覆盖
- 手动管理 Session 时记得 close:或者用
async with 自动管理
10. 延伸学习
如果想深入了解 aiohttp,可以看官方文档:
- aiohttp 官方文档
- 如果想处理更复杂的异步任务调度,可以结合
asyncio 和 aiomultiprocessing 使用