FastAPIstreaming-response完全指南

📂 关联资源:websocket-realtime-communication · 异步编程深度解析

目录


流式响应是什么?

想象一下你在等待一个3小时的电影文件下载:传统做法是等服务器把整个文件准备好,再一次性扔给你。这会导致两个问题:

  • 服务器内存可能被撑爆(比如4GB的大文件直接读进内存)
  • 你必须傻等很久才能看到第一帧画面

流式响应就像是边下载边播放:服务器每准备好一小块数据就立刻发送,客户端也能立刻消费这一小块。不需要等全量,也不需要囤积所有数据。

传统HTTP vs 流式响应

对比维度传统全量响应流式响应
返回方式服务器处理完全部数据后一次性返回边处理边推送,客户端即时收到分块数据
内存占用整个响应体必须完全在内存中只需缓存当前处理的小数据块
首字节时间慢,用户一直等待快,几乎立即收到第一块内容
适合场景简单JSON接口、小页面渲染AI生成文本、大文件下载、实时日志、进度条

流式响应在 FastAPI 中主要靠 StreamingResponse 来实现,配合 Python 的异步生成器 (async generator),就可以像水龙头一样逐步“流出”数据。

选 SSE 还是 WebSocket?

很多同学会纠结:流式响应、SSE、WebSocket 到底怎么选?简单来说:

  • StreamingResponse:最基础的流式输出,适合一次性流(比如下载文件、简单数字序列)。开发最简单。
  • SSE (Server-Sent Events):建立在 StreamingResponse 之上,是一种标准化的单向推送协议。浏览器原生支持,自带自动重连、事件分类等功能,非常适合AI对话的令牌流、状态更新等。
  • WebSocket:全双工双向通信,复杂度更高,适合需要客户端频繁给服务端发消息的场景,比如聊天、游戏、协同编辑。

本文重点讲解 StreamingResponse 及其最常用的上层包装——SSE。


StreamingResponse基础与优化

最简示例:数字流

我们先从一个最简单的流式响应开始,让服务端每隔0.5秒推送一个数字:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def num_stream():
    """异步生成器,逐步产出数据块"""
    for i in range(1, 6):
        yield f"收到第{i}个数据块\n"
        await asyncio.sleep(0.5)  # 模拟耗时操作,比如数据库查询、AI计算

@app.get("/simple-stream")
async def simple_stream():
    return StreamingResponse(
        num_stream(),               # 传入异步生成器
        media_type="text/plain",    # 纯文本流,也可以是 application/json 等
        headers={
            "Cache-Control": "no-cache",  # 禁止代理/浏览器缓存,保证实时性
            "Connection": "keep-alive"    # 保持长连接
        }
    )

访问 http://localhost:8000/simple-stream,你会发现浏览器不是一次性显示全部内容,而是每隔0.5秒多出一行,就像终端命令的输出一样。

优雅处理客户端断开

用户可能会在中途刷新页面或关闭标签页,这时候 FastAPI 会取消正在运行的异步任务,触发 asyncio.CancelledError。我们必须在生成器中捕获这个异常,以便释放数据库连接、关闭文件等资源。

async def safe_num_stream():
    try:
        for i in range(1, 6):
            yield f"收到第{i}个数据块\n"
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        # 这里可以做一些清理工作,比如记录日志、释放锁
        print("客户端主动断开,清理资源...")
        # 注意:必须重新抛出异常,否则FastAPI无法正确完成清理
        raise

关键点:捕获 CancelledError 后必须 raise 出去,这是 FastAPI 内部生命周期管理的约定,否则可能导致连接泄漏。


SSE服务器发送事件

SSE(Server-Sent Events)是在 StreamingResponse 基础上定义的一套格式规范,目的是让浏览器的 EventSource API 可以直接、优雅地接收服务端推送的消息。它的好处包括:

  • 自动重连:网络断了重连后自动恢复,默认3秒重试一次。
  • 事件分类:可以给消息打上事件名称,前端按不同事件类型分别处理。
  • 轻量级:基于 HTTP,无需额外库,防火墙友好。

SSE 数据格式

