FastAPIstreaming-response完全指南
📂 关联资源:websocket-realtime-communication · 异步编程深度解析
目录
流式响应是什么?
想象一下你在等待一个3小时的电影文件下载:传统做法是等服务器把整个文件准备好,再一次性扔给你。这会导致两个问题:
- 服务器内存可能被撑爆(比如4GB的大文件直接读进内存)
- 你必须傻等很久才能看到第一帧画面
而流式响应就像是边下载边播放:服务器每准备好一小块数据就立刻发送,客户端也能立刻消费这一小块。不需要等全量,也不需要囤积所有数据。
传统HTTP vs 流式响应
流式响应在 FastAPI 中主要靠 StreamingResponse 来实现,配合 Python 的异步生成器 (async generator),就可以像水龙头一样逐步“流出”数据。
选 SSE 还是 WebSocket?
很多同学会纠结:流式响应、SSE、WebSocket 到底怎么选?简单来说:
- StreamingResponse:最基础的流式输出,适合一次性流(比如下载文件、简单数字序列)。开发最简单。
- SSE (Server-Sent Events):建立在 StreamingResponse 之上,是一种标准化的单向推送协议。浏览器原生支持,自带自动重连、事件分类等功能,非常适合AI对话的令牌流、状态更新等。
- WebSocket:全双工双向通信,复杂度更高,适合需要客户端频繁给服务端发消息的场景,比如聊天、游戏、协同编辑。
本文重点讲解 StreamingResponse 及其最常用的上层包装——SSE。
StreamingResponse基础与优化
最简示例:数字流
我们先从一个最简单的流式响应开始,让服务端每隔0.5秒推送一个数字:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def num_stream():
"""异步生成器,逐步产出数据块"""
for i in range(1, 6):
yield f"收到第{i}个数据块\n"
await asyncio.sleep(0.5) # 模拟耗时操作,比如数据库查询、AI计算
@app.get("/simple-stream")
async def simple_stream():
return StreamingResponse(
num_stream(), # 传入异步生成器
media_type="text/plain", # 纯文本流,也可以是 application/json 等
headers={
"Cache-Control": "no-cache", # 禁止代理/浏览器缓存,保证实时性
"Connection": "keep-alive" # 保持长连接
}
)
访问 http://localhost:8000/simple-stream,你会发现浏览器不是一次性显示全部内容,而是每隔0.5秒多出一行,就像终端命令的输出一样。
优雅处理客户端断开
用户可能会在中途刷新页面或关闭标签页,这时候 FastAPI 会取消正在运行的异步任务,触发 asyncio.CancelledError。我们必须在生成器中捕获这个异常,以便释放数据库连接、关闭文件等资源。
async def safe_num_stream():
try:
for i in range(1, 6):
yield f"收到第{i}个数据块\n"
await asyncio.sleep(0.5)
except asyncio.CancelledError:
# 这里可以做一些清理工作,比如记录日志、释放锁
print("客户端主动断开,清理资源...")
# 注意:必须重新抛出异常,否则FastAPI无法正确完成清理
raise
关键点:捕获 CancelledError 后必须 raise 出去,这是 FastAPI 内部生命周期管理的约定,否则可能导致连接泄漏。
SSE服务器发送事件
SSE(Server-Sent Events)是在 StreamingResponse 基础上定义的一套格式规范,目的是让浏览器的 EventSource API 可以直接、优雅地接收服务端推送的消息。它的好处包括:
- 自动重连:网络断了重连后自动恢复,默认3秒重试一次。
- 事件分类:可以给消息打上事件名称,前端按不同事件类型分别处理。
- 轻量级:基于 HTTP,无需额外库,防火墙友好。
SSE 数据格式
SSE 通过纯文本流传递,有严格的格式要求:
- 每条消息以
data: 开头,后面跟具体内容(可以是纯文本或JSON)
- 消息结束标志是 两个连续的换行
\n\n
- 可选字段:
event: 定义事件类型,id: 设置消息编号(用于断点续传),retry: 设置重连间隔(毫秒)
来看一个完整的 SSE 端点示例:
import json
from datetime import datetime
async def sse_status_stream():
# 第一条消息建议发送重连间隔,告诉浏览器10秒后再重试
yield "retry: 10000\n\n"
for i in range(1, 11):
# 普通进度消息,不带事件类型
normal_data = {
"step": i,
"progress": i * 10,
"time": datetime.now().isoformat()
}
yield f"data: {json.dumps(normal_data)}\n\n"
# 每完成3步发送一个 checkpoint 事件
if i % 3 == 0:
status_data = {"state": "checkpoint_reached", "step": i}
# 格式: event: <事件名>\ndata: <json>\n\n
yield f"event: checkpoint\ndata: {json.dumps(status_data)}\n\n"
await asyncio.sleep(0.8)
@app.get("/sse-status")
async def sse_status():
return StreamingResponse(
sse_status_stream(),
media_type="text/event-stream", # ★ 必须是这个MIME类型,浏览器才能识别
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # 禁用Nginx缓冲(非常重要!)
}
)
⚠️ X-Accel-Buffering: no 这个响应头是关键,如果没有它,当你用 Nginx 做反向代理时,数据会被缓冲起来,前端可能好几秒才收到一批消息,完全失去实时性。
AI对话打字机效果
在 ChatGPT 这类 AI 对话产品中,回复是一字一字“打”出来的,这就是经典的打字机效果(Typewriter effect)。实现原理正是 SSE 流式输出:后端每生成一个令牌(token)就立即推送。
模拟AI回复
下面我们模拟一个简化的 AI 回答流程,将句子逐字符发送:
import json
from fastapi import Request
async def mock_ai_stream(user_msg: str):
# 模拟AI生成的完整回复
ai_reply = f"您的问题是「{user_msg}」,让我为您逐步分析:首先,我们要理清需求边界;其次,梳理实现路径;最后,优化细节。"
accumulated = ""
for char in ai_reply:
accumulated += char
chunk = {
"type": "token", # 消息类型
"content": char, # 当前新增的字符
"full": accumulated # 累积完整文本,方便前端直接渲染
}
yield f"data: {json.dumps(chunk)}\n\n"
await asyncio.sleep(0.02) # 模拟AI生成延迟(一般实际生产可能是几十毫秒)
@app.post("/mock-ai-chat")
async def mock_ai_chat(req: Request):
body = await req.json()
user_msg = body.get("msg", "")
return StreamingResponse(
mock_ai_stream(user_msg),
media_type="text/event-stream",
headers={"X-Accel-Buffering": "no"}
)
前端拿到每个 chunk 后,可以实时拼接到对话界面上,形成打字效果。通常也会增加一个 "[DONE]" 标记表示流结束,但本例为简化省略,真实场景中请参考 OpenAI 的响应格式。
文件流与日志实时推送
大文件分块下载
流式响应同样适用于文件下载。传统方式可能用 FileResponse 将整个文件读入内存再发送,但当文件达到 GB 级别时就会出问题。
利用 StreamingResponse 配合异步文件读取,我们可以每次读取一小块(如 8KB)发送,内存占用始终控制在块大小以内。
import aiofiles
from pathlib import Path
async def file_chunk_stream(file_path: str, chunk_size: int = 8192):
p = Path(file_path)
if not p.exists():
yield b"File not found"
return
async with aiofiles.open(p, "rb") as f:
while chunk := await f.read(chunk_size):
yield chunk # 二进制块直接 yield
@app.get("/download/{filename}")
async def download_file(filename: str):
local_path = f"./data/{filename}" # 生产环境务必做路径穿越检查!
return StreamingResponse(
file_chunk_stream(local_path),
media_type="application/octet-stream",
headers={
"Content-Disposition": f"attachment; filename={filename}"
}
)
实时日志推送
类似地,我们可以将应用产生的日志实时推送到前端仪表盘。例如后台有一个日志队列,生成器不断从中取出日志行并发送:
import asyncio
from collections import deque
log_queue = deque()
async def log_stream():
while True:
if log_queue:
line = log_queue.popleft()
yield f"data: {line}\n\n"
else:
await asyncio.sleep(0.1) # 避免空转消耗CPU
这种方式常用于监控系统、CI/CD流水线日志展示。
生产环境关键配置
Nginx 反向代理
大多数 FastAPI 应用前面都会挂一层 Nginx。如果 Nginx 对 SSE 或流式接口开启了缓冲,你的数据就会被“堵”住,前端要等到缓冲区满了才能看到内容,完全丧失流式特性。因此必须对相关路径禁用缓冲:
location /stream/ {
proxy_pass http://uvicorn_backend;
proxy_http_version 1.1;
# 核心:关闭所有缓冲
proxy_buffering off;
proxy_cache off;
proxy_set_header X-Accel-Buffering no;
# 长连接超时设置(根据业务调整)
proxy_read_timeout 300s;
proxy_send_timeout 300s;
keepalive_timeout 300s;
}
注意,如果你的流式接口路径不是 /stream/,记得相应修改 location 块,或者通过响应头 X-Accel-Buffering: no 统一关闭(两者选其一即可)。
Uvicorn / Gunicorn 参数
建议使用异步 worker 来发挥 FastAPI 的异步特性,同时调大超时和并发限制:
# 使用 uvicorn 直接运行(适合开发、小规模生产)
uvicorn main:app --workers 4 \
--worker-class uvicorn.workers.UvicornWorker \
--timeout-keep-alive 300 \
--limit-concurrency 100
--workers 4:生产环境一般设置为 CPU 核心数
--worker-class uvicorn.workers.UvicornWorker:Gunicorn 管理时指定异步 worker
--timeout-keep-alive 300:保持连接最大空闲时间,适配长连接场景
--limit-concurrency 100:限制同时处理的请求数,防止资源耗尽
前端极简集成方案
前端接收 SSE 消息最方便的方法就是使用浏览器原生的 EventSource 对象,无需引入任何第三方库。
<!DOCTYPE html>
<html>
<body>
<div id="output" style="font-family: monospace; white-space: pre-wrap;"></div>
<script>
// 连接到 SSE 端点
const es = new EventSource("/sse-status");
const output = document.getElementById("output");
// 1. 接收默认事件(没有 event: 前缀的消息)
es.onmessage = (e) => {
const data = JSON.parse(e.data);
output.innerHTML += `默认事件:进度 ${data.progress}%<br>`;
};
// 2. 接收自定义 checkpoint 事件
es.addEventListener("checkpoint", (e) => {
const data = JSON.parse(e.data);
output.innerHTML += `<span style="color:blue;">✅ 检查点 ${data.step} 达成!</span><br>`;
});
// 3. 错误处理(连接中断或异常时触发,会自动重连)
es.onerror = () => console.log("连接出错,尝试自动重连...");
// 4. 如果需要主动关闭连接
// setTimeout(() => es.close(), 10000);
</script>
</body>
</html>
EventSource 内置的重连机制非常实用:网络抖动或服务端重启后,它会自动按一定间隔重新连接。你还可以通过服务端发送 retry: 字段来调整重连间隔。
📝 总结:FastAPI 的 StreamingResponse 是实现流式响应的基石,配合 SSE 标准协议,可以轻松构建出体验丝滑的 AI 对话、实时日志面板和大文件下载服务。
关键在于:异步生成器 + 合适的 MIME 类型 + 禁用各级缓冲。掌握这三个要点,你就能在生产环境中稳定运行流式应用。