异步编程深度解析:async 与 await 的原理,以及何时该用同步代码

📂 所属阶段:第二阶段 — 进阶黑科技(核心篇)
🔗 相关章节:FastAPI 依赖注入系统 · FastAPI 中间件应用


1. 什么是异步编程?

1.1 同步 vs 异步:排队打饭的比喻 🍚

想象你去食堂打饭:

  • 同步:你站在窗口等厨师炒完一盘菜,端走,再点下一道。厨师在炒菜时你只能干等
  • 异步:你把菜单交给厨师,然后去拿餐具、倒饮料,等菜好了服务员直接端到你桌上。你不需要傻站着等

这就是同步与异步的核心区别:等待时是否可以切换去干别的事

1.2 为什么 Web 服务需要异步?

请求 A(查询数据库,耗时 200ms)
请求 B(简单计算,耗时 1ms)
请求 C(调用外部 API,耗时 1000ms)
  • 同步模式:按顺序处理,总耗时 = 200 + 1 + 1000 = 1201ms
  • 异步模式:A 等待数据库时切换 B,B 完成切 C,C 等待时切回 A → 总耗时 ≈ 1000ms

在 I/O 密集型场景(数据库查询、HTTP 请求、文件读写),异步能让单线程处理大量并发请求


2. async / await 详解

2.1 基础语法

import asyncio

# 定义一个协程函数(async def)
async def say_hello():
    print("Hello!")

# 运行协程的三种方式
# 方式一:asyncio.run()(推荐,主入口用)
asyncio.run(say_hello())

# 方式二:在已有事件循环中创建任务
async def main():
    task = asyncio.create_task(say_hello())  # 创建任务
    await task                                  # 等待任务完成

asyncio.run(main())

# 方式三:直接 await(只能在 async 函数内)
async def main():
    await say_hello()

2.2 await 在等什么?

await 关键字只能等待可等待对象(Awaitable),包括:

类型示例说明
协程(Coroutine)await coro()async def 返回的对象
任务(Task)asyncio.create_task()调度好的协程
Futureasyncio.Future()尚未完成的结果占位符
async def fetch_data():
    return {"data": "hello"}

async def main():
    # await 一个协程
    result = await fetch_data()
    print(result)

asyncio.run(main())

2.3 asyncio.sleep vs time.sleep

import asyncio
import time

# ❌ 同步 sleep:阻塞整个线程,无法处理其他任务
def sync_task():
    time.sleep(2)  # 在这 2 秒内,整条线程都被卡住
    print("done")

# ✅ 异步 sleep:让出控制权,线程可以处理其他协程
async def async_task():
    await asyncio.sleep(2)  # 这 2 秒内,线程可以去干别的事
    print("done")

# 体验差异
async def compare():
    start = time.time()
    # 顺序执行两个 async_task
    await async_task()
    await async_task()
    print(f"顺序执行耗时: {time.time() - start:.2f}s")  # ~4秒

    start = time.time()
    # 并发执行
    await asyncio.gather(async_task(), async_task())
    print(f"并发执行耗时: {time.time() - start:.2f}s")  # ~2秒

asyncio.run(compare())

⚠️ asyncio.sleep() 让出 GIL,但 time.sleep() 是真正阻塞线程。在 FastAPI 中使用 time.sleep()阻塞整个事件循环


3. 事件循环(Event Loop)原理

3.1 什么是事件循环?

事件循环是异步的"调度中心",它的工作流程:

┌─────────────────────────────────────┐
│          事件循环(单线程)             │
├─────────────────────────────────────┤
│  1. 检查 IO 事件(网络/文件/定时器)      │
│  2. 收集就绪的任务,执行它们             │
│  3. 遇到 await 就挂起,切换到下一个任务   │
│  4. 重复...直到全部完成                  │
└─────────────────────────────────────┘

Python 异步模型基于 Generator,协程本质上是可暂停、恢复的生成器函数

3.2 事件循环生命周期

import asyncio

async def task(name, sec):
    print(f"[{name}] 开始")
    await asyncio.sleep(sec)
    print(f"[{name}] 完成")

