现代Python协程编程指南

在Python并发编程的世界里,协程一直是一个强大而又略带神秘的工具。从最初的生成器 hack 到如今的 async/await 语法糖,协程的演进历程不仅反映了Python语言的发展,更代表了异步编程范式的普及。今天,我们就来深入探讨现代Python中的协程编程,从基础概念到高级应用,全方位掌握这一利器。

协程基础概念

协程(Coroutine),又称微线程或纤程,是一种比线程更轻量级的并发执行单元。与传统的子程序(函数)调用不同,协程允许在执行过程中暂停并在之后恢复。

协程 vs 子程序

特性子程序(函数)协程
执行流程严格的层级调用,一次入口一次返回可中断和恢复,多个入口点
调用方式直接调用通过yield/send协作
执行上下文每次调用独立栈帧保持执行状态
并发模型同步阻塞异步非阻塞

协程的优势

  1. 极高的执行效率:协程切换由程序控制,没有线程切换的开销
  2. 无需锁机制:单线程执行避免了多线程的竞争条件
  3. 高并发能力:单个线程可支持大量协程并发
  4. 简化异步编程:以同步代码风格实现异步逻辑

Python中的协程演进

1. 基于生成器的协程(Python 2.5+)

async/await 出现之前,Python 社区就已经开始探索协程的可能性。通过巧妙地使用生成器的 yield 关键字,我们实现了最早的协程模型:

def consumer():
    r = ''
    while True:
        n = yield r  # 暂停执行,等待外部发送数据
        if not n:
            return
        print(f'[CONSUMER] Consuming {n}...')
        r = '200 OK'

def producer(c):
    c.send(None)  # 预激协程,执行到第一个 yield 处
    for n in range(1, 6):
        print(f'[PRODUCER] Producing {n}...')
        r = c.send(n)  # 发送数据并恢复协程
        print(f'[PRODUCER] Consumer returned: {r}')
    c.close()  # 关闭协程

# 运行示例
c = consumer()
producer(c)

这种方式虽然实现了协程的基本功能,但可读性较差,且需要手动管理协程状态,使用起来并不方便。

2. 使用asyncio的协程(Python 3.4+)

Python 3.4 引入了 asyncio 库,为协程提供了官方支持。此时我们使用 @asyncio.coroutine 装饰器和 yield from 语法:

import asyncio

@asyncio.coroutine
def consumer(n):
    print(f'[CONSUMER] Consuming {n}...')
    yield from asyncio.sleep(1)
    return '200 OK'

@asyncio.coroutine
def producer():
    for n in range(1, 6):
        print(f'[PRODUCER] Producing {n}...')
        r = yield from consumer(n)
        print(f'[PRODUCER] Consumer returned: {r}')

loop = asyncio.get_event_loop()
loop.run_until_complete(producer())
loop.close()

这一版本大大简化了协程的编写,但 yield from 仍然显得有些晦涩。

3. 现代Python协程(Python 3.7+)

Python 3.5 引入了 async/await 语法,3.7 进一步简化了 API,形成了我们今天常用的现代协程模型:

import asyncio

async def task(name, delay):
    print(f"{name} started")
    await asyncio.sleep(delay)  # 挂起当前协程,让事件循环处理其他任务
    print(f"{name} completed after {delay}s")
    return delay

async def main():
    # 并发执行多个协程
    results = await asyncio.gather(
        task("Task1", 2),
        task("Task2", 1),
        task("Task3", 3)
    )
    print(f"All tasks completed with results: {results}")

asyncio.run(main())  # 一行代码启动事件循环

至此,协程编程终于变得直观而优雅。

协程核心概念

1. 事件循环(Event Loop)

协程的执行依赖于事件循环,它就像一个调度员,负责安排协程的执行顺序。当一个协程被 await 挂起时,事件循环会去执行其他准备好的协程。

import asyncio

async def hello_world():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Python 3.7 之前的方式
loop = asyncio.get_event_loop()
loop.run_until_complete(hello_world())
loop.close()

