现代 Python 协程编程指南
在 Python 并发编程的世界中,协程一直是一个强大又略显微妙的存在。从早期生成器实现的“黑科技”,到如今优雅的 async/await 语法糖,协程的进化不仅体现了语言的成长,更标志着异步编程范式的全面普及。这篇文章将带你从基础概念走到高级模式,全方位掌握现代 Python 协程。
协程基础概念
协程(Coroutine),有时候也叫微线程或纤程,是一种比线程更轻量级的并发执行单元。它和普通函数(子程序)最大的不同在于:可以在执行中途主动挂起自己,稍后再由外部代码恢复运行。
协程 vs 子程序
协程的优势
- 轻量切换:协程切换完全由程序控制,没有线程切换的系统开销。
- 无需锁机制:因为只在同一个线程内调度,天然的“单线程”就避免了竞态条件,无需复杂的锁。
- 高并发能力:一个线程就能驱动成千上万个协程同时“在跑”。
- 像写同步代码一样写异步逻辑:
await 让流程暂停,但不会阻塞整个线程,代码可读性大幅提升。
Python 中的协程演进
Python 社区对协程的探索由来已久,一路走来经历了三个主要阶段。
第一阶段:生成器协程(Python 2.5+)
在 async/await 诞生之前,人们利用生成器的 yield 特性巧妙地模拟出了协程行为:
def consumer():
r = ''
while True:
n = yield r # 执行到这里暂停,等待 send() 传入数据
if not n:
return
print(f'[CONSUMER] 正在消费 {n}...')
r = '200 OK'
def producer(c):
c.send(None) # 预激协程,让 consumer 走到第一个 yield
for n in range(1, 6):
print(f'[PRODUCER] 生产 {n}...')
r = c.send(n) # 向协程发送数据,同时恢复它
print(f'[PRODUCER] 收到回复: {r}')
c.close()
c = consumer()
producer(c)
这个阶段的协程虽然能用,但需要手动 send、close,并在多个生成器之间来回传递,代码晦涩且易出错。
第二阶段:asyncio + yield from(Python 3.4+)
Python 3.4 引入了标准库 asyncio,正式把协程纳入“官方体系”。那时我们用 @asyncio.coroutine 装饰器和 yield from 语法:
import asyncio
@asyncio.coroutine
def consumer(n):
print(f'[CONSUMER] 正在消费 {n}...')
yield from asyncio.sleep(1) # 挂起协程,把控制权交给事件循环
return '200 OK'
@asyncio.coroutine
def producer():
for n in range(1, 6):
print(f'[PRODUCER] 生产 {n}...')
r = yield from consumer(n)
print(f'[PRODUCER] 收到回复: {r}')
loop = asyncio.get_event_loop()
loop.run_until_complete(producer())
loop.close()
yield from 简化了子协程的调用,但依旧不够直观。
第三阶段:现代 async/await 协程(Python 3.7+)
Python 3.5 引入了 async/await,3.7 再次简化了事件循环的启动方式,呈现出我们今天熟悉的异步代码风格:
import asyncio
async def task(name, delay):
print(f"{name} 启动")
await asyncio.sleep(delay) # 主动挂起,让事件循环去处理其他任务
print(f"{name} 在 {delay}s 后完成")
return delay
async def main():
# 并发运行多个协程,等待全部结束
results = await asyncio.gather(
task("任务1", 2),
task("任务2", 1),
task("任务3", 3)
)
print(f"全部完成,结果:{results}")
asyncio.run(main()) # 一行代码启动事件循环
至此,协程编程终于变得自然,同步代码的顺序感被完美保留。
协程核心概念
要真正用好现代协程,需要理解它背后的几个关键角色。
1. 事件循环
事件循环(Event Loop)好比一个中央调度器。当一个协程执行到 await 时,它会告诉事件循环:“我需要等一会儿,你先去处理别的协程吧。” 事件循环就会把控制权切换到其他就绪的协程上,等到挂起条件满足再把它恢复。
import asyncio
async def hello_world():
print("Hello")
await asyncio.sleep(1)
print("World")
# Python 3.7 之前的老办法
# loop = asyncio.get_event_loop()
# loop.run_until_complete(hello_world())
# loop.close()
# 现在的推荐方式
asyncio.run(hello_world())
asyncio.run() 会创建一个事件循环,运行传入的协程,并在结束后自动清理,是大多数程序的入口。
2. 可等待对象
在 await 关键字后面只能放可等待对象(Awaitable)。Python 中主要有三类:
- 协程对象:由
async def 函数调用返回的对象,必须被 await 或包装成任务才会执行。
- 任务(Task):通过
asyncio.create_task() 将协程包裹而成的对象。任务一经创建就会立即被事件循环调度执行,你可以随后 await 它获取结果。
- Future:代表一个尚未完成的结果,多用于底层库,普通应用开发中不常直接接触。
3. 协程与任务
协程对象本身不会主动运行,你需要决定是直接等待它,还是把它变为任务。
import asyncio
async def nested():
return 42
async def main():
# 方式一:直接 await 协程 —— 串行执行
result = await nested()
print(result)
# 方式二:创建任务 —— 并发执行
task = asyncio.create_task(nested())
# 此时 nested() 已经在后台“跑”了
print(await task)
asyncio.run(main())
如果只是 await 协程,相当于一路等到它结束才继续往下;而创建任务则相当于“发射后不管”,等需要结果时再去获取,这为并发提供了基础。
高级协程模式
1. 协程并发执行
当有一批互相独立的异步任务时,可以使用 asyncio.gather 让它们同时运行:
import asyncio
async def fetch_data(delay, id):
print(f"开始获取数据 {id}...")
await asyncio.sleep(delay)
print(f"数据 {id} 获取完成")
return {"id": id, "delay": delay}
async def main():
# 同时“发射”三个任务
results = await asyncio.gather(
fetch_data(2, 1),
fetch_data(1, 2),
fetch_data(3, 3)
)
print("所有数据:", results)
asyncio.run(main())
如果希望更灵活地控制任务完成时机,可以用 asyncio.wait,它能返回最先完成的任务:
async def main():
tasks = [
asyncio.create_task(fetch_data(2, 1)),
asyncio.create_task(fetch_data(1, 2)),
asyncio.create_task(fetch_data(3, 3))
]
# 等待第一个任务完成
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
first = done.pop()
print(f"最先完成的任务结果: {first.result()}")
# 继续等待剩下的任务
await asyncio.wait(pending)
2. 协程与线程池结合
协程适合 IO 密集型任务,但如果代码中不得不使用某些同步阻塞的库(比如 time.sleep、文件读写等),直接调用会拖慢整个事件循环。这时就可以把阻塞操作交给线程池:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
print(f"阻塞 IO 开始 {time.strftime('%X')}")
time.sleep(2) # 模拟同步阻塞操作
print(f"阻塞 IO 结束 {time.strftime('%X')}")
return "IO 结果"
async def main():
print(f"主协程开始 {time.strftime('%X')}")
loop = asyncio.get_running_loop()
# 把阻塞函数扔进默认线程池执行,事件循环不受影响
result = await loop.run_in_executor(None, blocking_io)
print("得取结果:", result)
print(f"主协程结束 {time.strftime('%X')}")
asyncio.run(main())
对于 CPU 密集型任务,也可以用类似方法交给进程池,不过更常见的做法是利用 concurrent.futures.ProcessPoolExecutor。
3. 协程超时控制
在实际项目中,网络请求或外部服务可能长时间无响应。我们可以为协程设置超时,防止任务永远卡住。
import asyncio
async def long_running_task():
try:
print("长时间任务启动")
await asyncio.sleep(3600) # 模拟耗时操作
return "任务完成"
except asyncio.CancelledError:
print("任务被取消")
raise
async def main():
try:
# 设置 1 秒超时,超时后会取消任务并抛出 TimeoutError
await asyncio.wait_for(long_running_task(), timeout=1.0)
except asyncio.TimeoutError:
print("超时!")
asyncio.run(main())
提示:Python 3.11 之后还可以使用更优雅的 async with asyncio.timeout(1.0): 上下文管理器,效果类似。
最佳实践
在日常开发中掌握以下几项原则,可以帮你写出稳定高效的协程代码:
-
永远不要在协程里调用同步阻塞函数
想暂停就使用 asyncio.sleep(),遇到同步 IO 就扔给线程池。
-
合理选择并发工具
asyncio.gather 适合“全都要”,asyncio.wait 适合“只需一个”或精细控制完成顺序。
-
用 async with 管理异步资源
数据库连接、文件等资源应使用异步上下文管理器,确保关闭干净。
-
不要忘记捕获异常
如果一个协程抛出异常却没有任何地方 await 它,异常会被事件循环“吞掉”,导致难以发现的 bug。务必在合适的地方 try/except。
-
启用调试模式检测遗漏
开发时可以通过 asyncio.run(coro, debug=True) 或者设置环境变量 PYTHONASYNCIODEBUG=1,让 asyncio 警告那些创建后未被等待的协程。
总结
现代 Python 协程通过 async/await 为我们提供了一种清晰、直观的异步编程模型。与古老的生成器协程相比,它带来的不仅是简单的语法,更是完整的并发控制能力以及与异步 IO 生态的深度集成。
正如计算机科学家 Donald Knuth 所说:“子程序只是协程的一种特例。” 掌握协程思维,你的代码就能更从容地应对高并发 IO 场景——从网络爬虫、API 服务到实时通信,都能跑得又快又稳。
希望这篇指南能帮你打开协程编程的大门,让你的 Python 异步代码如虎添翼。