Python asyncio 异步编程教程

1. 概述

asyncio 是 Python 标准库中用于编写异步 I/O 程序的利器,早在 Python 3.4 就被纳入标准库,从 Python 3.7 开始 API 稳定且易用。它通过单线程事件循环模拟出“并行”的效果,非常适合爬虫、微服务通信、实时聊天系统这类 I/O 密集型场景。如果你的任务主要是数值计算、加密解密等 CPU 密集型操作,应该优先考虑 multiprocessing,但在高并发 I/O 场景下,asyncio 的优势无可比拟。

相比传统的多线程/多进程方案,asyncio 不需要频繁切换上下文,资源占用更低,一个进程就能轻松管理成千上万个并发连接。


2. 核心概念:事件循环与协程

理解这两个概念,后面所有代码都豁然开朗。

2.1 事件循环

事件循环是 asyncio 的总调度中心,负责三件事:

  • 接收协程任务,把它们放入调度队列;
  • 当某个任务进入 I/O 等待(例如 await asyncio.sleep(1)await 网络请求)时,主动把控制权切给其他未阻塞的任务继续执行;
  • 当等待结束时,再把之前的任务切回来接着跑。

在 Python 3.7 及之后,我们不再需要手动创建和管理事件循环,asyncio.run() 会帮我们自动创建并销毁顶层事件循环。

2.2 协程

协程是 asyncio最小执行单元,使用 async def 定义:

async def my_first_coro():
    print("我是协程")

注意:直接调用 my_first_coro() 并不会执行内部代码,它只是返回一个协程对象。想让协程真正运行起来,必须通过 awaitasyncio.run() 或事件循环去调度它。


3. 基础操作:从单个协程开始

3.1 运行顶层协程

最简单、最推荐的写法是使用 asyncio.run()

import asyncio

async def hello():
    print("Hello world!")
    # 模拟一个耗时 1 秒的 I/O 操作(例如读文件、网络请求)
    await asyncio.sleep(1)
    print("Hello again!")

# 启动顶层协程
asyncio.run(hello())

运行这段代码,你会先看到 Hello world!,停顿 1 秒后再看到 Hello again!——看起来和同步代码一样,但如果同时跑多个任务,并发优势就立刻显现了。

3.2 async/await 语法糖

  • async def:用来声明一个协程函数,也可以定义异步上下文管理器、异步迭代器等;
  • await暂停当前协程,把控制权交还给事件循环,等后面的异步操作完成后才恢复执行。

使用 await 有两个硬性规则:

  • ✅ 只能在 async def 函数内部使用;
  • await 后面必须是协程对象、Task 对象或异步 Future 对象(初学阶段只需关注前两种)。

4. 并发执行:一次运行多个任务

单个协程和同步写法没区别,并发能力才是 asyncio 的价值所在。常见的并发执行方式有两种。

4.1 批量并行:asyncio.gather()

当你想“一起提交一批任务,等所有任务都完成后统一获取结果”时,使用 gather() 最方便:

import asyncio

async def greet(name: str):
    print(f"👋 Hello {name}!")
    await asyncio.sleep(1)   # 模拟给不同人发消息的延迟
    print(f"👋 Goodbye {name}!")
    return name

async def main():
    # gather 会收集所有协程的返回值
    results = await asyncio.gather(
        greet("Alice"),
        greet("Bob"),
        greet("Charlie")
    )
    print("📋 所有问候完成,参与者:", results)

asyncio.run(main())

执行这段代码,你会看到 3 个 Hello 几乎同时出现,停顿 1 秒后 3 个 Goodbye 又几乎同时出现——总耗时约 1 秒,而同步方式需要 3 秒。

4.2 灵活调度:asyncio.create_task()

gather() 是“等所有任务完成”的批量等待,而 create_task() 可以立即将协程包装成 Task 对象并加入事件循环,然后你可以更灵活地取消任务、单独等待、或者中途再加任务。

import asyncio

async def slow_task(name: str, delay: int):
    print(f"⏳ 任务 {name} 启动,预计 {delay}s 完成")
    await asyncio.sleep(delay)
    print(f"✅ 任务 {name} 完成")
    return name

async def main():
    # 立即创建并启动两个任务
    task_a = asyncio.create_task(slow_task("A", 2))
    task_b = asyncio.create_task(slow_task("B", 1))

    # 等待所有任务完成
    results = await asyncio.gather(task_a, task_b)
    print(f"🎉 所有任务完成:{results}")

asyncio.run(main())

说明:Python 3.7+ 推荐使用 asyncio.create_task(),旧版使用 loop.create_task(),目前极少再遇到过时的写法。


