Python asyncio 异步编程教程

1. 概述

asyncio 是 Python 3.4 正式纳入标准库的异步 I/O 开发工具,在 Python 3.7 后 API 趋于稳定易用——它用单线程事件循环模拟了“并行”效果,特别适合爬虫、微服务通信、实时聊天等 I/O 密集型场景(数算、加密这种 CPU 密集型更推荐 multiprocessing)。

相较于传统多线程/多进程,asyncio 不需要频繁的上下文切换开销,资源占用更低,能轻松处理数千甚至上万个并发请求。


2. 核心概念快速入门

先把两个最基础的核心概念记牢,后面代码就不会绕:

2.1 事件循环 (Event Loop)

它是 asyncio 的“指挥中心”,负责:

  • 接收协程任务并加入调度队列
  • 当某个任务进入 I/O 等待(比如 await sleepawait 网络请求)时,主动切出去执行其他未阻塞的任务
  • 当等待结束时,把任务切回来继续执行

Python 3.7+ 后,asyncio.run() 会自动创建并销毁顶层事件循环,我们不用手动管理了。

2.2 协程 (Coroutine)

协程是 asyncio最小执行单元,用 async def 定义:

async def my_first_coro():
    print("我是协程")

⚠️ 注意:直接调用协程函数不会执行代码,只会返回一个协程对象,必须通过 awaitasyncio.run() 或事件循环调度才能运行。


3. 基础操作:从写单个协程开始

3.1 运行单个顶层协程

asyncio.run() 是最推荐的 3.7+ 写法:

import asyncio

async def hello():
    print("Hello world!")
    # 模拟耗时1秒的I/O操作(比如读文件、网络请求)
    await asyncio.sleep(1)
    print("Hello again!")

# 调用顶层协程函数
asyncio.run(hello())

运行这段代码会先打印 Hello world!,停顿1秒后打印 Hello again!——和同步代码看起来一样,但如果有多个任务,就能体现差异了。

3.2 熟练使用 async/await

这两个是 asyncio 的语法糖核心:

  • async def:声明一个协程函数(或异步上下文管理器、异步迭代器)
  • await暂停当前协程,把控制权交还给事件循环,等后面的异步操作完成后再恢复 ✅ 只能在 async def 内部使用 awaitawait 后面必须跟协程对象、任务对象、异步 Future 对象(我们暂时只需要前两个)

4. 并发执行:同时跑多个任务才是优势

单个协程和同步没区别,并发才是 asyncio 的用武之地,常用两种方式。

4.1 批量并行:asyncio.gather()

适合“一起提交一批任务,等所有任务完成后统一拿结果”的场景:

import asyncio

async def greet(name: str):
    print(f"👋 Hello {name}!")
    await asyncio.sleep(1)  # 模拟给不同人发消息的延迟
    print(f"👋 Goodbye {name}!")
    return name  # gather 会收集所有协程的返回值

async def main():
    # 批量并行,注意要用 * 解包列表(如果任务列表是动态生成的)
    results = await asyncio.gather(
        greet("Alice"),
        greet("Bob"),
        greet("Charlie")
    )
    print("📋 All greetings done, participants:", results)

asyncio.run(main())

运行这段代码会几乎同时打印3个👋 Hello,停顿1秒后几乎同时打印3个👋 Goodbye——总耗时只有1秒左右,而不是同步的3秒!

4.2 更灵活的调度:asyncio.create_task()

gather() 是批量等待的,而 create_task()立即把协程包装成任务对象加入事件循环,我们可以更灵活地控制任务(比如取消、单独等待、先跑一部分再补任务):

import asyncio

async def slow_task(name: str, delay: int):
    print(f"⏳ Task {name} started, takes {delay}s")
    await asyncio.sleep(delay)
    print(f"✅ Task {name} completed")
    return name

async def main():
    # 立即创建并启动两个任务
    task_a = asyncio.create_task(slow_task("A", 2))
    task_b = asyncio.create_task(slow_task("B", 1))

    # 也可以等所有任务完成
    results = await asyncio.gather(task_a, task_b)
    print(f"🎉 All tasks finished: {results}")

asyncio.run(main())

⚠️ 注意:Python 3.7+ 才用 asyncio.create_task(),旧版本(3.6-)用 loop.create_task(),但现在几乎没人用旧版了。


5. 实用案例:异步网络请求爬虫