SSE 通过纯文本流传递,有严格的格式要求:

  • 每条消息以 data: 开头,后面跟具体内容(可以是纯文本或JSON)
  • 消息结束标志是 两个连续的换行 \n\n
  • 可选字段:event: 定义事件类型,id: 设置消息编号(用于断点续传),retry: 设置重连间隔(毫秒)

来看一个完整的 SSE 端点示例:

import json
from datetime import datetime

async def sse_status_stream():
    # 第一条消息建议发送重连间隔,告诉浏览器10秒后再重试
    yield "retry: 10000\n\n"
    
    for i in range(1, 11):
        # 普通进度消息,不带事件类型
        normal_data = {
            "step": i,
            "progress": i * 10,
            "time": datetime.now().isoformat()
        }
        yield f"data: {json.dumps(normal_data)}\n\n"
        
        # 每完成3步发送一个 checkpoint 事件
        if i % 3 == 0:
            status_data = {"state": "checkpoint_reached", "step": i}
            # 格式: event: <事件名>\ndata: <json>\n\n
            yield f"event: checkpoint\ndata: {json.dumps(status_data)}\n\n"
        
        await asyncio.sleep(0.8)

@app.get("/sse-status")
async def sse_status():
    return StreamingResponse(
        sse_status_stream(),
        media_type="text/event-stream",   # ★ 必须是这个MIME类型,浏览器才能识别
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"     # 禁用Nginx缓冲(非常重要!)
        }
    )

⚠️ X-Accel-Buffering: no 这个响应头是关键,如果没有它,当你用 Nginx 做反向代理时,数据会被缓冲起来,前端可能好几秒才收到一批消息,完全失去实时性。


AI对话打字机效果

在 ChatGPT 这类 AI 对话产品中,回复是一字一字“打”出来的,这就是经典的打字机效果(Typewriter effect)。实现原理正是 SSE 流式输出:后端每生成一个令牌(token)就立即推送。

模拟AI回复

下面我们模拟一个简化的 AI 回答流程,将句子逐字符发送:

import json
from fastapi import Request

async def mock_ai_stream(user_msg: str):
    # 模拟AI生成的完整回复
    ai_reply = f"您的问题是「{user_msg}」,让我为您逐步分析:首先,我们要理清需求边界;其次,梳理实现路径;最后,优化细节。"
    
    accumulated = ""
    for char in ai_reply:
        accumulated += char
        chunk = {
            "type": "token",      # 消息类型
            "content": char,      # 当前新增的字符
            "full": accumulated   # 累积完整文本,方便前端直接渲染
        }
        yield f"data: {json.dumps(chunk)}\n\n"
        await asyncio.sleep(0.02)  # 模拟AI生成延迟(一般实际生产可能是几十毫秒)

@app.post("/mock-ai-chat")
async def mock_ai_chat(req: Request):
    body = await req.json()
    user_msg = body.get("msg", "")
    return StreamingResponse(
        mock_ai_stream(user_msg),
        media_type="text/event-stream",
        headers={"X-Accel-Buffering": "no"}
    )

前端拿到每个 chunk 后,可以实时拼接到对话界面上,形成打字效果。通常也会增加一个 "[DONE]" 标记表示流结束,但本例为简化省略,真实场景中请参考 OpenAI 的响应格式。


文件流与日志实时推送

大文件分块下载

流式响应同样适用于文件下载。传统方式可能用 FileResponse 将整个文件读入内存再发送,但当文件达到 GB 级别时就会出问题。
利用 StreamingResponse 配合异步文件读取,我们可以每次读取一小块(如 8KB)发送,内存占用始终控制在块大小以内。

import aiofiles
from pathlib import Path

async def file_chunk_stream(file_path: str, chunk_size: int = 8192):
    p = Path(file_path)
    if not p.exists():
        yield b"File not found"
        return
    
    async with aiofiles.open(p, "rb") as f:
        while chunk := await f.read(chunk_size):
            yield chunk   # 二进制块直接 yield

