FastAPI异步编程完全指南

📂 所属阶段:第二阶段 — 进阶黑科技(核心篇)
🔗 相关章节:FastAPI依赖注入系统 · FastAPI中间件应用

目录

什么是异步编程?

同步vs异步:排队打饭的比喻 🍚

想象你去食堂打饭:

  • 同步:你站在窗口等厨师炒完一盘菜,端走,再点下一道。厨师在炒菜时你只能干等
  • 异步:你把菜单交给厨师,然后去拿餐具、倒饮料,等菜好了服务员直接端到你桌上。你不需要傻站着等

这就是同步与异步的核心区别:等待时是否可以切换去干别的事

为什么Web服务需要异步?

请求 A(查询数据库,耗时 200ms)
请求 B(简单计算,耗时 1ms)
请求 C(调用外部 API,耗时 1000ms)
  • 同步模式:按顺序处理,总耗时 = 200 + 1 + 1000 = 1201ms
  • 异步模式:A 等待数据库时切换 B,B 完成切 C,C 等待时切回 A → 总耗时 ≈ 1000ms

在 I/O 密集型场景(数据库查询、HTTP 请求、文件读写),异步能让单线程处理大量并发请求,显著提升API性能。

异步编程的优势

  1. 高并发处理能力:单个进程可以处理数千个并发连接
  2. 资源利用率高:避免线程创建和切换的开销
  3. 响应速度快:I/O等待期间可以处理其他请求
  4. 内存占用低:协程比线程占用更少内存

async/await详解

基础语法

import asyncio

# 定义一个协程函数(async def)
async def say_hello():
    print("Hello!")
    return "Hello from coroutine!"

# 运行协程的三种方式
# 方式一:asyncio.run()(推荐,主入口用)
asyncio.run(say_hello())

# 方式二:在已有事件循环中创建任务
async def main():
    task = asyncio.create_task(say_hello())  # 创建任务
    result = await task                       # 等待任务完成
    print(f"Result: {result}")

asyncio.run(main())

# 方式三:直接 await(只能在 async 函数内)
async def main():
    result = await say_hello()
    print(f"Result: {result}")

await 在等什么?

await 关键字只能等待可等待对象(Awaitable),包括:

类型示例说明
协程(Coroutine)await coro()async def 返回的对象
任务(Task)asyncio.create_task()调度好的协程
Futureasyncio.Future()尚未完成的结果占位符
async def fetch_data():
    await asyncio.sleep(0.1)  # 模拟异步I/O
    return {"data": "hello", "timestamp": "2024-01-15"}

async def main():
    # await 一个协程
    result = await fetch_data()
    print(result)

asyncio.run(main())

asyncio.sleep vs time.sleep

import asyncio
import time

# ❌ 同步 sleep:阻塞整个线程,无法处理其他任务
def sync_task():
    time.sleep(2)  # 在这 2 秒内,整条线程都被卡住
    print("Sync task done")

# ✅ 异步 sleep:让出控制权,线程可以处理其他协程
async def async_task():
    await asyncio.sleep(2)  # 这 2 秒内,线程可以去干别的事
    print("Async task done")

# 体验差异
async def compare():
    start = time.time()
    # 顺序执行两个 async_task
    await async_task()
    await async_task()
    print(f"顺序执行耗时: {time.time() - start:.2f}s")  # ~4秒

    start = time.time()
    # 并发执行
    await asyncio.gather(async_task(), async_task())
    print(f"并发执行耗时: {time.time() - start:.2f}s")  # ~2秒

asyncio.run(compare())

⚠️ asyncio.sleep() 让出 GIL,但 time.sleep() 是真正阻塞线程。在 FastAPI 中使用 time.sleep()阻塞整个事件循环

协程与任务的区别

import asyncio

async def my_coroutine():
    print("Coroutine started")
    await asyncio.sleep(1)
    print("Coroutine finished")
    return "Result"