async def main():
    # 创建任务(还未执行)
    t1 = asyncio.create_task(task("A", 1))
    t2 = asyncio.create_task(task("B", 0.5))
    t3 = asyncio.create_task(task("C", 0.8))

    # 等待所有任务完成
    await asyncio.gather(t1, t2, t3)
    print("全部完成")

asyncio.run(main())
# 输出顺序:
# [B] 开始
# [A] 开始
# [C] 开始
# [B] 完成
# [C] 完成
# [A] 完成
# 全部完成

4. 异步函数的调用规则

4.1 四条黄金法则

┌──────────────────────────────────────────────────────┐
│                  异步调用规则速查表                     │
├──────────────┬───────────────────┬────────────────────┤
│ 调用方        │ 被调函数           │ 正确写法           │
├──────────────┼───────────────────┼────────────────────┤
│ sync 函数     │ async 函数         │ ❌ 不能直接调用      │
│ sync 函数     │ sync 函数          │ ✅ 直接调用        │
│ async 函数    │ async 函数         │ ✅ await 调用     │
│ async 函数    │ sync 函数          │ ⚠️ 可以,但不推荐  │
└──────────────┴───────────────────┴────────────────────┘

4.2 同步函数中调用异步函数

# ❌ 错误:同步函数中直接调用协程
def bad_example():
    result = fetch_data()  # 返回一个协程对象,不会执行!
    print(result)          # 打印的是 <coroutine object...>

# ✅ 正确方式一:包装为新事件循环(不推荐,代价大)
def call_async():
    result = asyncio.run(fetch_data())
    print(result)

# ✅ 正确方式二:如果在异步上下文中,同步调用需要用 run_in_executor
async def main():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, sync_function)
    print(result)

4.3 异步函数中调用同步函数

# ✅ 可以,但不推荐。如果同步函数耗时,会阻塞事件循环
async def call_sync():
    result = blocking_function()  # 如果这个执行很久,整个事件循环会卡住
    return result

# ✅ 推荐:耗时同步操作丢到线程池
async def call_sync_properly():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, blocking_function)
    return result

# 或者用 to_thread(Python 3.9+,更简洁)
async def call_sync_simple():
    result = await asyncio.to_thread(blocking_function)
    return result

5. 在 FastAPI 中使用 async

5.1 FastAPI 的异步路由

from fastapi import FastAPI
import asyncio

app = FastAPI()

# ✅ 异步路由:处理 I/O 密集型任务
@app.get("/async-data")
async def get_async_data():
    # 模拟异步 I/O(如数据库查询、HTTP 请求)
    await asyncio.sleep(1)
    return {"source": "async", "data": [1, 2, 3]}

# ✅ 同步路由:处理 CPU 密集型任务
@app.get("/sync-data")
def get_sync_data():
    # 计算密集型任务,不需要 await
    result = sum(range(10**7))
    return {"source": "sync", "result": result}

5.2 异步数据库操作(以 asyncpg 为例)

from fastapi import FastAPI
import asyncpg
import asyncio

app = FastAPI()
pool = None

@app.on_event("startup")
async def startup():
    global pool
    pool = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="admin",
        password="secret",
        min_size=5,
        max_size=20
    )

@app.on_event("shutdown")
async def shutdown():
    await pool.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with pool.acquire() as conn:
        # 异步查询,不会阻塞事件循环
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE id = $1", user_id
        )
    return dict(user)

5.3 异步 HTTP 请求(以 httpx 为例)

from fastapi import FastAPI
import httpx

app = FastAPI()

@app.get("/fetch-multiple")
async def fetch_multiple():
    urls = [
        "https://api.github.com/users/octocat",
        "https://api.github.com/users/torvalds",
        "https://api.github.com/users/gvanrossum",
    ]

    # 异步并发请求所有 URL
    async with httpx.AsyncClient(timeout=10) as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)

    return [r.json() for r in responses]

6. 何时用同步代码?

6.1 用同步的场景

场景原因
CPU 密集型任务(计算、加密、压缩)asyncio 对 CPU-bound 无能为力,用 run_in_executor 丢到线程池
没有异步版本的库Pillownumpy(计算部分)
简单的同步操作文件读写(os.path、open)、数学计算
启动/初始化代码app = FastAPI() 这种只需跑一次

