现代Python协程编程指南
在Python并发编程的世界里,协程一直是一个强大而又略带神秘的工具。从最初的生成器 hack 到如今的 async/await 语法糖,协程的演进历程不仅反映了Python语言的发展,更代表了异步编程范式的普及。今天,我们就来深入探讨现代Python中的协程编程,从基础概念到高级应用,全方位掌握这一利器。
协程基础概念
协程(Coroutine),又称微线程或纤程,是一种比线程更轻量级的并发执行单元。与传统的子程序(函数)调用不同,协程允许在执行过程中暂停并在之后恢复。
协程 vs 子程序
协程的优势
- 极高的执行效率:协程切换由程序控制,没有线程切换的开销
- 无需锁机制:单线程执行避免了多线程的竞争条件
- 高并发能力:单个线程可支持大量协程并发
- 简化异步编程:以同步代码风格实现异步逻辑
Python中的协程演进
1. 基于生成器的协程(Python 2.5+)
在 async/await 出现之前,Python 社区就已经开始探索协程的可能性。通过巧妙地使用生成器的 yield 关键字,我们实现了最早的协程模型:
def consumer():
r = ''
while True:
n = yield r # 暂停执行,等待外部发送数据
if not n:
return
print(f'[CONSUMER] Consuming {n}...')
r = '200 OK'
def producer(c):
c.send(None) # 预激协程,执行到第一个 yield 处
for n in range(1, 6):
print(f'[PRODUCER] Producing {n}...')
r = c.send(n) # 发送数据并恢复协程
print(f'[PRODUCER] Consumer returned: {r}')
c.close() # 关闭协程
# 运行示例
c = consumer()
producer(c)
这种方式虽然实现了协程的基本功能,但可读性较差,且需要手动管理协程状态,使用起来并不方便。
2. 使用asyncio的协程(Python 3.4+)
Python 3.4 引入了 asyncio 库,为协程提供了官方支持。此时我们使用 @asyncio.coroutine 装饰器和 yield from 语法:
import asyncio
@asyncio.coroutine
def consumer(n):
print(f'[CONSUMER] Consuming {n}...')
yield from asyncio.sleep(1)
return '200 OK'
@asyncio.coroutine
def producer():
for n in range(1, 6):
print(f'[PRODUCER] Producing {n}...')
r = yield from consumer(n)
print(f'[PRODUCER] Consumer returned: {r}')
loop = asyncio.get_event_loop()
loop.run_until_complete(producer())
loop.close()
这一版本大大简化了协程的编写,但 yield from 仍然显得有些晦涩。
3. 现代Python协程(Python 3.7+)
Python 3.5 引入了 async/await 语法,3.7 进一步简化了 API,形成了我们今天常用的现代协程模型:
import asyncio
async def task(name, delay):
print(f"{name} started")
await asyncio.sleep(delay) # 挂起当前协程,让事件循环处理其他任务
print(f"{name} completed after {delay}s")
return delay
async def main():
# 并发执行多个协程
results = await asyncio.gather(
task("Task1", 2),
task("Task2", 1),
task("Task3", 3)
)
print(f"All tasks completed with results: {results}")
asyncio.run(main()) # 一行代码启动事件循环
至此,协程编程终于变得直观而优雅。
协程核心概念
1. 事件循环(Event Loop)
协程的执行依赖于事件循环,它就像一个调度员,负责安排协程的执行顺序。当一个协程被 await 挂起时,事件循环会去执行其他准备好的协程。
import asyncio
async def hello_world():
print("Hello")
await asyncio.sleep(1)
print("World")
# Python 3.7 之前的方式
loop = asyncio.get_event_loop()
loop.run_until_complete(hello_world())
loop.close()
# Python 3.7+ 的简化方式
asyncio.run(hello_world())
2. 可等待对象(Awaitables)
在 Python 中,只有可等待对象才能被 await。主要有三种:
- 协程(Coroutines):使用
async def 定义的函数
- 任务(Tasks):由
asyncio.create_task() 封装的协程,会被事件循环立即调度
- 未来对象(Futures):表示异步操作的最终结果
3. 协程与任务
协程对象本身不会自动执行,需要被事件循环调度。我们可以直接 await 它,或者将其封装成任务:
import asyncio
async def nested():
return 42
async def main():
# 直接 await 协程 - 串行执行
print(await nested())
# 创建任务 - 并发执行
task = asyncio.create_task(nested())
print(await task)
asyncio.run(main())
高级协程模式
1. 协程并发执行
使用 asyncio.gather() 可以并发执行多个协程,并等待所有结果返回:
import asyncio
async def fetch_data(delay, id):
print(f"Fetching data {id}...")
await asyncio.sleep(delay)
print(f"Data {id} fetched")
return {"id": id, "delay": delay}
async def main():
tasks = [
asyncio.create_task(fetch_data(2, 1)),
asyncio.create_task(fetch_data(1, 2)),
asyncio.create_task(fetch_data(3, 3))
]
# 等待第一个完成
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(f"First task completed: {done.pop().result()}")
# 等待剩余任务完成
await asyncio.wait(pending)
asyncio.run(main())
2. 协程与线程池结合
当遇到无法避免的阻塞 IO 操作时,可以将其放到线程池中执行,避免阻塞事件循环:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
print(f"Start blocking IO at {time.strftime('%X')}")
time.sleep(2) # 模拟阻塞 IO 操作
print(f"Blocking IO done at {time.strftime('%X')}")
return "IO result"
async def main():
print(f"Started main at {time.strftime('%X')}")
# 在默认线程池中运行阻塞 IO
result = await asyncio.get_event_loop().run_in_executor(
None, blocking_io)
print(f"Result: {result}")
print(f"Finished main at {time.strftime('%X')}")
asyncio.run(main())
3. 协程超时控制
使用 asyncio.timeout() 可以为协程设置超时时间,防止任务无限期运行:
import asyncio
async def long_running_task():
try:
print("Task started")
await asyncio.sleep(3600) # 模拟长时间运行的任务
return "Task completed"
except asyncio.CancelledError:
print("Task cancelled")
raise
async def main():
try:
async with asyncio.timeout(1.0):
await long_running_task()
except TimeoutError:
print("Timeout occurred")
asyncio.run(main())
最佳实践
- 避免阻塞操作:在协程中不要使用同步阻塞调用(如
time.sleep()),应使用对应的异步版本(如 asyncio.sleep())。
- 合理使用并发:
asyncio.gather() 适合并行执行独立任务,但要注意任务之间的依赖关系。
- 资源管理:使用
async with 管理异步资源,确保资源正确释放。
- 错误处理:协程中的异常需要在
await 时捕获,否则会被事件循环吞掉。
- 性能监控:使用
asyncio 调试模式检测未等待的协程,避免资源泄漏。
总结
现代Python协程通过 async/await 语法提供了清晰简洁的异步编程模型。相比传统的生成器协程,它具有更直观的语法、更好的错误处理、与异步 IO 库的深度集成,以及更强大的并发控制能力。
正如计算机科学家 Donald Knuth 所言:"子程序就是协程的一种特例。" 掌握协程将帮助你编写出更高效、更易维护的并发程序,特别适合 IO 密集型应用如网络爬虫、API 服务、实时通信等场景。
随着 Python 生态的不断发展,协程的应用场景也会越来越广泛。希望这篇指南能为你打开协程编程的大门,让你的 Python 代码跑得更快、更稳!