async def demonstrate_difference():
    # 协程对象 - 只是定义,不会执行
    coro = my_coroutine()
    print(f"Type: {type(coro)}")  # <class 'coroutine'>
    
    # 任务对象 - 调度执行的协程
    task = asyncio.create_task(my_coroutine())
    print(f"Type: {type(task)}")  # <class 'Task'>
    
    # 等待任务完成
    result = await task
    print(f"Result: {result}")

asyncio.run(demonstrate_difference())

事件循环原理

什么是事件循环?

事件循环是异步的"调度中心",它的工作流程:

┌─────────────────────────────────────┐
│          事件循环(单线程)             │
├─────────────────────────────────────┤
│  1. 检查 IO 事件(网络/文件/定时器)      │
│  2. 收集就绪的任务,执行它们             │
│  3. 遇到 await 就挂起,切换到下一个任务   │
│  4. 重复...直到全部完成                  │
└─────────────────────────────────────┘

Python 异步模型基于 Generator,协程本质上是可暂停、恢复的生成器函数

事件循环生命周期

import asyncio

async def task(name, sec):
    print(f"[{name}] 开始")
    await asyncio.sleep(sec)
    print(f"[{name}] 完成")

async def main():
    # 创建任务(还未执行)
    t1 = asyncio.create_task(task("A", 1))
    t2 = asyncio.create_task(task("B", 0.5))
    t3 = asyncio.create_task(task("C", 0.8))

    # 等待所有任务完成
    await asyncio.gather(t1, t2, t3)
    print("全部完成")

asyncio.run(main())
# 输出顺序:
# [B] 开始
# [A] 开始
# [C] 开始
# [B] 完成
# [C] 完成
# [A] 完成
# 全部完成

事件循环的高级用法

import asyncio

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    
    # 设置循环关闭时的回调
    def on_close():
        print("Loop is closing...")
    
    loop.call_soon(on_close)
    
    # 创建任务并监控状态
    task = asyncio.create_task(asyncio.sleep(1))
    
    # 检查任务是否完成
    print(f"Task done: {task.done()}")
    
    await task
    print(f"Task done: {task.done()}")
    print(f"Task result: {task.result()}")

asyncio.run(main())

异步函数调用规则

四条黄金法则

┌──────────────────────────────────────────────────────┐
│                  异步调用规则速查表                     │
├──────────────┬───────────────────┬────────────────────┤
│ 调用方        │ 被调函数           │ 正确写法           │
├──────────────┼───────────────────┼────────────────────┤
│ sync 函数     │ async 函数         │ ❌ 不能直接调用      │
│ sync 函数     │ sync 函数          │ ✅ 直接调用        │
│ async 函数    │ async 函数         │ ✅ await 调用     │
│ async 函数    │ sync 函数          │ ⚠️ 可以,但不推荐  │
└──────────────┴───────────────────┴────────────────────┘

同步函数中调用异步函数

# ❌ 错误:同步函数中直接调用协程
def bad_example():
    result = fetch_data()  # 返回一个协程对象,不会执行!
    print(result)          # 打印的是 <coroutine object...>

# ✅ 正确方式一:包装为新事件循环(不推荐,代价大)
def call_async():
    async def inner():
        return await fetch_data()
    result = asyncio.run(inner())
    print(result)
    return result

# ✅ 正确方式二:如果在异步上下文中,同步调用需要用 run_in_executor
async def main():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, sync_function)
    print(result)

异步函数中调用同步函数

import time

# 同步函数示例
def blocking_function():
    time.sleep(1)  # 模拟阻塞操作
    return "Blocking result"

# ✅ 可以,但不推荐。如果同步函数耗时,会阻塞事件循环
async def call_sync():
    result = blocking_function()  # 如果这个执行很久,整个事件循环会卡住
    return result

# ✅ 推荐:耗时同步操作丢到线程池
async def call_sync_properly():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, blocking_function)
    return result

# 或者用 to_thread(Python 3.9+,更简洁)
async def call_sync_simple():
    result = await asyncio.to_thread(blocking_function)
    return result