@app.get("/download/{filename}")
async def download_file(filename: str):
    local_path = f"./data/{filename}"  # 生产环境务必做路径穿越检查!
    return StreamingResponse(
        file_chunk_stream(local_path),
        media_type="application/octet-stream",
        headers={
            "Content-Disposition": f"attachment; filename={filename}"
        }
    )

实时日志推送

类似地,我们可以将应用产生的日志实时推送到前端仪表盘。例如后台有一个日志队列,生成器不断从中取出日志行并发送:

import asyncio
from collections import deque

log_queue = deque()

async def log_stream():
    while True:
        if log_queue:
            line = log_queue.popleft()
            yield f"data: {line}\n\n"
        else:
            await asyncio.sleep(0.1)  # 避免空转消耗CPU

这种方式常用于监控系统、CI/CD流水线日志展示。


生产环境关键配置

Nginx 反向代理

大多数 FastAPI 应用前面都会挂一层 Nginx。如果 Nginx 对 SSE 或流式接口开启了缓冲,你的数据就会被“堵”住,前端要等到缓冲区满了才能看到内容,完全丧失流式特性。因此必须对相关路径禁用缓冲

location /stream/ {
    proxy_pass http://uvicorn_backend;
    proxy_http_version 1.1;

    # 核心:关闭所有缓冲
    proxy_buffering off;
    proxy_cache off;
    proxy_set_header X-Accel-Buffering no;
    
    # 长连接超时设置(根据业务调整)
    proxy_read_timeout 300s;
    proxy_send_timeout 300s;
    keepalive_timeout 300s;
}

注意,如果你的流式接口路径不是 /stream/,记得相应修改 location 块,或者通过响应头 X-Accel-Buffering: no 统一关闭(两者选其一即可)。

Uvicorn / Gunicorn 参数

建议使用异步 worker 来发挥 FastAPI 的异步特性,同时调大超时和并发限制:

# 使用 uvicorn 直接运行(适合开发、小规模生产)
uvicorn main:app --workers 4 \
    --worker-class uvicorn.workers.UvicornWorker \
    --timeout-keep-alive 300 \
    --limit-concurrency 100
  • --workers 4:生产环境一般设置为 CPU 核心数
  • --worker-class uvicorn.workers.UvicornWorker:Gunicorn 管理时指定异步 worker
  • --timeout-keep-alive 300:保持连接最大空闲时间,适配长连接场景
  • --limit-concurrency 100:限制同时处理的请求数,防止资源耗尽

前端极简集成方案

前端接收 SSE 消息最方便的方法就是使用浏览器原生的 EventSource 对象,无需引入任何第三方库。

<!DOCTYPE html>
<html>
<body>
    <div id="output" style="font-family: monospace; white-space: pre-wrap;"></div>
    <script>
        // 连接到 SSE 端点
        const es = new EventSource("/sse-status");
        const output = document.getElementById("output");

        // 1. 接收默认事件(没有 event: 前缀的消息)
        es.onmessage = (e) => {
            const data = JSON.parse(e.data);
            output.innerHTML += `默认事件:进度 ${data.progress}%<br>`;
        };

        // 2. 接收自定义 checkpoint 事件
        es.addEventListener("checkpoint", (e) => {
            const data = JSON.parse(e.data);
            output.innerHTML += `<span style="color:blue;">✅ 检查点 ${data.step} 达成!</span><br>`;
        });

        // 3. 错误处理(连接中断或异常时触发,会自动重连)
        es.onerror = () => console.log("连接出错,尝试自动重连...");

        // 4. 如果需要主动关闭连接
        // setTimeout(() => es.close(), 10000);
    </script>
</body>
</html>

EventSource 内置的重连机制非常实用:网络抖动或服务端重启后,它会自动按一定间隔重新连接。你还可以通过服务端发送 retry: 字段来调整重连间隔。


📝 总结:FastAPI 的 StreamingResponse 是实现流式响应的基石,配合 SSE 标准协议,可以轻松构建出体验丝滑的 AI 对话、实时日志面板和大文件下载服务。
关键在于:异步生成器 + 合适的 MIME 类型 + 禁用各级缓冲。掌握这三个要点,你就能在生产环境中稳定运行流式应用。