#异步编程深度解析:async 与 await 的原理,以及何时该用同步代码
📂 所属阶段:第二阶段 — 进阶黑科技(核心篇)
🔗 相关章节:FastAPI 依赖注入系统 · FastAPI 中间件应用
#1. 什么是异步编程?
#1.1 同步 vs 异步:排队打饭的比喻 🍚
想象你去食堂打饭:
- 同步:你站在窗口等厨师炒完一盘菜,端走,再点下一道。厨师在炒菜时你只能干等。
- 异步:你把菜单交给厨师,然后去拿餐具、倒饮料,等菜好了服务员直接端到你桌上。你不需要傻站着等。
这就是同步与异步的核心区别:等待时是否可以切换去干别的事。
#1.2 为什么 Web 服务需要异步?
请求 A(查询数据库,耗时 200ms)
请求 B(简单计算,耗时 1ms)
请求 C(调用外部 API,耗时 1000ms)- 同步模式:按顺序处理,总耗时 = 200 + 1 + 1000 = 1201ms
- 异步模式:A 等待数据库时切换 B,B 完成切 C,C 等待时切回 A → 总耗时 ≈ 1000ms
在 I/O 密集型场景(数据库查询、HTTP 请求、文件读写),异步能让单线程处理大量并发请求。
#2. async / await 详解
#2.1 基础语法
import asyncio
# 定义一个协程函数(async def)
async def say_hello():
print("Hello!")
# 运行协程的三种方式
# 方式一:asyncio.run()(推荐,主入口用)
asyncio.run(say_hello())
# 方式二:在已有事件循环中创建任务
async def main():
task = asyncio.create_task(say_hello()) # 创建任务
await task # 等待任务完成
asyncio.run(main())
# 方式三:直接 await(只能在 async 函数内)
async def main():
await say_hello()#2.2 await 在等什么?
await 关键字只能等待可等待对象(Awaitable),包括:
| 类型 | 示例 | 说明 |
|---|---|---|
| 协程(Coroutine) | await coro() | async def 返回的对象 |
| 任务(Task) | asyncio.create_task() | 调度好的协程 |
| Future | asyncio.Future() | 尚未完成的结果占位符 |
async def fetch_data():
return {"data": "hello"}
async def main():
# await 一个协程
result = await fetch_data()
print(result)
asyncio.run(main())#2.3 asyncio.sleep vs time.sleep
import asyncio
import time
# ❌ 同步 sleep:阻塞整个线程,无法处理其他任务
def sync_task():
time.sleep(2) # 在这 2 秒内,整条线程都被卡住
print("done")
# ✅ 异步 sleep:让出控制权,线程可以处理其他协程
async def async_task():
await asyncio.sleep(2) # 这 2 秒内,线程可以去干别的事
print("done")
# 体验差异
async def compare():
start = time.time()
# 顺序执行两个 async_task
await async_task()
await async_task()
print(f"顺序执行耗时: {time.time() - start:.2f}s") # ~4秒
start = time.time()
# 并发执行
await asyncio.gather(async_task(), async_task())
print(f"并发执行耗时: {time.time() - start:.2f}s") # ~2秒
asyncio.run(compare())⚠️
asyncio.sleep()让出 GIL,但time.sleep()是真正阻塞线程。在 FastAPI 中使用time.sleep()会阻塞整个事件循环!
#3. 事件循环(Event Loop)原理
#3.1 什么是事件循环?
事件循环是异步的"调度中心",它的工作流程:
┌─────────────────────────────────────┐
│ 事件循环(单线程) │
├─────────────────────────────────────┤
│ 1. 检查 IO 事件(网络/文件/定时器) │
│ 2. 收集就绪的任务,执行它们 │
│ 3. 遇到 await 就挂起,切换到下一个任务 │
│ 4. 重复...直到全部完成 │
└─────────────────────────────────────┘Python 异步模型基于 Generator,协程本质上是可暂停、恢复的生成器函数。
#3.2 事件循环生命周期
import asyncio
async def task(name, sec):
print(f"[{name}] 开始")
await asyncio.sleep(sec)
print(f"[{name}] 完成")
async def main():
# 创建任务(还未执行)
t1 = asyncio.create_task(task("A", 1))
t2 = asyncio.create_task(task("B", 0.5))
t3 = asyncio.create_task(task("C", 0.8))
# 等待所有任务完成
await asyncio.gather(t1, t2, t3)
print("全部完成")
asyncio.run(main())
# 输出顺序:
# [B] 开始
# [A] 开始
# [C] 开始
# [B] 完成
# [C] 完成
# [A] 完成
# 全部完成#4. 异步函数的调用规则
#4.1 四条黄金法则
┌──────────────────────────────────────────────────────┐
│ 异步调用规则速查表 │
├──────────────┬───────────────────┬────────────────────┤
│ 调用方 │ 被调函数 │ 正确写法 │
├──────────────┼───────────────────┼────────────────────┤
│ sync 函数 │ async 函数 │ ❌ 不能直接调用 │
│ sync 函数 │ sync 函数 │ ✅ 直接调用 │
│ async 函数 │ async 函数 │ ✅ await 调用 │
│ async 函数 │ sync 函数 │ ⚠️ 可以,但不推荐 │
└──────────────┴───────────────────┴────────────────────┘#4.2 同步函数中调用异步函数
# ❌ 错误:同步函数中直接调用协程
def bad_example():
result = fetch_data() # 返回一个协程对象,不会执行!
print(result) # 打印的是 <coroutine object...>
# ✅ 正确方式一:包装为新事件循环(不推荐,代价大)
def call_async():
result = asyncio.run(fetch_data())
print(result)
# ✅ 正确方式二:如果在异步上下文中,同步调用需要用 run_in_executor
async def main():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, sync_function)
print(result)#4.3 异步函数中调用同步函数
# ✅ 可以,但不推荐。如果同步函数耗时,会阻塞事件循环
async def call_sync():
result = blocking_function() # 如果这个执行很久,整个事件循环会卡住
return result
# ✅ 推荐:耗时同步操作丢到线程池
async def call_sync_properly():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, blocking_function)
return result
# 或者用 to_thread(Python 3.9+,更简洁)
async def call_sync_simple():
result = await asyncio.to_thread(blocking_function)
return result#5. 在 FastAPI 中使用 async
#5.1 FastAPI 的异步路由
from fastapi import FastAPI
import asyncio
app = FastAPI()
# ✅ 异步路由:处理 I/O 密集型任务
@app.get("/async-data")
async def get_async_data():
# 模拟异步 I/O(如数据库查询、HTTP 请求)
await asyncio.sleep(1)
return {"source": "async", "data": [1, 2, 3]}
# ✅ 同步路由:处理 CPU 密集型任务
@app.get("/sync-data")
def get_sync_data():
# 计算密集型任务,不需要 await
result = sum(range(10**7))
return {"source": "sync", "result": result}#5.2 异步数据库操作(以 asyncpg 为例)
from fastapi import FastAPI
import asyncpg
import asyncio
app = FastAPI()
pool = None
@app.on_event("startup")
async def startup():
global pool
pool = await asyncpg.create_pool(
host="localhost",
database="mydb",
user="admin",
password="secret",
min_size=5,
max_size=20
)
@app.on_event("shutdown")
async def shutdown():
await pool.close()
@app.get("/users/{user_id}")
async def get_user(user_id: int):
async with pool.acquire() as conn:
# 异步查询,不会阻塞事件循环
user = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1", user_id
)
return dict(user)#5.3 异步 HTTP 请求(以 httpx 为例)
from fastapi import FastAPI
import httpx
app = FastAPI()
@app.get("/fetch-multiple")
async def fetch_multiple():
urls = [
"https://api.github.com/users/octocat",
"https://api.github.com/users/torvalds",
"https://api.github.com/users/gvanrossum",
]
# 异步并发请求所有 URL
async with httpx.AsyncClient(timeout=10) as client:
tasks = [client.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
return [r.json() for r in responses]#6. 何时用同步代码?
#6.1 用同步的场景
| 场景 | 原因 |
|---|---|
| CPU 密集型任务(计算、加密、压缩) | asyncio 对 CPU-bound 无能为力,用 run_in_executor 丢到线程池 |
| 没有异步版本的库 | 如 Pillow、numpy(计算部分) |
| 简单的同步操作 | 文件读写(os.path、open)、数学计算 |
| 启动/初始化代码 | app = FastAPI() 这种只需跑一次 |
#6.2 CPU 密集型任务的正确处理方式
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
# ❌ 错误:直接同步处理 CPU 密集型任务
@app.get("/heavy")
def heavy_task():
total = 0
for i in range(10**8): # 模拟 CPU 密集计算
total += i
return {"result": total}
# ✅ 正确:用 run_in_executor 放到线程池
@app.get("/heavy")
async def heavy_task():
def compute():
total = 0
for i in range(10**8):
total += i
return total
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, compute)
return {"result": result}
# ✅ 推荐:用 BackgroundTasks 异步执行
@app.get("/heavy", response_model=dict)
async def heavy_task(background_tasks: BackgroundTasks):
def compute():
total = 0
for i in range(10**8):
total += i
return total
background_tasks.add_task(compute)
return {"message": "任务已在后台执行"}#6.3 同步库的异步包装
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
def blocking_library_func(data):
# 某个同步库
import json
return json.dumps(data)
async def async_wrapper(data):
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
executor, blocking_library_func, data
)
return result#7. 常见陷阱与避坑指南
#陷阱 1:忘记 await
# ❌ 错误
async def bad():
data = some_async_function() # 这返回的是协程对象,不会执行!
return data
# ✅ 正确
async def good():
data = await some_async_function()
return data#陷阱 2:在循环中串行 await
# ❌ 错误:逐个等待,效率低
async def slow():
for url in urls:
result = await fetch(url)
results.append(result)
# ✅ 正确:并发等待
async def fast():
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks)#陷阱 3:混淆 async 函数与普通函数
# ❌ 错误
@app.get("/")
def endpoint():
data = await async_db_query() # 普通函数不能 await
return data
# ✅ 正确
@app.get("/")
async def endpoint():
data = await async_db_query()
return data#陷阱 4:用 time.sleep 而不是 asyncio.sleep
# ❌ 错误
async def bad_sleep():
time.sleep(5) # 阻塞整个事件循环 5 秒
# ✅ 正确
async def good_sleep():
await asyncio.sleep(5) # 让出线程,可处理其他请求#8. 实战:构建一个异步 API 服务
"""
完整示例:异步调用外部 API + 异步数据库查询
"""
from fastapi import FastAPI, HTTPException
import httpx
import asyncio
from typing import List
app = FastAPI()
# 模拟异步数据库查询
async def query_users() -> List[dict]:
await asyncio.sleep(0.1) # 模拟 DB 查询延迟
return [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
]
# 异步获取 GitHub 用户信息
async def fetch_github_user(username: str) -> dict:
async with httpx.AsyncClient() as client:
resp = await client.get(f"https://api.github.com/users/{username}")
if resp.status_code == 404:
raise HTTPException(status_code=404, detail="User not found")
resp.raise_for_status()
return resp.json()
@app.get("/users")
async def list_users():
"""并发获取用户列表和 GitHub 公开信息"""
users = await query_users()
# 并发获取每个用户的 GitHub 头像
tasks = [
fetch_github_user(u["name"].lower())
for u in users
]
github_data = await asyncio.gather(*tasks, return_exceptions=True)
# 合并结果
result = []
for user, gh in zip(users, github_data):
if isinstance(gh, Exception):
user["avatar"] = None
else:
user["avatar"] = gh.get("avatar_url")
result.append(user)
return result
@app.get("/github/{username}")
async def get_github(username: str):
"""获取单个 GitHub 用户"""
return await fetch_github_user(username)#9. 小结
| 概念 | 核心要点 |
|---|---|
async def | 定义协程函数,返回协程对象 |
await | 等待可等待对象(协程/任务/Future),让出控制权 |
asyncio.sleep | 异步睡眠,不阻塞事件循环 |
asyncio.gather | 并发执行多个协程 |
run_in_executor | 将同步代码丢到线程池执行 |
| FastAPI 路由 | I/O 密集用 async,CPU 密集用 sync + run_in_executor |
💡 记住:在 FastAPI 中,异步路由可以并发处理大量 I/O 请求,性能远优于同步路由。但不要在异步函数中做 CPU 密集计算——那会让整个事件循环卡住。
🔗 扩展阅读