FastAPI中的异步使用

FastAPI的异步路由

from fastapi import FastAPI
import asyncio

app = FastAPI()

# ✅ 异步路由:处理 I/O 密集型任务
@app.get("/async-data")
async def get_async_data():
    # 模拟异步 I/O(如数据库查询、HTTP 请求)
    await asyncio.sleep(1)
    return {"source": "async", "data": [1, 2, 3], "timestamp": "2024-01-15"}

# ✅ 同步路由:处理 CPU 密集型任务
@app.get("/sync-data")
def get_sync_data():
    # 计算密集型任务,不需要 await
    result = sum(range(10**7))
    return {"source": "sync", "result": result}

异步数据库操作(以 asyncpg 为例)

from fastapi import FastAPI
import asyncpg
import asyncio

app = FastAPI()
pool = None

@app.on_event("startup")
async def startup():
    global pool
    pool = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="admin",
        password="secret",
        min_size=5,
        max_size=20
    )

@app.on_event("shutdown")
async def shutdown():
    await pool.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with pool.acquire() as conn:
        # 异步查询,不会阻塞事件循环
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE id = $1", user_id
        )
    return dict(user)

异步HTTP请求(以httpx为例)

from fastapi import FastAPI
import httpx

app = FastAPI()

@app.get("/fetch-multiple")
async def fetch_multiple():
    urls = [
        "https://api.github.com/users/octocat",
        "https://api.github.com/users/torvalds",
        "https://api.github.com/users/gvanrossum",
    ]

    # 异步并发请求所有 URL
    async with httpx.AsyncClient(timeout=10) as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)

    return [r.json() for r in responses]

FastAPI依赖注入中的异步

from fastapi import Depends

async def get_current_user():
    # 模拟异步用户认证
    await asyncio.sleep(0.1)
    return {"username": "john_doe", "id": 123}

@app.get("/protected")
async def protected_route(current_user: dict = Depends(get_current_user)):
    return current_user

# 异步依赖的缓存
from functools import lru_cache

@lru_cache(maxsize=128)
def cached_sync_operation(param: str):
    # 缓存同步操作
    return f"Result for {param}"

async def async_cached_dependency(param: str):
    # 在异步函数中使用缓存的同步操作
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, cached_sync_operation, param)

何时使用同步代码

用同步的场景

场景原因
CPU 密集型任务(计算、加密、压缩)asyncio 对 CPU-bound 无能为力,用 run_in_executor 丢到线程池
没有异步版本的库Pillownumpy(计算部分)
简单的同步操作文件读写(os.path、open)、数学计算
启动/初始化代码app = FastAPI() 这种只需跑一次

CPU密集型任务的正确处理方式

from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

# ❌ 错误:直接同步处理 CPU 密集型任务
@app.get("/heavy")
def heavy_task():
    total = 0
    for i in range(10**8):  # 模拟 CPU 密集计算
        total += i
    return {"result": total}

# ✅ 正确:用 run_in_executor 放到线程池
@app.get("/heavy")
async def heavy_task():
    def compute():
        total = 0
        for i in range(10**8):
            total += i
        return total

    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, compute)
    return {"result": result}

# ✅ 推荐:用 BackgroundTasks 异步执行
@app.get("/heavy", response_model=dict)
async def heavy_task(background_tasks: BackgroundTasks):
    def compute():
        total = 0
        for i in range(10**8):
            total += i
        return total

    background_tasks.add_task(compute)
    return {"message": "任务已在后台执行"}

同步库的异步包装

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

def blocking_library_func(data):
    # 某个同步库
    import json
    return json.dumps(data, ensure_ascii=False)

async def async_wrapper(data):
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        executor, blocking_library_func, data
    )
    return result

# 使用示例
async def use_blocking_lib():
    data = {"name": "张三", "age": 25}
    result = await async_wrapper(data)
    print(result)
    return result

常见陷阱与避坑指南

