#FastAPI异步编程完全指南
📂 所属阶段:第二阶段 — 进阶黑科技(核心篇)
🔗 相关章节:FastAPI依赖注入系统 · FastAPI中间件应用
#目录
#什么是异步编程?
#同步vs异步:排队打饭的比喻 🍚
想象你去食堂打饭:
- 同步:你站在窗口等厨师炒完一盘菜,端走,再点下一道。厨师在炒菜时你只能干等。
- 异步:你把菜单交给厨师,然后去拿餐具、倒饮料,等菜好了服务员直接端到你桌上。你不需要傻站着等。
这就是同步与异步的核心区别:等待时是否可以切换去干别的事。
#为什么Web服务需要异步?
请求 A(查询数据库,耗时 200ms)
请求 B(简单计算,耗时 1ms)
请求 C(调用外部 API,耗时 1000ms)- 同步模式:按顺序处理,总耗时 = 200 + 1 + 1000 = 1201ms
- 异步模式:A 等待数据库时切换 B,B 完成切 C,C 等待时切回 A → 总耗时 ≈ 1000ms
在 I/O 密集型场景(数据库查询、HTTP 请求、文件读写),异步能让单线程处理大量并发请求,显著提升API性能。
#异步编程的优势
- 高并发处理能力:单个进程可以处理数千个并发连接
- 资源利用率高:避免线程创建和切换的开销
- 响应速度快:I/O等待期间可以处理其他请求
- 内存占用低:协程比线程占用更少内存
#async/await详解
#基础语法
import asyncio
# 定义一个协程函数(async def)
async def say_hello():
print("Hello!")
return "Hello from coroutine!"
# 运行协程的三种方式
# 方式一:asyncio.run()(推荐,主入口用)
asyncio.run(say_hello())
# 方式二:在已有事件循环中创建任务
async def main():
task = asyncio.create_task(say_hello()) # 创建任务
result = await task # 等待任务完成
print(f"Result: {result}")
asyncio.run(main())
# 方式三:直接 await(只能在 async 函数内)
async def main():
result = await say_hello()
print(f"Result: {result}")#await 在等什么?
await 关键字只能等待可等待对象(Awaitable),包括:
| 类型 | 示例 | 说明 |
|---|---|---|
| 协程(Coroutine) | await coro() | async def 返回的对象 |
| 任务(Task) | asyncio.create_task() | 调度好的协程 |
| Future | asyncio.Future() | 尚未完成的结果占位符 |
async def fetch_data():
await asyncio.sleep(0.1) # 模拟异步I/O
return {"data": "hello", "timestamp": "2024-01-15"}
async def main():
# await 一个协程
result = await fetch_data()
print(result)
asyncio.run(main())#asyncio.sleep vs time.sleep
import asyncio
import time
# ❌ 同步 sleep:阻塞整个线程,无法处理其他任务
def sync_task():
time.sleep(2) # 在这 2 秒内,整条线程都被卡住
print("Sync task done")
# ✅ 异步 sleep:让出控制权,线程可以处理其他协程
async def async_task():
await asyncio.sleep(2) # 这 2 秒内,线程可以去干别的事
print("Async task done")
# 体验差异
async def compare():
start = time.time()
# 顺序执行两个 async_task
await async_task()
await async_task()
print(f"顺序执行耗时: {time.time() - start:.2f}s") # ~4秒
start = time.time()
# 并发执行
await asyncio.gather(async_task(), async_task())
print(f"并发执行耗时: {time.time() - start:.2f}s") # ~2秒
asyncio.run(compare())⚠️
asyncio.sleep()让出 GIL,但time.sleep()是真正阻塞线程。在 FastAPI 中使用time.sleep()会阻塞整个事件循环!
#协程与任务的区别
import asyncio
async def my_coroutine():
print("Coroutine started")
await asyncio.sleep(1)
print("Coroutine finished")
return "Result"
async def demonstrate_difference():
# 协程对象 - 只是定义,不会执行
coro = my_coroutine()
print(f"Type: {type(coro)}") # <class 'coroutine'>
# 任务对象 - 调度执行的协程
task = asyncio.create_task(my_coroutine())
print(f"Type: {type(task)}") # <class 'Task'>
# 等待任务完成
result = await task
print(f"Result: {result}")
asyncio.run(demonstrate_difference())#事件循环原理
#什么是事件循环?
事件循环是异步的"调度中心",它的工作流程:
┌─────────────────────────────────────┐
│ 事件循环(单线程) │
├─────────────────────────────────────┤
│ 1. 检查 IO 事件(网络/文件/定时器) │
│ 2. 收集就绪的任务,执行它们 │
│ 3. 遇到 await 就挂起,切换到下一个任务 │
│ 4. 重复...直到全部完成 │
└─────────────────────────────────────┘Python 异步模型基于 Generator,协程本质上是可暂停、恢复的生成器函数。
#事件循环生命周期
import asyncio
async def task(name, sec):
print(f"[{name}] 开始")
await asyncio.sleep(sec)
print(f"[{name}] 完成")
async def main():
# 创建任务(还未执行)
t1 = asyncio.create_task(task("A", 1))
t2 = asyncio.create_task(task("B", 0.5))
t3 = asyncio.create_task(task("C", 0.8))
# 等待所有任务完成
await asyncio.gather(t1, t2, t3)
print("全部完成")
asyncio.run(main())
# 输出顺序:
# [B] 开始
# [A] 开始
# [C] 开始
# [B] 完成
# [C] 完成
# [A] 完成
# 全部完成#事件循环的高级用法
import asyncio
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 设置循环关闭时的回调
def on_close():
print("Loop is closing...")
loop.call_soon(on_close)
# 创建任务并监控状态
task = asyncio.create_task(asyncio.sleep(1))
# 检查任务是否完成
print(f"Task done: {task.done()}")
await task
print(f"Task done: {task.done()}")
print(f"Task result: {task.result()}")
asyncio.run(main())#异步函数调用规则
#四条黄金法则
┌──────────────────────────────────────────────────────┐
│ 异步调用规则速查表 │
├──────────────┬───────────────────┬────────────────────┤
│ 调用方 │ 被调函数 │ 正确写法 │
├──────────────┼───────────────────┼────────────────────┤
│ sync 函数 │ async 函数 │ ❌ 不能直接调用 │
│ sync 函数 │ sync 函数 │ ✅ 直接调用 │
│ async 函数 │ async 函数 │ ✅ await 调用 │
│ async 函数 │ sync 函数 │ ⚠️ 可以,但不推荐 │
└──────────────┴───────────────────┴────────────────────┘#同步函数中调用异步函数
# ❌ 错误:同步函数中直接调用协程
def bad_example():
result = fetch_data() # 返回一个协程对象,不会执行!
print(result) # 打印的是 <coroutine object...>
# ✅ 正确方式一:包装为新事件循环(不推荐,代价大)
def call_async():
async def inner():
return await fetch_data()
result = asyncio.run(inner())
print(result)
return result
# ✅ 正确方式二:如果在异步上下文中,同步调用需要用 run_in_executor
async def main():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, sync_function)
print(result)#异步函数中调用同步函数
import time
# 同步函数示例
def blocking_function():
time.sleep(1) # 模拟阻塞操作
return "Blocking result"
# ✅ 可以,但不推荐。如果同步函数耗时,会阻塞事件循环
async def call_sync():
result = blocking_function() # 如果这个执行很久,整个事件循环会卡住
return result
# ✅ 推荐:耗时同步操作丢到线程池
async def call_sync_properly():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, blocking_function)
return result
# 或者用 to_thread(Python 3.9+,更简洁)
async def call_sync_simple():
result = await asyncio.to_thread(blocking_function)
return result#FastAPI中的异步使用
#FastAPI的异步路由
from fastapi import FastAPI
import asyncio
app = FastAPI()
# ✅ 异步路由:处理 I/O 密集型任务
@app.get("/async-data")
async def get_async_data():
# 模拟异步 I/O(如数据库查询、HTTP 请求)
await asyncio.sleep(1)
return {"source": "async", "data": [1, 2, 3], "timestamp": "2024-01-15"}
# ✅ 同步路由:处理 CPU 密集型任务
@app.get("/sync-data")
def get_sync_data():
# 计算密集型任务,不需要 await
result = sum(range(10**7))
return {"source": "sync", "result": result}#异步数据库操作(以 asyncpg 为例)
from fastapi import FastAPI
import asyncpg
import asyncio
app = FastAPI()
pool = None
@app.on_event("startup")
async def startup():
global pool
pool = await asyncpg.create_pool(
host="localhost",
database="mydb",
user="admin",
password="secret",
min_size=5,
max_size=20
)
@app.on_event("shutdown")
async def shutdown():
await pool.close()
@app.get("/users/{user_id}")
async def get_user(user_id: int):
async with pool.acquire() as conn:
# 异步查询,不会阻塞事件循环
user = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1", user_id
)
return dict(user)#异步HTTP请求(以httpx为例)
from fastapi import FastAPI
import httpx
app = FastAPI()
@app.get("/fetch-multiple")
async def fetch_multiple():
urls = [
"https://api.github.com/users/octocat",
"https://api.github.com/users/torvalds",
"https://api.github.com/users/gvanrossum",
]
# 异步并发请求所有 URL
async with httpx.AsyncClient(timeout=10) as client:
tasks = [client.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
return [r.json() for r in responses]#FastAPI依赖注入中的异步
from fastapi import Depends
async def get_current_user():
# 模拟异步用户认证
await asyncio.sleep(0.1)
return {"username": "john_doe", "id": 123}
@app.get("/protected")
async def protected_route(current_user: dict = Depends(get_current_user)):
return current_user
# 异步依赖的缓存
from functools import lru_cache
@lru_cache(maxsize=128)
def cached_sync_operation(param: str):
# 缓存同步操作
return f"Result for {param}"
async def async_cached_dependency(param: str):
# 在异步函数中使用缓存的同步操作
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, cached_sync_operation, param)#何时使用同步代码
#用同步的场景
| 场景 | 原因 |
|---|---|
| CPU 密集型任务(计算、加密、压缩) | asyncio 对 CPU-bound 无能为力,用 run_in_executor 丢到线程池 |
| 没有异步版本的库 | 如 Pillow、numpy(计算部分) |
| 简单的同步操作 | 文件读写(os.path、open)、数学计算 |
| 启动/初始化代码 | app = FastAPI() 这种只需跑一次 |
#CPU密集型任务的正确处理方式
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
# ❌ 错误:直接同步处理 CPU 密集型任务
@app.get("/heavy")
def heavy_task():
total = 0
for i in range(10**8): # 模拟 CPU 密集计算
total += i
return {"result": total}
# ✅ 正确:用 run_in_executor 放到线程池
@app.get("/heavy")
async def heavy_task():
def compute():
total = 0
for i in range(10**8):
total += i
return total
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, compute)
return {"result": result}
# ✅ 推荐:用 BackgroundTasks 异步执行
@app.get("/heavy", response_model=dict)
async def heavy_task(background_tasks: BackgroundTasks):
def compute():
total = 0
for i in range(10**8):
total += i
return total
background_tasks.add_task(compute)
return {"message": "任务已在后台执行"}#同步库的异步包装
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
def blocking_library_func(data):
# 某个同步库
import json
return json.dumps(data, ensure_ascii=False)
async def async_wrapper(data):
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
executor, blocking_library_func, data
)
return result
# 使用示例
async def use_blocking_lib():
data = {"name": "张三", "age": 25}
result = await async_wrapper(data)
print(result)
return result#常见陷阱与避坑指南
#陷阱1:忘记await
# ❌ 错误
async def bad():
data = some_async_function() # 这返回的是协程对象,不会执行!
return data
# ✅ 正确
async def good():
data = await some_async_function()
return data#陷阱2:在循环中串行await
import asyncio
async def fetch(url):
await asyncio.sleep(0.1) # 模拟请求
return f"Data from {url}"
urls = ["url1", "url2", "url3"]
# ❌ 错误:逐个等待,效率低
async def slow():
results = []
for url in urls:
result = await fetch(url) # 串行等待
results.append(result)
return results
# ✅ 正确:并发等待
async def fast():
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks) # 并发执行
return results
# ✅ 更灵活的处理方式
async def flexible():
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理可能的异常
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Error fetching {urls[i]}: {result}")
else:
print(f"Success: {result}")
return results#陷阱3:混淆async函数与普通函数
from fastapi import FastAPI
app = FastAPI()
# ❌ 错误
@app.get("/bad")
def endpoint():
data = await async_db_query() # 普通函数不能 await
return data
# ✅ 正确
@app.get("/good")
async def endpoint():
data = await async_db_query()
return data#陷阱4:用time.sleep而不是asyncio.sleep
import time
# ❌ 错误
async def bad_sleep():
time.sleep(5) # 阻塞整个事件循环 5 秒
# ✅ 正确
async def good_sleep():
await asyncio.sleep(5) # 让出线程,可处理其他请求#陷阱5:在FastAPI中阻塞事件循环
# ❌ 错误:在FastAPI异步路由中使用阻塞操作
@app.get("/blocking-operation")
async def blocking_endpoint():
time.sleep(10) # 这会阻塞整个服务器10秒!
return {"message": "This took 10 seconds and blocked everything!"}
# ✅ 正确:将阻塞操作放到线程池
@app.get("/non-blocking-operation")
async def non_blocking_endpoint():
def blocking_op():
time.sleep(10) # 在线程池中执行
return "Done"
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, blocking_op)
return {"message": result}#实战:构建异步API服务
"""
完整示例:异步调用外部 API + 异步数据库查询
"""
from fastapi import FastAPI, HTTPException
import httpx
import asyncio
from typing import List, Dict, Any
import time
app = FastAPI()
# 模拟异步数据库查询
async def query_users() -> List[Dict[str, Any]]:
await asyncio.sleep(0.1) # 模拟 DB 查询延迟
return [
{"id": 1, "name": "Alice", "email": "alice@example.com"},
{"id": 2, "name": "Bob", "email": "bob@example.com"},
]
# 异步获取 GitHub 用户信息
async def fetch_github_user(username: str) -> Dict[str, Any]:
async with httpx.AsyncClient() as client:
resp = await client.get(f"https://api.github.com/users/{username}")
if resp.status_code == 404:
raise HTTPException(status_code=404, detail="User not found")
resp.raise_for_status()
return resp.json()
@app.get("/users")
async def list_users():
"""并发获取用户列表和 GitHub 公开信息"""
start_time = time.time()
# 并发获取用户数据和GitHub信息
users_task = query_users()
github_tasks = [
fetch_github_user(u["name"].lower())
for u in [{"name": "octocat"}, {"name": "torvalds"}] # 示例用户
]
# 等待所有任务完成
users, *github_data = await asyncio.gather(users_task, *github_tasks)
# 合并结果
result = []
for user in users:
result.append(user)
# 添加GitHub用户信息
for gh_user in github_data:
result.append({
"id": gh_user["id"],
"name": gh_user["name"],
"github": True,
"avatar": gh_user.get("avatar_url")
})
processing_time = time.time() - start_time
return {
"users": result,
"processing_time": f"{processing_time:.2f}s",
"total_users": len(result)
}
@app.get("/github/{username}")
async def get_github(username: str):
"""获取单个 GitHub 用户"""
return await fetch_github_user(username)
# 高级异步模式:流式响应
@app.get("/stream-data")
async def stream_data():
"""演示流式响应"""
async def generate_data():
for i in range(10):
await asyncio.sleep(0.5) # 模拟数据生成延迟
yield f"data: {i}\n\n"
return StreamingResponse(generate_data(), media_type="text/event-stream")
from fastapi.responses import StreamingResponse
# 异步WebSocket处理
@app.websocket("/ws")
async def websocket_endpoint(websocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
# 异步处理数据
processed_data = await process_websocket_data(data)
await websocket.send_text(f"Processed: {processed_data}")
except Exception as e:
print(f"WebSocket error: {e}")
finally:
await websocket.close()
async def process_websocket_data(data: str) -> str:
"""异步处理WebSocket数据"""
await asyncio.sleep(0.1) # 模拟处理时间
return data.upper()#性能优化建议
#1. 使用连接池
# 数据库连接池
import asyncpg
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_db_connection():
conn = await asyncpg.connect(
host="localhost",
database="mydb",
user="user",
password="password"
)
try:
yield conn
finally:
await conn.close()#2. 适当使用缓存
import asyncio
from functools import lru_cache
@lru_cache(maxsize=128)
def expensive_calculation(n: int) -> int:
"""昂贵的计算,使用缓存避免重复计算"""
result = 0
for i in range(n):
result += i * i
return result
async def async_calculation(n: int):
"""在异步环境中使用缓存的同步计算"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, expensive_calculation, n)#3. 避免不必要的同步
# ❌ 不必要的同步
@app.get("/bad-sync")
async def bad_sync():
# 这些操作其实可以并发
data1 = await fetch_data1()
data2 = await fetch_data2()
data3 = await fetch_data3()
return {"data1": data1, "data2": data2, "data3": data3}
# ✅ 并发处理
@app.get("/good-async")
async def good_async():
# 并发获取所有数据
data1, data2, data3 = await asyncio.gather(
fetch_data1(),
fetch_data2(),
fetch_data3()
)
return {"data1": data1, "data2": data2, "data3": data3}#相关教程
#总结
| 概念 | 核心要点 |
|---|---|
async def | 定义协程函数,返回协程对象 |
await | 等待可等待对象(协程/任务/Future),让出控制权 |
asyncio.sleep | 异步睡眠,不阻塞事件循环 |
asyncio.gather | 并发执行多个协程 |
run_in_executor | 将同步代码丢到线程池执行 |
| FastAPI路由 | I/O密集用async,CPU密集用sync + run_in_executor |
💡 记住:在 FastAPI 中,异步路由可以并发处理大量 I/O 请求,性能远优于同步路由。但不要在异步函数中做 CPU 密集计算——那会让整个事件循环卡住。
FastAPI的异步特性使其成为构建高性能Web API的理想选择。正确理解和运用async/await机制,可以让您的API在高并发场景下表现出色。
#1. 什么是异步编程?
#1.1 同步 vs 异步:排队打饭的比喻 🍚
想象你去食堂打饭:
- 同步:你站在窗口等厨师炒完一盘菜,端走,再点下一道。厨师在炒菜时你只能干等。
- 异步:你把菜单交给厨师,然后去拿餐具、倒饮料,等菜好了服务员直接端到你桌上。你不需要傻站着等。
这就是同步与异步的核心区别:等待时是否可以切换去干别的事。
#1.2 为什么 Web 服务需要异步?
请求 A(查询数据库,耗时 200ms)
请求 B(简单计算,耗时 1ms)
请求 C(调用外部 API,耗时 1000ms)- 同步模式:按顺序处理,总耗时 = 200 + 1 + 1000 = 1201ms
- 异步模式:A 等待数据库时切换 B,B 完成切 C,C 等待时切回 A → 总耗时 ≈ 1000ms
在 I/O 密集型场景(数据库查询、HTTP 请求、文件读写),异步能让单线程处理大量并发请求。
#2. async / await 详解
#2.1 基础语法
import asyncio
# 定义一个协程函数(async def)
async def say_hello():
print("Hello!")
# 运行协程的三种方式
# 方式一:asyncio.run()(推荐,主入口用)
asyncio.run(say_hello())
# 方式二:在已有事件循环中创建任务
async def main():
task = asyncio.create_task(say_hello()) # 创建任务
await task # 等待任务完成
asyncio.run(main())
# 方式三:直接 await(只能在 async 函数内)
async def main():
await say_hello()#2.2 await 在等什么?
await 关键字只能等待可等待对象(Awaitable),包括:
| 类型 | 示例 | 说明 |
|---|---|---|
| 协程(Coroutine) | await coro() | async def 返回的对象 |
| 任务(Task) | asyncio.create_task() | 调度好的协程 |
| Future | asyncio.Future() | 尚未完成的结果占位符 |
async def fetch_data():
return {"data": "hello"}
async def main():
# await 一个协程
result = await fetch_data()
print(result)
asyncio.run(main())#2.3 asyncio.sleep vs time.sleep
import asyncio
import time
# ❌ 同步 sleep:阻塞整个线程,无法处理其他任务
def sync_task():
time.sleep(2) # 在这 2 秒内,整条线程都被卡住
print("done")
# ✅ 异步 sleep:让出控制权,线程可以处理其他协程
async def async_task():
await asyncio.sleep(2) # 这 2 秒内,线程可以去干别的事
print("done")
# 体验差异
async def compare():
start = time.time()
# 顺序执行两个 async_task
await async_task()
await async_task()
print(f"顺序执行耗时: {time.time() - start:.2f}s") # ~4秒
start = time.time()
# 并发执行
await asyncio.gather(async_task(), async_task())
print(f"并发执行耗时: {time.time() - start:.2f}s") # ~2秒
asyncio.run(compare())⚠️
asyncio.sleep()让出 GIL,但time.sleep()是真正阻塞线程。在 FastAPI 中使用time.sleep()会阻塞整个事件循环!
#3. 事件循环(Event Loop)原理
#3.1 什么是事件循环?
事件循环是异步的"调度中心",它的工作流程:
┌─────────────────────────────────────┐
│ 事件循环(单线程) │
├─────────────────────────────────────┤
│ 1. 检查 IO 事件(网络/文件/定时器) │
│ 2. 收集就绪的任务,执行它们 │
│ 3. 遇到 await 就挂起,切换到下一个任务 │
│ 4. 重复...直到全部完成 │
└─────────────────────────────────────┘Python 异步模型基于 Generator,协程本质上是可暂停、恢复的生成器函数。
#3.2 事件循环生命周期
import asyncio
async def task(name, sec):
print(f"[{name}] 开始")
await asyncio.sleep(sec)
print(f"[{name}] 完成")
async def main():
# 创建任务(还未执行)
t1 = asyncio.create_task(task("A", 1))
t2 = asyncio.create_task(task("B", 0.5))
t3 = asyncio.create_task(task("C", 0.8))
# 等待所有任务完成
await asyncio.gather(t1, t2, t3)
print("全部完成")
asyncio.run(main())
# 输出顺序:
# [B] 开始
# [A] 开始
# [C] 开始
# [B] 完成
# [C] 完成
# [A] 完成
# 全部完成#4. 异步函数的调用规则
#4.1 四条黄金法则
┌──────────────────────────────────────────────────────┐
│ 异步调用规则速查表 │
├──────────────┬───────────────────┬────────────────────┤
│ 调用方 │ 被调函数 │ 正确写法 │
├──────────────┼───────────────────┼────────────────────┤
│ sync 函数 │ async 函数 │ ❌ 不能直接调用 │
│ sync 函数 │ sync 函数 │ ✅ 直接调用 │
│ async 函数 │ async 函数 │ ✅ await 调用 │
│ async 函数 │ sync 函数 │ ⚠️ 可以,但不推荐 │
└──────────────┴───────────────────┴────────────────────┘#4.2 同步函数中调用异步函数
# ❌ 错误:同步函数中直接调用协程
def bad_example():
result = fetch_data() # 返回一个协程对象,不会执行!
print(result) # 打印的是 <coroutine object...>
# ✅ 正确方式一:包装为新事件循环(不推荐,代价大)
def call_async():
result = asyncio.run(fetch_data())
print(result)
# ✅ 正确方式二:如果在异步上下文中,同步调用需要用 run_in_executor
async def main():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, sync_function)
print(result)#4.3 异步函数中调用同步函数
# ✅ 可以,但不推荐。如果同步函数耗时,会阻塞事件循环
async def call_sync():
result = blocking_function() # 如果这个执行很久,整个事件循环会卡住
return result
# ✅ 推荐:耗时同步操作丢到线程池
async def call_sync_properly():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, blocking_function)
return result
# 或者用 to_thread(Python 3.9+,更简洁)
async def call_sync_simple():
result = await asyncio.to_thread(blocking_function)
return result#5. 在 FastAPI 中使用 async
#5.1 FastAPI 的异步路由
from fastapi import FastAPI
import asyncio
app = FastAPI()
# ✅ 异步路由:处理 I/O 密集型任务
@app.get("/async-data")
async def get_async_data():
# 模拟异步 I/O(如数据库查询、HTTP 请求)
await asyncio.sleep(1)
return {"source": "async", "data": [1, 2, 3]}
# ✅ 同步路由:处理 CPU 密集型任务
@app.get("/sync-data")
def get_sync_data():
# 计算密集型任务,不需要 await
result = sum(range(10**7))
return {"source": "sync", "result": result}#5.2 异步数据库操作(以 asyncpg 为例)
from fastapi import FastAPI
import asyncpg
import asyncio
app = FastAPI()
pool = None
@app.on_event("startup")
async def startup():
global pool
pool = await asyncpg.create_pool(
host="localhost",
database="mydb",
user="admin",
password="secret",
min_size=5,
max_size=20
)
@app.on_event("shutdown")
async def shutdown():
await pool.close()
@app.get("/users/{user_id}")
async def get_user(user_id: int):
async with pool.acquire() as conn:
# 异步查询,不会阻塞事件循环
user = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1", user_id
)
return dict(user)#5.3 异步 HTTP 请求(以 httpx 为例)
from fastapi import FastAPI
import httpx
app = FastAPI()
@app.get("/fetch-multiple")
async def fetch_multiple():
urls = [
"https://api.github.com/users/octocat",
"https://api.github.com/users/torvalds",
"https://api.github.com/users/gvanrossum",
]
# 异步并发请求所有 URL
async with httpx.AsyncClient(timeout=10) as client:
tasks = [client.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
return [r.json() for r in responses]#6. 何时用同步代码?
#6.1 用同步的场景
| 场景 | 原因 |
|---|---|
| CPU 密集型任务(计算、加密、压缩) | asyncio 对 CPU-bound 无能为力,用 run_in_executor 丢到线程池 |
| 没有异步版本的库 | 如 Pillow、numpy(计算部分) |
| 简单的同步操作 | 文件读写(os.path、open)、数学计算 |
| 启动/初始化代码 | app = FastAPI() 这种只需跑一次 |
#6.2 CPU 密集型任务的正确处理方式
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
# ❌ 错误:直接同步处理 CPU 密集型任务
@app.get("/heavy")
def heavy_task():
total = 0
for i in range(10**8): # 模拟 CPU 密集计算
total += i
return {"result": total}
# ✅ 正确:用 run_in_executor 放到线程池
@app.get("/heavy")
async def heavy_task():
def compute():
total = 0
for i in range(10**8):
total += i
return total
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, compute)
return {"result": result}
# ✅ 推荐:用 BackgroundTasks 异步执行
@app.get("/heavy", response_model=dict)
async def heavy_task(background_tasks: BackgroundTasks):
def compute():
total = 0
for i in range(10**8):
total += i
return total
background_tasks.add_task(compute)
return {"message": "任务已在后台执行"}#6.3 同步库的异步包装
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
def blocking_library_func(data):
# 某个同步库
import json
return json.dumps(data)
async def async_wrapper(data):
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
executor, blocking_library_func, data
)
return result#7. 常见陷阱与避坑指南
#陷阱 1:忘记 await
# ❌ 错误
async def bad():
data = some_async_function() # 这返回的是协程对象,不会执行!
return data
# ✅ 正确
async def good():
data = await some_async_function()
return data#陷阱 2:在循环中串行 await
# ❌ 错误:逐个等待,效率低
async def slow():
for url in urls:
result = await fetch(url)
results.append(result)
# ✅ 正确:并发等待
async def fast():
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks)#陷阱 3:混淆 async 函数与普通函数
# ❌ 错误
@app.get("/")
def endpoint():
data = await async_db_query() # 普通函数不能 await
return data
# ✅ 正确
@app.get("/")
async def endpoint():
data = await async_db_query()
return data#陷阱 4:用 time.sleep 而不是 asyncio.sleep
# ❌ 错误
async def bad_sleep():
time.sleep(5) # 阻塞整个事件循环 5 秒
# ✅ 正确
async def good_sleep():
await asyncio.sleep(5) # 让出线程,可处理其他请求#8. 实战:构建一个异步 API 服务
"""
完整示例:异步调用外部 API + 异步数据库查询
"""
from fastapi import FastAPI, HTTPException
import httpx
import asyncio
from typing import List
app = FastAPI()
# 模拟异步数据库查询
async def query_users() -> List[dict]:
await asyncio.sleep(0.1) # 模拟 DB 查询延迟
return [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
]
# 异步获取 GitHub 用户信息
async def fetch_github_user(username: str) -> dict:
async with httpx.AsyncClient() as client:
resp = await client.get(f"https://api.github.com/users/{username}")
if resp.status_code == 404:
raise HTTPException(status_code=404, detail="User not found")
resp.raise_for_status()
return resp.json()
@app.get("/users")
async def list_users():
"""并发获取用户列表和 GitHub 公开信息"""
users = await query_users()
# 并发获取每个用户的 GitHub 头像
tasks = [
fetch_github_user(u["name"].lower())
for u in users
]
github_data = await asyncio.gather(*tasks, return_exceptions=True)
# 合并结果
result = []
for user, gh in zip(users, github_data):
if isinstance(gh, Exception):
user["avatar"] = None
else:
user["avatar"] = gh.get("avatar_url")
result.append(user)
return result
@app.get("/github/{username}")
async def get_github(username: str):
"""获取单个 GitHub 用户"""
return await fetch_github_user(username)#9. 小结
| 概念 | 核心要点 |
|---|---|
async def | 定义协程函数,返回协程对象 |
await | 等待可等待对象(协程/任务/Future),让出控制权 |
asyncio.sleep | 异步睡眠,不阻塞事件循环 |
asyncio.gather | 并发执行多个协程 |
run_in_executor | 将同步代码丢到线程池执行 |
| FastAPI 路由 | I/O 密集用 async,CPU 密集用 sync + run_in_executor |
💡 记住:在 FastAPI 中,异步路由可以并发处理大量 I/O 请求,性能远优于同步路由。但不要在异步函数中做 CPU 密集计算——那会让整个事件循环卡住。
🔗 扩展阅读

