#Streaming Responses (StreamingResponse): Reproducing ChatGPT's Typewriter-Style AI Output
📂 Stage: Stage 6 — 2026 Special Topics (AI Integration)
🔗 Related chapters: Real-Time Communication with WebSocket · Deep Dive into Async Programming
#1. What Is a Streaming Response?
#1.1 Comparison: Regular Response vs Streaming Response
Regular response: the complete result is returned in one shot (wait 5 seconds → see everything at once)
Traditional: [===========request============|=====processing (5s)=====] → [=====full payload returned=====]
User:        [=====waits 5 seconds=====] → [sees the complete result] 😴
Streaming response: output appears character by character, like a typewriter (wait 1 second → output starts → keeps flowing)
Streaming: [request (1s)] → [progressive display: H→He→Hello→...] → [watches it unfold] 🤩
#1.2 Typical Use Cases
- AI chat (ChatGPT, Claude): tokens appear one by one
- File downloads: large files are sent in chunks, generated and transferred at the same time
- Log streaming: server logs pushed in real time
- Video streaming: frames delivered one at a time
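For the file-download case, the same idea works at the byte level: split the payload into chunks and yield them one at a time. A minimal sketch — the `chunk_bytes` helper and the 64 KiB chunk size are choices I'm introducing here, not anything fixed by FastAPI:

```python
from typing import Iterator

CHUNK_SIZE = 64 * 1024  # 64 KiB per chunk — an arbitrary but common choice

def chunk_bytes(data: bytes, size: int = CHUNK_SIZE) -> Iterator[bytes]:
    """Split a payload into fixed-size chunks, ready to be streamed."""
    for offset in range(0, len(data), size):
        yield data[offset:offset + size]

# A StreamingResponse can consume exactly this kind of generator:
# StreamingResponse(chunk_bytes(big_payload), media_type="application/octet-stream")
```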
#2. StreamingResponse Basics
#2.1 The Simplest Streaming Response

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def generate_numbers():
    """Generator: yield data piece by piece"""
    for i in range(1, 11):
        yield f"item {i}\n"
        await asyncio.sleep(0.5)  # simulate slow work

@app.get("/stream")
async def stream_numbers():
    return StreamingResponse(
        generate_numbers(),
        media_type="text/plain",  # or "text/event-stream" (SSE)
    )
```

#2.2 SSE (Server-Sent Events)
SSE is a streaming push protocol natively supported by browsers:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

async def sse_generator():
    """A stream in SSE format"""
    for i in range(5):
        data = {"index": i, "message": f"message {i + 1}"}
        # SSE format: event + data
        yield f"event: message\ndata: {json.dumps(data)}\n\n"
        await asyncio.sleep(1)

@app.get("/sse")
async def sse_endpoint():
    return StreamingResponse(
        sse_generator(),
        media_type="text/event-stream",
    )
```
```python
@app.get("/sse-connected")
async def sse_connected():
    """SSE with connection management"""
    async def event_stream():
        try:
            for i in range(100):
                yield f"data: message {i}\n\n"
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            # Clean up when the client disconnects
            print("client disconnected")
            raise

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable Nginx buffering
        },
    )
```

#3. Streaming AI Chat Responses (the Core Scenario)
#3.1 Connecting to the OpenAI Streaming API

```python
import os

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import httpx
import asyncio
import json

app = FastAPI()

OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]  # never hardcode the key

async def openai_stream(user_message: str):
    """Forward OpenAI's streaming response to the frontend"""
    async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
        async with client.stream(
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {OPENAI_API_KEY}",
                "Content-Type": "application/json",
            },
            json={
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": user_message}],
                "stream": True,  # the key switch: enable streaming
            },
        ) as response:
            # Relay OpenAI's streamed chunks to the client one by one
            async for chunk in response.aiter_bytes():
                if chunk:
                    yield chunk

@app.post("/chat")
async def chat(request: Request):
    body = await request.json()
    user_message = body.get("message", "Hello")
    return StreamingResponse(
        openai_stream(user_message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```

#3.2 Receiving SSE on the Frontend
```javascript
// Plain JavaScript
async function chatStream(message) {
    const response = await fetch("/chat", {
        method: "POST",
        headers: {"Content-Type": "application/json"},
        body: JSON.stringify({message}),
    });
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        const text = decoder.decode(value);
        console.log("received fragment:", text);
        // Update the UI, appending characters as they arrive
    }
}
```

#3.3 Simulating the Typewriter Effect
```python
async def chat_with_typing(message: str):
    """Simulate AI typing (swap in a real AI call in production)"""
    # This could be OpenAI, Claude, a local model, etc.
    ai_response = f'This is the AI\'s answer to "{message}"...'  # mock reply
    for char in ai_response:
        yield char
        await asyncio.sleep(0.03)  # 30 ms per character — the typewriter effect

@app.post("/chat-simulate")
async def chat_simulate(request: Request):
    body = await request.json()
    return StreamingResponse(
        chat_with_typing(body.get("message", "")),
        media_type="text/event-stream",
    )
```

#4. Formatting Streamed Responses
#4.1 A Custom SSE Wrapper

```python
async def stream_with_format(data_generator):
    """Wrap any async generator in a uniform SSE format"""
    try:
        async for item in data_generator:
            # Normalize every item into an SSE frame
            yield f"event: data\ndata: {json.dumps({'type': 'token', 'content': item})}\n\n"
    except Exception as e:
        yield f"event: error\ndata: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
    finally:
        yield f"event: done\ndata: {json.dumps({'type': 'done'})}\n\n"

@app.post("/chat-formatted")
async def chat_formatted(request: Request):
    body = await request.json()
    return StreamingResponse(
        stream_with_format(chat_with_typing(body.get("message", ""))),
        media_type="text/event-stream",
    )
```

#5. Caveats
⚠️ Production caveats:
1. Nginx buffering: by default Nginx buffers SSE, so nothing reaches the browser in real time
   → Fix: send the X-Accel-Buffering: no header
2. Proxy timeouts:
   → Nginx: raise proxy_read_timeout
   → Gunicorn: increase timeout
3. Async cancellation:
   → Catch CancelledError to detect client disconnects and clean up
4. Token limits:
   → Streaming does not reduce token consumption; it only improves the user experience

#6. Summary
```python
# The three steps of a streaming response
# 1. Define a generator
async def data_stream():
    for item in items:
        yield item
        await asyncio.sleep(0.1)

# 2. Return a StreamingResponse
@app.get("/stream")
async def stream():
    return StreamingResponse(
        data_stream(),
        media_type="text/event-stream",  # SSE format
        headers={"X-Accel-Buffering": "no"},
    )

# 3. Consume it with EventSource on the frontend
# new EventSource("/stream").onmessage = (e) => console.log(e.data)
```

💡 Core idea: at its heart, a streaming response is just `yield` returning data in chunks. Python's async generators, combined with `StreamingResponse`, let FastAPI push data in real time with very little ceremony.
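That "yield in chunks" essence can be seen without any web framework at all — a standalone sketch that consumes an async generator directly with asyncio, which is essentially what StreamingResponse does internally (pull chunks, write each to the socket):

```python
import asyncio

async def data_stream():
    """Same shape as a generator you would hand to StreamingResponse."""
    for item in ["Hel", "lo, ", "wor", "ld"]:
        yield item
        await asyncio.sleep(0)  # hand control back to the event loop

async def consume() -> list[str]:
    # The async-for loop pulls one chunk at a time, as it is produced
    return [chunk async for chunk in data_stream()]

chunks = asyncio.run(consume())
print("".join(chunks))  # Hello, world
```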