陷阱1:忘记await

# ❌ 错误
async def bad():
    data = some_async_function()  # 这返回的是协程对象,不会执行!
    return data

# ✅ 正确
async def good():
    data = await some_async_function()
    return data

陷阱2:在循环中串行await

import asyncio

async def fetch(url):
    await asyncio.sleep(0.1)  # 模拟请求
    return f"Data from {url}"

urls = ["url1", "url2", "url3"]

# ❌ 错误:逐个等待,效率低
async def slow():
    results = []
    for url in urls:
        result = await fetch(url)  # 串行等待
        results.append(result)
    return results

# ✅ 正确:并发等待
async def fast():
    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)  # 并发执行
    return results

# ✅ 更灵活的处理方式
async def flexible():
    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # 处理可能的异常
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Error fetching {urls[i]}: {result}")
        else:
            print(f"Success: {result}")
    return results

陷阱3:混淆async函数与普通函数

from fastapi import FastAPI

app = FastAPI()

# ❌ 错误
@app.get("/bad")
def endpoint():
    data = await async_db_query()  # 普通函数不能 await
    return data

# ✅ 正确
@app.get("/good")
async def endpoint():
    data = await async_db_query()
    return data

陷阱4:用time.sleep而不是asyncio.sleep

import time

# ❌ 错误
async def bad_sleep():
    time.sleep(5)  # 阻塞整个事件循环 5 秒

# ✅ 正确
async def good_sleep():
    await asyncio.sleep(5)  # 让出线程,可处理其他请求

陷阱5:在FastAPI中阻塞事件循环

# ❌ 错误:在FastAPI异步路由中使用阻塞操作
@app.get("/blocking-operation")
async def blocking_endpoint():
    time.sleep(10)  # 这会阻塞整个服务器10秒!
    return {"message": "This took 10 seconds and blocked everything!"}

# ✅ 正确:将阻塞操作放到线程池
@app.get("/non-blocking-operation")
async def non_blocking_endpoint():
    def blocking_op():
        time.sleep(10)  # 在线程池中执行
        return "Done"
    
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, blocking_op)
    return {"message": result}

实战:构建异步API服务

"""
完整示例:异步调用外部 API + 异步数据库查询
"""
from fastapi import FastAPI, HTTPException
import httpx
import asyncio
from typing import List, Dict, Any
import time

app = FastAPI()

# 模拟异步数据库查询
async def query_users() -> List[Dict[str, Any]]:
    await asyncio.sleep(0.1)  # 模拟 DB 查询延迟
    return [
        {"id": 1, "name": "Alice", "email": "alice@example.com"},
        {"id": 2, "name": "Bob", "email": "bob@example.com"},
    ]

# 异步获取 GitHub 用户信息
async def fetch_github_user(username: str) -> Dict[str, Any]:
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"https://api.github.com/users/{username}")
        if resp.status_code == 404:
            raise HTTPException(status_code=404, detail="User not found")
        resp.raise_for_status()
        return resp.json()

@app.get("/users")
async def list_users():
    """并发获取用户列表和 GitHub 公开信息"""
    start_time = time.time()
    
    # 并发获取用户数据和GitHub信息
    users_task = query_users()
    github_tasks = [
        fetch_github_user(u["name"].lower())
        for u in [{"name": "octocat"}, {"name": "torvalds"}]  # 示例用户
    ]
    
    # 等待所有任务完成
    users, *github_data = await asyncio.gather(users_task, *github_tasks)
    
    # 合并结果
    result = []
    for user in users:
        result.append(user)
    
    # 添加GitHub用户信息
    for gh_user in github_data:
        result.append({
            "id": gh_user["id"],
            "name": gh_user["name"],
            "github": True,
            "avatar": gh_user.get("avatar_url")
        })

    processing_time = time.time() - start_time
    return {
        "users": result,
        "processing_time": f"{processing_time:.2f}s",
        "total_users": len(result)
    }