6.2 CPU 密集型任务的正确处理方式

from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

# ❌ 错误:直接同步处理 CPU 密集型任务
@app.get("/heavy")
def heavy_task():
    total = 0
    for i in range(10**8):  # 模拟 CPU 密集计算
        total += i
    return {"result": total}

# ✅ 正确:用 run_in_executor 放到线程池
@app.get("/heavy")
async def heavy_task():
    def compute():
        total = 0
        for i in range(10**8):
            total += i
        return total

    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, compute)
    return {"result": result}

# ✅ 推荐:用 BackgroundTasks 异步执行
@app.get("/heavy", response_model=dict)
async def heavy_task(background_tasks: BackgroundTasks):
    def compute():
        total = 0
        for i in range(10**8):
            total += i
        return total

    background_tasks.add_task(compute)
    return {"message": "任务已在后台执行"}

6.3 同步库的异步包装

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

def blocking_library_func(data):
    # 某个同步库
    import json
    return json.dumps(data)

async def async_wrapper(data):
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        executor, blocking_library_func, data
    )
    return result

7. 常见陷阱与避坑指南

陷阱 1:忘记 await

# ❌ 错误
async def bad():
    data = some_async_function()  # 这返回的是协程对象,不会执行!
    return data

# ✅ 正确
async def good():
    data = await some_async_function()
    return data

陷阱 2:在循环中串行 await

# ❌ 错误:逐个等待,效率低
async def slow():
    for url in urls:
        result = await fetch(url)
        results.append(result)

# ✅ 正确:并发等待
async def fast():
    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)

陷阱 3:混淆 async 函数与普通函数

# ❌ 错误
@app.get("/")
def endpoint():
    data = await async_db_query()  # 普通函数不能 await
    return data

# ✅ 正确
@app.get("/")
async def endpoint():
    data = await async_db_query()
    return data

陷阱 4:用 time.sleep 而不是 asyncio.sleep

# ❌ 错误
async def bad_sleep():
    time.sleep(5)  # 阻塞整个事件循环 5 秒

# ✅ 正确
async def good_sleep():
    await asyncio.sleep(5)  # 让出线程,可处理其他请求

8. 实战:构建一个异步 API 服务

"""
完整示例:异步调用外部 API + 异步数据库查询
"""
from fastapi import FastAPI, HTTPException
import httpx
import asyncio
from typing import List

app = FastAPI()

# 模拟异步数据库查询
async def query_users() -> List[dict]:
    await asyncio.sleep(0.1)  # 模拟 DB 查询延迟
    return [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"},
    ]

# 异步获取 GitHub 用户信息
async def fetch_github_user(username: str) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"https://api.github.com/users/{username}")
        if resp.status_code == 404:
            raise HTTPException(status_code=404, detail="User not found")
        resp.raise_for_status()
        return resp.json()

@app.get("/users")
async def list_users():
    """并发获取用户列表和 GitHub 公开信息"""
    users = await query_users()

    # 并发获取每个用户的 GitHub 头像
    tasks = [
        fetch_github_user(u["name"].lower())
        for u in users
    ]
    github_data = await asyncio.gather(*tasks, return_exceptions=True)

    # 合并结果
    result = []
    for user, gh in zip(users, github_data):
        if isinstance(gh, Exception):
            user["avatar"] = None
        else:
            user["avatar"] = gh.get("avatar_url")
        result.append(user)

    return result

@app.get("/github/{username}")
async def get_github(username: str):
    """获取单个 GitHub 用户"""
    return await fetch_github_user(username)

9. 小结

概念核心要点
async def定义协程函数,返回协程对象
await等待可等待对象(协程/任务/Future),让出控制权
asyncio.sleep异步睡眠,不阻塞事件循环
asyncio.gather并发执行多个协程
run_in_executor将同步代码丢到线程池执行
FastAPI 路由I/O 密集用 async,CPU 密集用 sync + run_in_executor

💡 记住:在 FastAPI 中,异步路由可以并发处理大量 I/O 请求,性能远优于同步路由。但不要在异步函数中做 CPU 密集计算——那会让整个事件循环卡住。


🔗 扩展阅读