5. 实战案例:异步网络爬虫

网络请求是最典型的 I/O 密集型场景。下面我们用 asyncio.open_connection() 手写一个简单的 HTTP 爬虫,帮助你直观理解异步网络交互。实际项目中更推荐使用 aiohttp 等第三方库,但这里手写能让你看清底层机制。

import asyncio

async def fetch_raw_page(host: str):
    """异步获取指定域名的首页原始数据"""
    print(f"🔍 开始获取 {host} ...")
    # 建立异步 TCP 连接
    reader, writer = await asyncio.open_connection(host, 80)

    # 构造 HTTP GET 请求
    request = f"GET / HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
    writer.write(request.encode("utf-8"))
    # 确保请求完全发送
    await writer.drain()

    # 异步读取响应内容
    raw_response = await reader.read()
    # 关闭连接
    writer.close()
    await writer.wait_closed()

    print(f"✅ 获取 {host} 完成,接收 {len(raw_response)} 字节")
    return raw_response

async def main():
    hosts = ["www.example.com", "www.github.com", "www.python.org"]
    # 批量异步获取多个页面
    raw_pages = await asyncio.gather(*[fetch_raw_page(host) for host in hosts])
    print(f"📦 总共获取了 {len(raw_pages)} 个页面")

asyncio.run(main())

同样,总耗时只取决于最慢的那个请求,而不是所有请求时间之和。


6. 避坑指南与最佳实践

asyncio 时最容易犯的几个错误,务必提前避开:

6.1 协程中禁止使用同步阻塞代码

如果在协程里调用 time.sleep()、同步的 requests.get() 或普通文件读写 (open()),整个事件循环都会被卡死,其它任务全部阻塞。

✅ 正确的替代方案:

  • 同步 sleepawait asyncio.sleep()
  • 同步网络请求 → aiohttphttpx(异步模式)
  • 同步文件读写 → aiofiles

6.2 添加超时控制

很多 I/O 操作可能因网络波动或服务器无响应而卡死,务必设置超时:

import asyncio

async def maybe_stuck_task():
    await asyncio.sleep(10)   # 假装一个会卡死的任务
    return "Success"

async def main():
    try:
        result = await asyncio.wait_for(maybe_stuck_task(), timeout=5)
        print(result)
    except asyncio.TimeoutError:
        print("⏰ 任务超时!")

asyncio.run(main())

6.3 用 async with 管理异步资源

对于支持异步上下文管理器的对象(如 aiohttp.ClientSession),一定要使用 async with 自动释放资源,防止内存泄漏和连接耗尽。

6.4 正确处理异常

  • asyncio.gather() 默认会将所有任务的异常收集起来一并抛出。如果你希望某个任务出错不影响其他任务,可以设置 return_exceptions=True,此时异常会作为返回值正常返回;
  • 单个 Task 的异常可以通过 try/except await task 来捕获。

7. advanced-features:让异步代码更优雅(选读)

掌握以下特性可以让你在处理复杂异步流程时更得心应手。

7.1 异步迭代器:async for

普通迭代器使用 __iter____next__,异步迭代器需要实现 __aiter__(返回自身)和 __anext__(必须是协程):

import asyncio

class AsyncTimer:
    """异步倒计时器"""
    def __init__(self, start: int):
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(1)
        self.current -= 1
        return self.current + 1

async def main():
    async for num in AsyncTimer(3):
        print(f"⏲️ 倒计时:{num}")

asyncio.run(main())

7.2 异步生成器:更简洁的写法

异步迭代器写起来有些啰嗦,用 async def 搭配 yield 可以快速创建异步生成器:

import asyncio

async def async_timer_gen(start: int):
    """异步倒计时生成器(更简洁)"""
    for num in range(start, 0, -1):
        await asyncio.sleep(1)
        yield num

async def main():
    async for num in async_timer_gen(3):
        print(f"⏲️ 倒计时:{num}")

asyncio.run(main())

8. 总结

asyncio 是 Python I/O 密集型并发的首选方案,核心要点回顾如下:

  1. 使用 async def 定义协程,await 用于暂停和恢复协程;
  2. 使用 asyncio.run() 自动管理顶层事件循环;
  3. 批量并行用 asyncio.gather(),灵活调度用 asyncio.create_task()
  4. 绝不在协程中使用同步阻塞调用,务必替换为对应的异步库;
  5. 加入超时控制和exception-handling,并利用 async with 管理资源。

掌握以上内容后,你就可以轻松用 asyncio 编写高性能的爬虫、聊天机器人后端、微服务通信组件等。快动手试试吧!