@app.get("/github/{username}")
async def get_github(username: str):
    """获取单个 GitHub 用户"""
    return await fetch_github_user(username)

# 高级异步模式:流式响应
@app.get("/stream-data")
async def stream_data():
    """演示流式响应"""
    async def generate_data():
        for i in range(10):
            await asyncio.sleep(0.5)  # 模拟数据生成延迟
            yield f"data: {i}\n\n"
    
    return StreamingResponse(generate_data(), media_type="text/event-stream")

from fastapi.responses import StreamingResponse

# 异步WebSocket处理
@app.websocket("/ws")
async def websocket_endpoint(websocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            # 异步处理数据
            processed_data = await process_websocket_data(data)
            await websocket.send_text(f"Processed: {processed_data}")
    except Exception as e:
        print(f"WebSocket error: {e}")
    finally:
        await websocket.close()

async def process_websocket_data(data: str) -> str:
    """异步处理WebSocket数据"""
    await asyncio.sleep(0.1)  # 模拟处理时间
    return data.upper()

性能优化建议

1. 使用连接池

# 数据库连接池
import asyncpg
from contextlib import asynccontextmanager

@asynccontextmanager
async def get_db_connection():
    conn = await asyncpg.connect(
        host="localhost",
        database="mydb",
        user="user",
        password="password"
    )
    try:
        yield conn
    finally:
        await conn.close()

2. 适当使用缓存

import asyncio
from functools import lru_cache

@lru_cache(maxsize=128)
def expensive_calculation(n: int) -> int:
    """昂贵的计算,使用缓存避免重复计算"""
    result = 0
    for i in range(n):
        result += i * i
    return result

async def async_calculation(n: int):
    """在异步环境中使用缓存的同步计算"""
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, expensive_calculation, n)

3. 避免不必要的同步

# ❌ 不必要的同步
@app.get("/bad-sync")
async def bad_sync():
    # 这些操作其实可以并发
    data1 = await fetch_data1()
    data2 = await fetch_data2()
    data3 = await fetch_data3()
    return {"data1": data1, "data2": data2, "data3": data3}

# ✅ 并发处理
@app.get("/good-async")
async def good_async():
    # 并发获取所有数据
    data1, data2, data3 = await asyncio.gather(
        fetch_data1(),
        fetch_data2(), 
        fetch_data3()
    )
    return {"data1": data1, "data2": data2, "data3": data3}

相关教程

在FastAPI中,对于I/O密集型操作(如数据库查询、HTTP请求、文件读写)使用async/await可以获得显著的性能提升。但对于CPU密集型操作,应该使用同步函数配合线程池来处理,避免阻塞事件循环。 异步编程的核心优势在于处理大量并发I/O操作,可以显著提高API的吞吐量。但要注意避免在异步函数中执行阻塞操作,这会降低整体性能。

总结

概念核心要点
async def定义协程函数,返回协程对象
await等待可等待对象(协程/任务/Future),让出控制权
asyncio.sleep异步睡眠,不阻塞事件循环
asyncio.gather并发执行多个协程
run_in_executor将同步代码丢到线程池执行
FastAPI路由I/O密集用async,CPU密集用sync + run_in_executor

💡 记住:在 FastAPI 中,异步路由可以并发处理大量 I/O 请求,性能远优于同步路由。但不要在异步函数中做 CPU 密集计算——那会让整个事件循环卡住。

FastAPI的异步特性使其成为构建高性能Web API的理想选择。正确理解和运用async/await机制,可以让您的API在高并发场景下表现出色。


1. 什么是异步编程?

1.1 同步 vs 异步:排队打饭的比喻 🍚

想象你去食堂打饭:

  • 同步:你站在窗口等厨师炒完一盘菜,端走,再点下一道。厨师在炒菜时你只能干等
  • 异步:你把菜单交给厨师,然后去拿餐具、倒饮料,等菜好了服务员直接端到你桌上。你不需要傻站着等

