FastAPI异步编程完全指南

📂 所属阶段:第二阶段 — 进阶黑科技(核心篇)
🔗 相关章节:FastAPIdependency-injection · FastAPImiddleware-application

目录

什么是异步编程?

同步 vs 异步:排队打饭的比喻 🍚

想象你去食堂打饭:

  • 同步:你站在窗口等厨师炒完一盘菜,端走,再点下一道。厨师在炒菜时你只能干等
  • 异步:你把菜单交给厨师,然后去拿餐具、倒饮料,等菜好了服务员直接端到你桌上。你不需要傻站着等

这就是核心区别:等待时是否可以切换去干别的事

为什么 Web 服务需要异步?

假设有三个请求同时到达:

请求 A(查询数据库,耗时 200ms)
请求 B(简单计算,耗时 1ms)
请求 C(调用外部 API,耗时 1000ms)
  • 同步模式:按顺序处理,总耗时 = 200 + 1 + 1000 = 1201ms
  • 异步模式:A 等待数据库时切换 B,B 完成切 C,C 等待时切回 A → 总耗时约 1000ms

在 I/O 密集型场景(数据库查询、HTTP 请求、文件读写),异步能让单线程处理大量并发请求,显著提升 API 性能。

异步编程的三大优势

  1. 高并发处理能力:单个进程可以处理数千个并发连接
  2. 资源利用率高:避免线程创建和切换的开销
  3. 响应速度快:I/O 等待期间可以处理其他请求

async/await 详解

基础语法

import asyncio

# 定义一个协程函数(async def)
async def say_hello():
    print("Hello!")
    return "Hello from coroutine!"

# 运行协程的三种方式
# 方式一:asyncio.run()(推荐,主入口用)
asyncio.run(say_hello())

# 方式二:在已有事件循环中创建任务
async def main():
    task = asyncio.create_task(say_hello())  # 调度执行协程
    result = await task                       # 等待任务完成
    print(f"Result: {result}")

asyncio.run(main())

await 在等什么?

await 只能等待可等待对象(Awaitable),包括:

类型示例说明
协程(Coroutine)await coro()async def 返回的对象
任务(Task)asyncio.create_task()调度好的协程
Futureasyncio.Future()尚未完成的结果占位符

asyncio.sleep vs time.sleep

import asyncio
import time

# ❌ 同步 sleep:阻塞整个线程
def sync_task():
    time.sleep(2)  # 2 秒内整条线程被卡住
    print("Sync done")

# ✅ 异步 sleep:让出控制权,处理其他协程
async def async_task():
    await asyncio.sleep(2)  # 2 秒内线程可以干别的
    print("Async done")

# 体验差异
async def compare():
    start = time.time()
    await asyncio.gather(async_task(), async_task())  # 并发执行
    print(f"并发耗时: {time.time() - start:.2f}s")  # 约 2 秒

asyncio.run(compare())

⚠️ 在 FastAPI 中用 time.sleep()阻塞整个事件循环

事件循环原理

什么是事件循环?

事件循环是异步的"调度中心",工作流程:

┌─────────────────────────────────────┐
│          事件循环(单线程)             │
├─────────────────────────────────────┤
│  1. 检查 I/O 事件(网络/文件/定时器)      │
│  2. 收集就绪任务,执行它们               │
│  3. 遇到 await 就挂起,切换到下一个任务     │
│  4. 重复...直到全部完成                  │
└─────────────────────────────────────┘

事件循环生命周期演示

import asyncio

async def task(name, sec):
    print(f"[{name}] 开始")
    await asyncio.sleep(sec)
    print(f"[{name}] 完成")

async def main():
    t1 = asyncio.create_task(task("A", 1))
    t2 = asyncio.create_task(task("B", 0.5))
    t3 = asyncio.create_task(task("C", 0.8))
    await asyncio.gather(t1, t2, t3)
    print("全部完成")

asyncio.run(main())
# 输出顺序说明:
# [B]开始 → [A]开始 → [C]开始
# → [B]完成 → [C]完成 → [A]完成

异步函数调用规则

四条黄金法则

调用方被调函数正确写法
sync 函数async 函数❌ 不能直接调用
sync 函数sync 函数✅ 直接调用
async 函数async 函数✅ await 调用
async 函数sync 函数⚠️ 可以,但耗时同步操作需丢线程池

异步中调用耗时同步函数

import asyncio
import time

def blocking_function():
    time.sleep(1)
    return "Blocking result"

# ✅ 推荐:Python 3.9+ 用 to_thread,更简洁
async def call_sync_simple():
    result = await asyncio.to_thread(blocking_function)
    return result

为什么不能直接 await 同步函数?
因为同步函数不是可等待对象,它会阻塞整个线程。必须用 asyncio.to_thread() 将其扔到单独的线程池执行,这样事件循环才能继续处理其他任务。

FastAPI 中的异步使用

异步路由 vs 同步路由

from fastapi import FastAPI
import asyncio

app = FastAPI()

# ✅ 异步路由:处理 I/O 密集型任务
@app.get("/async-data")
async def get_async_data():
    await asyncio.sleep(1)  # 模拟 I/O 等待
    return {"source": "async", "data": [1, 2, 3]}