# Python 3.7+ 的简化方式
asyncio.run(hello_world())

2. 可等待对象(Awaitables)

在 Python 中,只有可等待对象才能被 await。主要有三种:

  • 协程(Coroutines):使用 async def 定义的函数
  • 任务(Tasks):由 asyncio.create_task() 封装的协程,会被事件循环立即调度
  • 未来对象(Futures):表示异步操作的最终结果

3. 协程与任务

协程对象本身不会自动执行,需要被事件循环调度。我们可以直接 await 它,或者将其封装成任务:

import asyncio

async def nested():
    return 42

async def main():
    # 直接 await 协程 - 串行执行
    print(await nested())  
    
    # 创建任务 - 并发执行
    task = asyncio.create_task(nested())
    print(await task)  

asyncio.run(main())

高级协程模式

1. 协程并发执行

使用 asyncio.gather() 可以并发执行多个协程,并等待所有结果返回:

import asyncio

async def fetch_data(delay, id):
    print(f"Fetching data {id}...")
    await asyncio.sleep(delay)
    print(f"Data {id} fetched")
    return {"id": id, "delay": delay}

async def main():
    tasks = [
        asyncio.create_task(fetch_data(2, 1)),
        asyncio.create_task(fetch_data(1, 2)),
        asyncio.create_task(fetch_data(3, 3))
    ]
    
    # 等待第一个完成
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"First task completed: {done.pop().result()}")
    
    # 等待剩余任务完成
    await asyncio.wait(pending)

asyncio.run(main())

2. 协程与线程池结合

当遇到无法避免的阻塞 IO 操作时,可以将其放到线程池中执行,避免阻塞事件循环:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    print(f"Start blocking IO at {time.strftime('%X')}")
    time.sleep(2)  # 模拟阻塞 IO 操作
    print(f"Blocking IO done at {time.strftime('%X')}")
    return "IO result"

async def main():
    print(f"Started main at {time.strftime('%X')}")
    
    # 在默认线程池中运行阻塞 IO
    result = await asyncio.get_event_loop().run_in_executor(
        None, blocking_io)
    print(f"Result: {result}")
    
    print(f"Finished main at {time.strftime('%X')}")

asyncio.run(main())

3. 协程超时控制

使用 asyncio.timeout() 可以为协程设置超时时间,防止任务无限期运行:

import asyncio

async def long_running_task():
    try:
        print("Task started")
        await asyncio.sleep(3600)  # 模拟长时间运行的任务
        return "Task completed"
    except asyncio.CancelledError:
        print("Task cancelled")
        raise

async def main():
    try:
        async with asyncio.timeout(1.0):
            await long_running_task()
    except TimeoutError:
        print("Timeout occurred")

asyncio.run(main())

最佳实践

  1. 避免阻塞操作:在协程中不要使用同步阻塞调用(如 time.sleep()),应使用对应的异步版本(如 asyncio.sleep())。
  2. 合理使用并发asyncio.gather() 适合并行执行独立任务,但要注意任务之间的依赖关系。
  3. 资源管理:使用 async with 管理异步资源,确保资源正确释放。
  4. 错误处理:协程中的异常需要在 await 时捕获,否则会被事件循环吞掉。
  5. 性能监控:使用 asyncio 调试模式检测未等待的协程,避免资源泄漏。

总结

现代Python协程通过 async/await 语法提供了清晰简洁的异步编程模型。相比传统的生成器协程,它具有更直观的语法、更好的错误处理、与异步 IO 库的深度集成,以及更强大的并发控制能力。

正如计算机科学家 Donald Knuth 所言:"子程序就是协程的一种特例。" 掌握协程将帮助你编写出更高效、更易维护的并发程序,特别适合 IO 密集型应用如网络爬虫、API 服务、实时通信等场景。

随着 Python 生态的不断发展,协程的应用场景也会越来越广泛。希望这篇指南能为你打开协程编程的大门,让你的 Python 代码跑得更快、更稳!