这就是同步与异步的核心区别:等待时是否可以切换去干别的事

1.2 为什么 Web 服务需要异步?

请求 A(查询数据库,耗时 200ms)
请求 B(简单计算,耗时 1ms)
请求 C(调用外部 API,耗时 1000ms)
  • 同步模式:按顺序处理,总耗时 = 200 + 1 + 1000 = 1201ms
  • 异步模式:A 等待数据库时切换 B,B 完成切 C,C 等待时切回 A → 总耗时 ≈ 1000ms

在 I/O 密集型场景(数据库查询、HTTP 请求、文件读写),异步能让单线程处理大量并发请求


2. async / await 详解

2.1 基础语法

import asyncio

# 定义一个协程函数(async def)
async def say_hello():
    print("Hello!")

# 运行协程的三种方式
# 方式一:asyncio.run()(推荐,主入口用)
asyncio.run(say_hello())

# 方式二:在已有事件循环中创建任务
async def main():
    task = asyncio.create_task(say_hello())  # 创建任务
    await task                                  # 等待任务完成

asyncio.run(main())

# 方式三:直接 await(只能在 async 函数内)
async def main():
    await say_hello()

2.2 await 在等什么?

await 关键字只能等待可等待对象(Awaitable),包括:

类型示例说明
协程(Coroutine)await coro()async def 返回的对象
任务(Task)asyncio.create_task()调度好的协程
Futureasyncio.Future()尚未完成的结果占位符
async def fetch_data():
    return {"data": "hello"}

async def main():
    # await 一个协程
    result = await fetch_data()
    print(result)

asyncio.run(main())

2.3 asyncio.sleep vs time.sleep

import asyncio
import time

# ❌ 同步 sleep:阻塞整个线程,无法处理其他任务
def sync_task():
    time.sleep(2)  # 在这 2 秒内,整条线程都被卡住
    print("done")

# ✅ 异步 sleep:让出控制权,线程可以处理其他协程
async def async_task():
    await asyncio.sleep(2)  # 这 2 秒内,线程可以去干别的事
    print("done")

# 体验差异
async def compare():
    start = time.time()
    # 顺序执行两个 async_task
    await async_task()
    await async_task()
    print(f"顺序执行耗时: {time.time() - start:.2f}s")  # ~4秒

    start = time.time()
    # 并发执行
    await asyncio.gather(async_task(), async_task())
    print(f"并发执行耗时: {time.time() - start:.2f}s")  # ~2秒

asyncio.run(compare())

⚠️ asyncio.sleep() 让出 GIL,但 time.sleep() 是真正阻塞线程。在 FastAPI 中使用 time.sleep()阻塞整个事件循环


3. 事件循环(Event Loop)原理

3.1 什么是事件循环?

事件循环是异步的"调度中心",它的工作流程:

┌─────────────────────────────────────┐
│          事件循环(单线程)             │
├─────────────────────────────────────┤
│  1. 检查 IO 事件(网络/文件/定时器)      │
│  2. 收集就绪的任务,执行它们             │
│  3. 遇到 await 就挂起,切换到下一个任务   │
│  4. 重复...直到全部完成                  │
└─────────────────────────────────────┘

Python 异步模型基于 Generator,协程本质上是可暂停、恢复的生成器函数

3.2 事件循环生命周期

import asyncio

async def task(name, sec):
    print(f"[{name}] 开始")
    await asyncio.sleep(sec)
    print(f"[{name}] 完成")

async def main():
    # 创建任务(还未执行)
    t1 = asyncio.create_task(task("A", 1))
    t2 = asyncio.create_task(task("B", 0.5))
    t3 = asyncio.create_task(task("C", 0.8))

    # 等待所有任务完成
    await asyncio.gather(t1, t2, t3)
    print("全部完成")

asyncio.run(main())
# 输出顺序:
# [B] 开始
# [A] 开始
# [C] 开始
# [B] 完成
# [C] 完成
# [A] 完成
# 全部完成