# ✅ 同步路由:处理 CPU 密集型任务
@app.get("/sync-data")
def get_sync_data():
    result = sum(range(10**7))  # 纯计算
    return {"source": "sync", "result": result}

FastAPI 会自动检测路由类型

  • 如果定义为 async def,会在事件循环中运行,遇到 await 就挂起
  • 如果定义为 def,会被丢到线程池中执行,不阻塞主循环(但增加少量线程开销)

异步 HTTP 请求(以 httpx 为例)

from fastapi import FastAPI
import httpx
import asyncio

app = FastAPI()

@app.get("/fetch-multiple")
async def fetch_multiple():
    urls = [
        "https://api.github.com/users/octocat",
        "https://api.github.com/users/torvalds",
    ]
    async with httpx.AsyncClient(timeout=10) as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
    return [r.json() for r in responses]

💡 使用 httpx.AsyncClient 作为上下文管理器,可以复用连接,比每次临时创建 AsyncClient 更高效。

何时使用同步代码

同步代码的适用场景

场景原因示例
CPU 密集型任务asyncio 对 CPU-bound 无能为力图像处理、加密、大规模计算
无异步版本的库第三方库只提供同步 APIPillow、NumPy 计算部分
简单同步操作执行极快,无需异步化os.path.exists()、基本数学运算
启动/初始化代码只需执行一次,阻塞无影响app = FastAPI()

经验法则

  • 任务以 I/O 为主(网络、磁盘、数据库) ➔ 用 async def
  • 任务以计算为主(CPU 密集) ➔ 用 def,FastAPI 自动丢线程池
  • 若不确定,可以先写成 def,性能测试后再优化。

常见陷阱与避坑指南

陷阱 1:忘记 await

# ❌ 错误
async def bad():
    data = some_async_function()  # 返回协程对象,不会执行
    return data

# ✅ 正确
async def good():
    data = await some_async_function()
    return data

陷阱 2:循环中串行 await

# ❌ 错误:逐个等待,效率低
async def slow():
    results = []
    for url in urls:
        result = await fetch(url)  # 一个一个获取,总耗时累加
        results.append(result)

# ✅ 正确:并发等待
async def fast():
    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)  # 同时发起请求

陷阱 3:在异步函数中混用同步阻塞调用

import time

async def fragile_route():
    time.sleep(2)  # ❌ 阻塞事件循环,所有请求被卡住
    return {"status": "done"}

修复方法

  • 换成 await asyncio.sleep(2)
  • 或把 time.sleep 放进 await asyncio.to_thread(time.sleep, 2)

陷阱 4:误用全局变量

由于事件循环是单线程,修改变量无需加锁,但异步任务之间的执行顺序不确定,过度依赖共享状态容易出 bug。建议每个请求使用局部状态。

实战:构建异步 API 服务

下面演示一个实际例子:并发获取本地“数据库”数据与外部 GitHub API 数据。

from fastapi import FastAPI, HTTPException
import httpx
import asyncio

app = FastAPI()

# 模拟异步数据库查询
async def query_users():
    await asyncio.sleep(0.1)  # 模拟 I/O 延迟
    return [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"},
    ]

# 异步获取 GitHub 用户信息
async def fetch_github_user(username: str):
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"https://api.github.com/users/{username}")
        if resp.status_code == 404:
            raise HTTPException(status_code=404, detail="User not found")
        resp.raise_for_status()
        return resp.json()

@app.get("/users")
async def list_users():
    """并发获取本地用户列表 + GitHub 公开信息"""
    users, octocat = await asyncio.gather(
        query_users(),
        fetch_github_user("octocat")
    )
    users.append({
        "id": octocat["id"],
        "name": octocat["name"],
        "is_github": True,
        "avatar": octocat["avatar_url"]
    })
    return users

运行效果
两个独立 I/O 操作同时进行,总耗时取决于最慢的那一个,而不是两个操作耗时之和。

性能优化建议

  1. 使用连接池
    httpx.AsyncClientasyncpg.create_pool,避免为每个请求新建连接。
  2. 适当缓存
    同步计算可用 functools.lru_cache,异步缓存可选 asyncio.LifoQueue 或三方库。
  3. 避免不必要的串行 await
    独立的任务统一用 asyncio.gather 并发执行。
  4. 监控事件循环延迟
    生产环境可接入 asyncio 调试模式或使用工具如 prometheus_client 观察阻塞情况。

总结

概念核心要点
async def定义协程函数,返回协程对象
await等待可等待对象,让出控制权
asyncio.sleep异步睡眠,不阻塞事件循环
asyncio.gather并发执行多个协程
asyncio.to_thread耗时同步操作丢线程池
FastAPI 路由I/O 密集用 async,CPU 密集用 def

💡 记住:异步的核心是高效处理大量并发 I/O 请求,但不要在异步函数中做 CPU 密集计算——那会让整个事件循环卡住。合理使用线程池,让同步与异步各司其职,才能最大化 FastAPI 的性能。