网络请求是最典型的 I/O 密集型场景,这里用 asyncio.open_connection() 手写一个简单的 HTTP 爬虫(实际生产更推荐 aiohttp 这种第三方库,但手写能更直观理解异步网络):

import asyncio

async def fetch_raw_page(host: str):
    """异步获取指定域名的首页原始数据"""
    print(f"🔍 Starting to fetch {host}...")
    # 建立异步 TCP 连接
    reader, writer = await asyncio.open_connection(host, 80)

    # 构造简单的 HTTP GET 请求
    request = f"GET / HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
    writer.write(request.encode("utf-8"))
    # 确保请求完全发送(防止数据缓冲区残留)
    await writer.drain()

    # 异步读取响应
    raw_response = await reader.read()
    # 关闭连接
    writer.close()
    await writer.wait_closed()

    print(f"✅ Finished fetching {host}, got {len(raw_response)} bytes")
    return raw_response

async def main():
    # 测试域名
    test_hosts = ["www.example.com", "www.github.com", "www.python.org"]
    # 批量异步获取
    raw_pages = await asyncio.gather(*[fetch_raw_page(host) for host in test_hosts])
    print(f"📦 Total fetched {len(raw_pages)} pages")

asyncio.run(main())

同样,总耗时只取决于最慢的那个请求,而不是三个请求的总和。


6. 避坑指南与最佳实践

asyncio 代码很容易踩坑,以下几点必须注意:

6.1 绝对不要在协程里写同步阻塞代码!

如果在协程里用 time.sleep()、同步的 requests.get()、同步的文件读写(open() 普通模式),整个事件循环都会被卡住,其他任务无法执行。

✅ 替代方案:

  • 同步睡眠 → asyncio.sleep()
  • 同步网络请求 → aiohttphttpx(异步模式)
  • 同步文件读写 → aiofiles

6.2 用 asyncio.wait_for() 做超时控制

很多 I/O 操作可能会卡死(比如网络波动、服务器不响应),一定要加超时:

import asyncio

async def maybe_stuck_task():
    await asyncio.sleep(10)  # 假装一个可能卡死的任务
    return "Success"

async def main():
    try:
        # 等待最多5秒
        result = await asyncio.wait_for(maybe_stuck_task(), timeout=5)
        print(result)
    except asyncio.TimeoutError:
        print("⏰ Task timed out!")

asyncio.run(main())

6.3 用 async with 管理异步资源

对于支持异步上下文管理的对象(比如 aiohttp.ClientSessionasyncio.open_connection() 的 reader/writer 旧版写法),一定要用 async with 自动释放资源,避免内存泄漏或连接耗尽。

6.4 正确捕获协程和任务的异常

  • asyncio.gather() 默认会把所有任务的异常收集到最后抛出,如果想让某个任务的异常不影响其他任务,可以加参数 return_exceptions=True,把异常当成返回值处理
  • 单个任务的异常可以通过 try/except await task 捕获

7. 高级特性:让异步代码更优雅(选读)

这两个特性可以简化某些复杂的异步场景。

7.1 异步迭代器:async for

普通迭代器用 __iter____next__,异步迭代器用 __aiter__(返回自身)和 __anext__(必须是协程):

import asyncio

class AsyncTimer:
    """异步倒计时器"""
    def __init__(self, start: int):
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(1)
        self.current -= 1
        return self.current + 1

async def main():
    async for num in AsyncTimer(3):
        print(f"⏲️ Countdown: {num}")

asyncio.run(main())

7.2 异步生成器:更简单的异步迭代

异步迭代器写起来有点啰嗦,用 async def + yield 可以快速生成异步迭代器:

import asyncio

async def async_timer_gen(start: int):
    """异步倒计时生成器(更简洁)"""
    for num in range(start, 0, -1):
        await asyncio.sleep(1)
        yield num

async def main():
    async for num in async_timer_gen(3):
        print(f"⏲️ Countdown: {num}")

asyncio.run(main())

8. 总结

asyncio 是 Python 处理 I/O 密集型并发的首选方案,核心要点如下:

  1. async def 定义协程,用 await 暂停和恢复
  2. asyncio.run() 自动管理顶层事件循环
  3. 批量并行用 asyncio.gather(),灵活调度用 asyncio.create_task()
  4. 绝对避免同步阻塞代码,用对应的异步库替代
  5. 加超时控制和异常处理,用 async with 管理资源

掌握这些内容,就可以开始用 asyncio 写高性能的爬虫、聊天机器人后端、微服务通信组件了!