4. 异步函数的调用规则

4.1 四条黄金法则

┌──────────────────────────────────────────────────────┐
│                  异步调用规则速查表                     │
├──────────────┬───────────────────┬────────────────────┤
│ 调用方        │ 被调函数           │ 正确写法           │
├──────────────┼───────────────────┼────────────────────┤
│ sync 函数     │ async 函数         │ ❌ 不能直接调用      │
│ sync 函数     │ sync 函数          │ ✅ 直接调用        │
│ async 函数    │ async 函数         │ ✅ await 调用     │
│ async 函数    │ sync 函数          │ ⚠️ 可以,但不推荐  │
└──────────────┴───────────────────┴────────────────────┘

4.2 同步函数中调用异步函数

# ❌ 错误:同步函数中直接调用协程
def bad_example():
    result = fetch_data()  # 返回一个协程对象,不会执行!
    print(result)          # 打印的是 <coroutine object...>

# ✅ 正确方式一:包装为新事件循环(不推荐,代价大)
def call_async():
    result = asyncio.run(fetch_data())
    print(result)

# ✅ 正确方式二:如果在异步上下文中,同步调用需要用 run_in_executor
async def main():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, sync_function)
    print(result)

4.3 异步函数中调用同步函数

# ✅ 可以,但不推荐。如果同步函数耗时,会阻塞事件循环
async def call_sync():
    result = blocking_function()  # 如果这个执行很久,整个事件循环会卡住
    return result

# ✅ 推荐:耗时同步操作丢到线程池
async def call_sync_properly():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, blocking_function)
    return result

# 或者用 to_thread(Python 3.9+,更简洁)
async def call_sync_simple():
    result = await asyncio.to_thread(blocking_function)
    return result

5. 在 FastAPI 中使用 async

5.1 FastAPI 的异步路由

from fastapi import FastAPI
import asyncio

app = FastAPI()

# ✅ 异步路由:处理 I/O 密集型任务
@app.get("/async-data")
async def get_async_data():
    # 模拟异步 I/O(如数据库查询、HTTP 请求)
    await asyncio.sleep(1)
    return {"source": "async", "data": [1, 2, 3]}

# ✅ 同步路由:处理 CPU 密集型任务
@app.get("/sync-data")
def get_sync_data():
    # 计算密集型任务,不需要 await
    result = sum(range(10**7))
    return {"source": "sync", "result": result}

5.2 异步数据库操作(以 asyncpg 为例)

from fastapi import FastAPI
import asyncpg
import asyncio

app = FastAPI()
pool = None

@app.on_event("startup")
async def startup():
    global pool
    pool = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="admin",
        password="secret",
        min_size=5,
        max_size=20
    )

@app.on_event("shutdown")
async def shutdown():
    await pool.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with pool.acquire() as conn:
        # 异步查询,不会阻塞事件循环
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE id = $1", user_id
        )
    return dict(user)

5.3 异步 HTTP 请求(以 httpx 为例)

from fastapi import FastAPI
import httpx

app = FastAPI()

@app.get("/fetch-multiple")
async def fetch_multiple():
    urls = [
        "https://api.github.com/users/octocat",
        "https://api.github.com/users/torvalds",
        "https://api.github.com/users/gvanrossum",
    ]

    # 异步并发请求所有 URL
    async with httpx.AsyncClient(timeout=10) as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)

    return [r.json() for r in responses]

6. 何时用同步代码?

6.1 用同步的场景

场景原因
CPU 密集型任务(计算、加密、压缩)asyncio 对 CPU-bound 无能为力,用 run_in_executor 丢到线程池
没有异步版本的库Pillownumpy(计算部分)
简单的同步操作文件读写(os.path、open)、数学计算
启动/初始化代码app = FastAPI() 这种只需跑一次

6.2 CPU 密集型任务的正确处理方式

from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

# ❌ 错误:直接同步处理 CPU 密集型任务
@app.get("/heavy")
def heavy_task():
    total = 0
    for i in range(10**8):  # 模拟 CPU 密集计算
        total += i
    return {"result": total}

