Streaming Responses (StreamingResponse): Mimicking ChatGPT's Typewriter-Style AI Output

📂 Course stage: Stage 6, 2026 Special Topics (AI Integration)
🔗 Related chapters: Real-Time Communication with WebSocket · Async Programming Deep Dive


1. What Is a Streaming Response?

1.1 Comparison: Regular Response vs. Streaming Response

Regular response: the complete result is returned all at once (wait 5 seconds → see everything)

Traditional: [===========request===========|=====processing (5s)=====] → [=====return all of the data=====]
User:        [=====wait 5 seconds=====] → [see the full result] 😴

Streaming response: output appears character by character, like a typewriter (wait 1 second → start displaying → keep streaming)

Streaming: [request (1s)] → [display piece by piece: H→He→Hello→...] → [the answer unfolds live] 🤩
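
To make the contrast concrete, here is a minimal sketch of the blocking version: the handler finishes all of its work before returning anything, so the client stares at a spinner for the full five seconds (the sleep stands in for real processing; the /normal path is just an illustrative name):

from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.get("/normal")
async def normal_response():
    await asyncio.sleep(5)  # pretend the full answer takes 5 seconds to compute
    return {"answer": "the complete result, delivered only after everything is done"}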

1.2 Typical Use Cases

  • AI chat (ChatGPT, Claude): tokens appear one by one
  • File downloads: large files sent in chunks, downloaded while they are being generated (see the sketch after this list)
  • Log streaming: push server logs in real time
  • Video streaming: frame-by-frame delivery
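
As a taste of the file-download case, here is a minimal sketch that sends a file in fixed-size chunks using the StreamingResponse introduced in the next section; the path, filename and 1 MB chunk size are placeholders, not part of the original example:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

def read_in_chunks(path: str, chunk_size: int = 1024 * 1024):
    """Plain (sync) generator: read the file 1 MB at a time."""
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk

@app.get("/download")
async def download():
    return StreamingResponse(
        read_in_chunks("large_file.zip"),  # placeholder path
        media_type="application/octet-stream",
        headers={"Content-Disposition": "attachment; filename=large_file.zip"},
    )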

2. StreamingResponse Basics

2.1 The Simplest Streaming Response

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def generate_numbers():
    """Generator: yields items one at a time"""
    for i in range(1, 11):
        yield f"item {i}\n"
        await asyncio.sleep(0.5)  # simulate slow work

@app.get("/stream")
async def stream_numbers():
    return StreamingResponse(
        generate_numbers(),
        media_type="text/plain",  # or "text/event-stream" (SSE)
    )
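
To watch the chunks arrive one by one, consume the endpoint with a client that reads the body incrementally. A minimal sketch with httpx, assuming the app is running locally on port 8000 (curl -N http://localhost:8000/stream shows the same effect from the command line):

import asyncio
import httpx

async def main():
    async with httpx.AsyncClient(timeout=None) as client:
        # stream() keeps the connection open and exposes the body as it arrives
        async with client.stream("GET", "http://localhost:8000/stream") as response:
            async for line in response.aiter_lines():
                print("received:", line)

asyncio.run(main())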

2.2 SSE (Server-Sent Events)

SSE is a streaming push protocol that browsers support natively:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

async def sse_generator():
    """A stream in SSE format"""
    for i in range(5):
        data = {"index": i, "message": f"message {i+1}"}
        # SSE format: an "event:" line plus a "data:" line, ending with a blank line
        yield f"event: message\ndata: {json.dumps(data)}\n\n"
        await asyncio.sleep(1)

@app.get("/sse")
async def sse_endpoint():
    return StreamingResponse(
        sse_generator(),
        media_type="text/event-stream",
    )

@app.get("/sse-connected")
async def sse_connected():
    """带连接管理的 SSE"""
    async def event_stream():
        try:
            for i in range(100):
                yield f"data: 消息 {i}\n\n"
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            # clean up here when the client disconnects
            print("client disconnected")
            raise

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # 禁用 Nginx 缓冲
        }
    )
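
One extra trick for long-lived SSE connections: the SSE format treats any line that starts with a colon as a comment, so you can emit periodic ": heartbeat" lines to keep proxies and load balancers from cutting an idle connection. A minimal sketch, assuming events arrive on an asyncio.Queue and that a 15-second heartbeat interval is acceptable; it plugs into StreamingResponse exactly like event_stream above:

async def event_stream_with_heartbeat(queue: asyncio.Queue):
    """Forward real events from a queue, emitting a heartbeat while idle."""
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=15)
            yield f"data: {item}\n\n"
        except asyncio.TimeoutError:
            # ":" lines are SSE comments: EventSource ignores them,
            # but they keep intermediaries from closing an idle connection
            yield ": heartbeat\n\n"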

3. Streaming AI Chat Responses (the Core Scenario)

3.1 Hooking Up the OpenAI Streaming API

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import httpx
import asyncio
import json
import os

app = FastAPI()

# the API key is read from the environment; set OPENAI_API_KEY before starting the app
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

async def openai_stream(user_message: str):
    """Relay OpenAI's streaming response to the front end"""
    async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
        async with client.stream(
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {OPENAI_API_KEY}",
                "Content-Type": "application/json",
            },
            json={
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": user_message}],
                "stream": True,  # key point: enable streaming
            }
        ) as response:
            # forward each streamed chunk from OpenAI straight to the client
            async for chunk in response.aiter_bytes():
                if chunk:
                    yield chunk

@app.post("/chat")
async def chat(request: Request):
    body = await request.json()
    user_message = body.get("message", "Hello")

    return StreamingResponse(
        openai_stream(user_message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
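
Forwarding raw bytes simply relays OpenAI's own SSE frames to the browser. If you prefer to send the front end plain text only, you can parse each data: line and extract the delta instead. A sketch, assuming the usual shape of OpenAI's streamed chunks (choices[0].delta.content, terminated by a data: [DONE] marker); it reuses the OPENAI_API_KEY defined above:

async def openai_text_stream(user_message: str):
    """Yield only the generated text, one delta at a time."""
    async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
        async with client.stream(
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
            json={
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": user_message}],
                "stream": True,
            },
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue  # skip blank lines and non-data fields
                payload = line[len("data: "):]
                if payload == "[DONE]":  # OpenAI's end-of-stream marker
                    break
                delta = json.loads(payload)["choices"][0]["delta"]
                if "content" in delta:
                    yield delta["content"]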

3.2 Receiving the Stream on the Front End

// Plain JavaScript implementation
async function chatStream(message) {
    const response = await fetch("/chat", {
        method: "POST",
        headers: {"Content-Type": "application/json"},
        body: JSON.stringify({message}),
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // { stream: true } keeps multi-byte characters intact across chunk boundaries
        const text = decoder.decode(value, { stream: true });
        console.log("received chunk:", text);
        // update the UI here, appending the new text piece by piece
    }
}

3.3 Simulating the Typewriter Effect

async def chat_with_typing(message: str):
    """Simulate AI typing (swap in a real AI call in a real project)"""
    # this could be OpenAI, Claude, a local model, and so on
    ai_response = f"This is the AI's answer to '{message}'..."  # mock reply

    for char in ai_response:
        yield char
        await asyncio.sleep(0.03)  # 30 ms per character, the typewriter effect

@app.post("/chat-simulate")
async def chat_simulate(request: Request):
    body = await request.json()
    return StreamingResponse(
        chat_with_typing(body.get("message", "")),
        media_type="text/event-stream",
    )

4. Formatting the Stream

4.1 A Custom SSE Wrapper

async def stream_with_format(data_generator):
    """统一的 SSE 格式包装"""
    try:
        async for item in data_generator:
            # normalize each item into an SSE event
            yield f"event: data\ndata: {json.dumps({'type': 'token', 'content': item})}\n\n"
    except Exception as e:
        yield f"event: error\ndata: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
    finally:
        yield f"event: done\ndata: {json.dumps({'type': 'done'})}\n\n"

@app.post("/chat-formatted")
async def chat_formatted(request: Request):
    body = await request.json()
    return StreamingResponse(
        stream_with_format(chat_with_typing(body.get("message", ""))),
        media_type="text/event-stream",
    )
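
For a request body like {"message": "hi"}, the stream that stream_with_format produces looks roughly like this on the wire: one data event per character of the mock reply, then a closing done event:

event: data
data: {"type": "token", "content": "T"}

event: data
data: {"type": "token", "content": "h"}

...

event: done
data: {"type": "done"}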

5. Caveats

⚠️ Things to watch for in production:

1. Nginx buffering: Nginx buffers SSE by default, so nothing reaches the browser in real time
   → fix: send the X-Accel-Buffering: no header (or set proxy_buffering off; in the Nginx config)

2. Proxy timeouts:
   → Nginx: raise proxy_read_timeout well above the longest expected stream
   → Gunicorn: increase the worker timeout so long streams are not killed

3. Async cancellation:
   → catch asyncio.CancelledError when the client disconnects and clean up (see the sketch after this list)

4. Token usage:
   → streaming does not reduce token consumption; it only improves perceived latency
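
For point 3, besides catching CancelledError inside the generator, you can also poll the request object itself; a minimal sketch using Starlette's request.is_disconnected(), wired up the same way as the earlier SSE endpoints:

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

@app.get("/watch")
async def watch(request: Request):
    async def event_stream():
        for i in range(1000):
            # stop producing as soon as the client goes away
            if await request.is_disconnected():
                print("client disconnected, stopping the stream")
                break
            yield f"data: message {i}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(event_stream(), media_type="text/event-stream")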

6. Recap

# Streaming responses in three steps

# 1. Define a generator
async def data_stream():
    for item in items:
        yield item
        await asyncio.sleep(0.1)

# 2. Return a StreamingResponse
@app.get("/stream")
async def stream():
    return StreamingResponse(
        data_stream(),
        media_type="text/event-stream",  # SSE 格式
        headers={"X-Accel-Buffering": "no"}
    )

# 3. On the front end, receive it with EventSource
# new EventSource("/stream").onmessage = (e) => console.log(e.data)

💡 Core idea: a streaming response boils down to yielding data in chunks. A Python async generator combined with StreamingResponse lets FastAPI push data to the client in real time with very little code.


🔗 Further Reading