# ✅ 正确:用 run_in_executor 放到线程池
@app.get("/heavy")
async def heavy_task():
    def compute():
        total = 0
        for i in range(10**8):
            total += i
        return total

    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, compute)
    return {"result": result}

# ✅ 推荐:用 BackgroundTasks 异步执行
@app.get("/heavy", response_model=dict)
async def heavy_task(background_tasks: BackgroundTasks):
    def compute():
        total = 0
        for i in range(10**8):
            total += i
        return total

    background_tasks.add_task(compute)
    return {"message": "任务已在后台执行"}

6.3 同步库的异步包装

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

def blocking_library_func(data):
    # 某个同步库
    import json
    return json.dumps(data)

async def async_wrapper(data):
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        executor, blocking_library_func, data
    )
    return result

7. 常见陷阱与避坑指南

陷阱 1:忘记 await

# ❌ 错误
async def bad():
    data = some_async_function()  # 这返回的是协程对象,不会执行!
    return data

# ✅ 正确
async def good():
    data = await some_async_function()
    return data

陷阱 2:在循环中串行 await

# ❌ 错误:逐个等待,效率低
async def slow():
    for url in urls:
        result = await fetch(url)
        results.append(result)

# ✅ 正确:并发等待
async def fast():
    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)

陷阱 3:混淆 async 函数与普通函数

# ❌ 错误
@app.get("/")
def endpoint():
    data = await async_db_query()  # 普通函数不能 await
    return data

# ✅ 正确
@app.get("/")
async def endpoint():
    data = await async_db_query()
    return data

陷阱 4:用 time.sleep 而不是 asyncio.sleep

# ❌ 错误
async def bad_sleep():
    time.sleep(5)  # 阻塞整个事件循环 5 秒

# ✅ 正确
async def good_sleep():
    await asyncio.sleep(5)  # 让出线程,可处理其他请求

8. 实战:构建一个异步 API 服务

"""
完整示例:异步调用外部 API + 异步数据库查询
"""
from fastapi import FastAPI, HTTPException
import httpx
import asyncio
from typing import List

app = FastAPI()

# 模拟异步数据库查询
async def query_users() -> List[dict]:
    await asyncio.sleep(0.1)  # 模拟 DB 查询延迟
    return [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"},
    ]

# 异步获取 GitHub 用户信息
async def fetch_github_user(username: str) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"https://api.github.com/users/{username}")
        if resp.status_code == 404:
            raise HTTPException(status_code=404, detail="User not found")
        resp.raise_for_status()
        return resp.json()

@app.get("/users")
async def list_users():
    """并发获取用户列表和 GitHub 公开信息"""
    users = await query_users()

    # 并发获取每个用户的 GitHub 头像
    tasks = [
        fetch_github_user(u["name"].lower())
        for u in users
    ]
    github_data = await asyncio.gather(*tasks, return_exceptions=True)

    # 合并结果
    result = []
    for user, gh in zip(users, github_data):
        if isinstance(gh, Exception):
            user["avatar"] = None
        else:
            user["avatar"] = gh.get("avatar_url")
        result.append(user)

    return result

@app.get("/github/{username}")
async def get_github(username: str):
    """获取单个 GitHub 用户"""
    return await fetch_github_user(username)

9. 小结

概念核心要点
async def定义协程函数,返回协程对象
await等待可等待对象(协程/任务/Future),让出控制权
asyncio.sleep异步睡眠,不阻塞事件循环
asyncio.gather并发执行多个协程
run_in_executor将同步代码丢到线程池执行
FastAPI 路由I/O 密集用 async,CPU 密集用 sync + run_in_executor

💡 记住:在 FastAPI 中,异步路由可以并发处理大量 I/O 请求,性能远优于同步路由。但不要在异步函数中做 CPU 密集计算——那会让整个事件循环卡住。


🔗 扩展阅读