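FastAPI Streaming Responses: A Complete Guide to StreamingResponse

📂 Stage: Stage 6, 2026 Special Topics (AI Integration)
🔗 Related chapters: WebSocket Real-Time Communication · Asynchronous Programming in Depth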


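Streaming Response Fundamentals

What Is a Streaming Response?

A streaming response is an HTTP response whose body the server sends incrementally, piece by piece. The server does not first build the complete body and then send it all at once; over HTTP/1.1 this typically uses chunked transfer encoding. The two models compare as follows: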

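Traditional response model:
┌────────────────────────────────────────────────────────────────────┐
│ Client → request → Server → process → complete response → Client   │
│ (waits until all data has been processed)                          │
└────────────────────────────────────────────────────────────────────┘

Streaming response model:
┌────────────────────────────────────────────────────────────────────┐
│ Client → request → Server → send while processing → Client         │
│ (receives data chunks in real time)                                │
└────────────────────────────────────────────────────────────────────┘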
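The difference is easy to see without any web framework. The standard-library sketch below runs the same slow producer twice and records the order of events. The first run collects everything before sending. The second run forwards each item as soon as it is produced. The function names are illustrative and not part of FastAPI.

```python
import asyncio


async def produce(log, n=3):
    """Simulated data source: each item takes a moment to prepare."""
    for i in range(n):
        await asyncio.sleep(0.01)        # simulated work
        log.append(f"produced {i}")
        yield i


async def buffered(log):
    # Traditional model: collect the whole result, then "send" it at once
    items = [item async for item in produce(log)]
    for item in items:
        log.append(f"sent {item}")


async def streamed(log):
    # Streaming model: forward each item as soon as it is ready
    async for item in produce(log):
        log.append(f"sent {item}")


buffered_log, streamed_log = [], []
asyncio.run(buffered(buffered_log))
asyncio.run(streamed(streamed_log))
```

In `buffered_log`, nothing is sent until all three items exist. In `streamed_log`, each item is sent right after it is produced. That interleaving is what lets a client start rendering early.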

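Core Advantages of Streaming Responses

| Advantage | Description | Typical use cases |
|---|---|---|
| Real-time delivery | Each piece of data is sent as soon as it is ready | AI chat, live logs |
| Memory efficiency | The server does not have to accumulate the whole response in memory | Large file transfers |
| User experience | Immediate feedback reduces the perceived wait | Typewriter effect, progress display |
| Resource savings | Data is transferred on demand instead of all at once | Video and audio streams |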

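Comparison of Streaming Techniques

| Technique | Use case | Advantages |
|---|---|---|
| StreamingResponse | Simple streaming data (text, files, custom formats) | Easy to implement, lightweight |
| SSE (Server-Sent Events) | One-way push from server to client | Standardized format, native browser support via EventSource, persistent connection with automatic reconnection |
| WebSocket | Bidirectional communication | Full duplex, low latency |

In FastAPI, SSE is usually served through StreamingResponse with media_type="text/event-stream", as in all the SSE examples below. The first two rows therefore complement each other rather than being alternatives.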

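Typical Use Cases

  1. AI chat systems: products such as ChatGPT and Claude render answers with a typewriter effect
  2. File downloads: large files are sent in chunks and can be downloaded while they are still being generated
  3. Live logs: server logs are pushed to the client in real time
  4. Video streaming: video data is transmitted frame by frame
  5. Progress reporting: long-running tasks report progress as it happens
  6. Stock prices: real-time financial data push
  7. Chat applications: real-time message push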

StreamingResponse Core Implementation

Basic Streaming Response

Let's start with the simplest possible streaming response:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
from typing import AsyncGenerator

app = FastAPI()

async def number_generator():
    """Number generator: a basic streaming example"""
    for i in range(1, 11):
        yield f"Data {i}\n"
        await asyncio.sleep(0.5)  # simulate processing time

@app.get("/stream-numbers")
async def stream_numbers():
    """Stream a sequence of numbers"""
    return StreamingResponse(
        number_generator(),
        media_type="text/plain",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )

Advanced Streaming Configuration

async def advanced_stream_generator():
    """Advanced streaming generator"""
    loop = asyncio.get_running_loop()
    try:
        for i in range(100):
            # Simulate a regular data item
            data = {
                "id": i,
                "timestamp": loop.time(),  # event-loop clock (monotonic), not wall-clock time
                "data": f"Processing item {i}",
                "status": "processing"
            }

            yield f"data: {json.dumps(data)}\n\n"
            await asyncio.sleep(0.1)

            # Simulate a recoverable error partway through
            if i == 50:
                error_data = {
                    "id": i,
                    "error": "Processing error occurred",
                    "status": "error"
                }
                yield f"data: {json.dumps(error_data)}\n\n"

        # Completion signal, sent only when the loop finishes normally.
        # It must not live in a finally block: when the client disconnects,
        # the generator is cancelled or closed, and yielding then fails.
        complete_data = {
            "status": "completed",
            "message": "Stream completed successfully"
        }
        yield f"data: {json.dumps(complete_data)}\n\n"

    except asyncio.CancelledError:
        # Client disconnected: clean up, then re-raise
        print("Client disconnected")
        raise
    except Exception as e:
        # Report unexpected errors to the client as a final event
        error_response = {
            "error": str(e),
            "status": "error"
        }
        yield f"data: {json.dumps(error_response)}\n\n"

@app.get("/advanced-stream")
async def advanced_stream():
    """Advanced streaming endpoint"""
    return StreamingResponse(
        advanced_stream_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable Nginx proxy buffering
            # Hand-written CORS headers; in production, prefer CORSMiddleware
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "Cache-Control"
        }
    )
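The completion event above is yielded at the end of the try block, not in finally, for a concrete reason. When a client disconnects, the server cancels or closes the generator. If an async generator yields during aclose(), aclose() raises RuntimeError.

The stdlib sketch below closes two generators after their first chunk, the way a server would on disconnect. The generator and helper names are illustrative.

```python
import asyncio


async def completion_in_finally():
    # Anti-pattern: the completion event is yielded from finally
    try:
        yield "data 1"
        yield "data 2"
    finally:
        yield "completed"          # also runs when the consumer closes early


async def completion_on_success(cleanup_log):
    # Pattern used above: completion event on the normal path only
    try:
        yield "data 1"
        yield "data 2"
        yield "completed"
    finally:
        cleanup_log.append("cleanup")   # cleanup without yielding


async def close_after_first(gen):
    """Read one chunk, then close the generator as a server does on disconnect."""
    first = await gen.__anext__()
    try:
        await gen.aclose()
        return first, None
    except RuntimeError as exc:
        return first, str(exc)


bad_result = asyncio.run(close_after_first(completion_in_finally()))
cleanup_log = []
good_result = asyncio.run(close_after_first(completion_on_success(cleanup_log)))
```

The first generator makes aclose() fail with a RuntimeError that mentions GeneratorExit. The second generator closes cleanly and still runs its cleanup code.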

Custom Streaming Response Class

from fastapi.responses import StreamingResponse
from typing import Any, AsyncIterator, Callable, Optional

class CustomStreamingResponse(StreamingResponse):
    """StreamingResponse with streaming-friendly default headers"""

    def __init__(
        self,
        content: Any,
        status_code: int = 200,
        headers: Optional[dict] = None,
        media_type: Optional[str] = None,
        background=None,
        custom_headers: Optional[dict] = None
    ):
        super().__init__(content, status_code, headers, media_type, background)

        # Add caller-supplied custom headers
        if custom_headers:
            self.headers.update(custom_headers)

        # Default streaming headers (kept as-is if already set)
        self.headers.setdefault("Cache-Control", "no-cache")
        self.headers.setdefault("Connection", "keep-alive")
        self.headers.setdefault("X-Accel-Buffering", "no")

async def custom_stream_generator():
    """Custom stream generator"""
    for i in range(10):
        yield f"Custom data {i}\n"
        await asyncio.sleep(0.2)

@app.get("/custom-stream")
async def custom_stream():
    """Use the custom streaming response"""
    return CustomStreamingResponse(
        custom_stream_generator(),
        media_type="text/plain",
        custom_headers={
            "X-Stream-Id": "custom-stream-123",
            "X-Data-Type": "numbers"
        }
    )

SSE: Server-Sent Events

Basic SSE Implementation

from datetime import datetime
import time

async def sse_generator():
    """Generator producing SSE-formatted data"""
    for i in range(10):
        data = {
            "id": i,
            "timestamp": datetime.now().isoformat(),
            "message": f"SSE message {i}",
            "server_time": time.time()
        }

        # SSE format: an "event" field and a "data" field, ended by a blank line
        yield f"event: message\ndata: {json.dumps(data)}\n\n"
        await asyncio.sleep(1)

@app.get("/sse-basic")
async def sse_basic():
    """Basic SSE endpoint"""
    return StreamingResponse(
        sse_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
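Browsers consume this format through EventSource. Non-browser clients and tests need their own parser that follows the field rules of the SSE format:

- each line is a "name: value" pair;
- a blank line ends an event;
- several data lines are joined with "\n";
- lines starting with ":" are comments.

The sketch below is deliberately simplified. parse_sse is a hypothetical helper; it assumes "\n" line endings and ignores retry and unknown fields.

```python
from typing import Dict, Iterable, Iterator, Optional


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, Optional[str]]]:
    """Turn text/event-stream lines into event dicts (simplified)."""
    event_type, data_lines, last_id = "", [], None
    for line in lines:
        if line == "":
            # Blank line: dispatch the event, but only if it carried data
            if data_lines:
                yield {"event": event_type or "message",
                       "data": "\n".join(data_lines),
                       "id": last_id}
            event_type, data_lines = "", []
            continue
        if line.startswith(":"):
            continue                      # comment line, often a keep-alive
        field, sep, value = line.partition(":")
        if sep and value.startswith(" "):
            value = value[1:]             # strip exactly one leading space
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)
        elif field == "id":
            last_id = value               # persists across events, as in the spec
        # "retry" and unknown fields are ignored in this sketch


stream = 'event: message\ndata: {"id": 0}\n\n: keep-alive\n\ndata: line one\ndata: line two\n\n'
events = list(parse_sse(stream.split("\n")))
```

The comment line produces no event, and the two data lines of the second event come back joined by a newline.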

Advanced SSE Implementation

class SSEEvent:
    """A single SSE event"""

    def __init__(self, data=None, event_type=None, event_id=None, retry=None):
        self.data = data
        self.event_type = event_type
        self.event_id = event_id
        self.retry = retry

    def to_sse_format(self):
        """Serialize to the text/event-stream wire format"""
        lines = []

        if self.event_id is not None:
            lines.append(f"id: {self.event_id}")

        if self.event_type is not None:
            lines.append(f"event: {self.event_type}")

        if self.data is not None:
            # Serialize dicts to a JSON string
            if isinstance(self.data, dict):
                data_str = json.dumps(self.data, ensure_ascii=False)
            else:
                data_str = str(self.data)

            # Each line of the payload needs its own "data:" field;
            # the client joins them back together with "\n"
            for line in data_str.split('\n'):
                lines.append(f"data: {line}")

        if self.retry is not None:
            lines.append(f"retry: {self.retry}")

        # An event is terminated by a blank line
        return '\n'.join(lines) + '\n\n'

async def advanced_sse_generator():
    """Advanced SSE generator"""
    try:
        # Ask the client to wait 10 s (10000 ms) before reconnecting
        yield "retry: 10000\n\n"

        for i in range(20):
            # Build the event for this iteration
            if i % 5 == 0:
                # Heartbeat event
                event = SSEEvent(
                    data={"type": "heartbeat", "timestamp": datetime.now().isoformat()},
                    event_type="heartbeat",
                    event_id=str(i)
                )
            elif i % 3 == 0:
                # Status update event
                event = SSEEvent(
                    data={
                        "type": "status",
                        "message": f"Processing batch {i//3 + 1}",
                        "progress": i * 5,
                        "total": 100
                    },
                    event_type="status",
                    event_id=str(i)
                )
            else:
                # Regular data event
                event = SSEEvent(
                    data={
                        "type": "data",
                        "id": i,
                        "content": f"Real-time data point {i}",
                        "timestamp": datetime.now().isoformat()
                    },
                    event_type="data",
                    event_id=str(i)
                )

            # Named events reach browser clients via addEventListener(<name>), not onmessage
            yield event.to_sse_format()
            await asyncio.sleep(0.5)

    except asyncio.CancelledError:
        print("SSE connection cancelled by client")
        raise
    except Exception as e:
        error_event = SSEEvent(
            data={"type": "error", "message": str(e)},
            event_type="error"
        )
        yield error_event.to_sse_format()

@app.get("/sse-advanced")
async def sse_advanced():
    """Advanced SSE endpoint"""
    return StreamingResponse(
        advanced_sse_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
            "Access-Control-Allow-Origin": "*"
        }
    )
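The event IDs above matter on reconnection. After a dropped connection, EventSource waits for the retry interval and then reconnects. It sends the last ID it received in the Last-Event-ID request header, so the server can resend the events the client missed.

The sketch below covers only the server-side bookkeeping. It assumes integer IDs and a bounded in-memory history, and EventHistory is a hypothetical helper. Wiring it into the endpoint is not shown; for example, you could read the header with request.headers.get("last-event-id").

```python
from collections import deque
from typing import Deque, List, Optional, Tuple


class EventHistory:
    """Keeps the most recent events so a reconnecting client can catch up."""

    def __init__(self, max_events: int = 100):
        self._events: Deque[Tuple[int, str]] = deque(maxlen=max_events)
        self._next_id = 0

    def append(self, payload: str) -> str:
        """Record a payload and return it formatted as an SSE event with an id."""
        event_id = self._next_id
        self._next_id += 1
        self._events.append((event_id, payload))
        return f"id: {event_id}\ndata: {payload}\n\n"

    def since(self, last_event_id: Optional[str]) -> List[str]:
        """Events newer than last_event_id (all retained events if None or invalid)."""
        try:
            last = int(last_event_id) if last_event_id is not None else -1
        except ValueError:
            last = -1
        return [f"id: {i}\ndata: {p}\n\n" for i, p in self._events if i > last]


history = EventHistory(max_events=3)
for n in range(5):
    history.append(f"update {n}")

replay = history.since("2")   # the client had seen up to id 2
```

Because the history is bounded, a client that was away long enough to miss events older than the buffer cannot fully recover. In that case the application must fall back to a full refresh.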

SSE Connection Management

from collections import defaultdict
import uuid

class SSEConnectionManager:
    """SSE connection manager: one asyncio.Queue per connection"""

    def __init__(self):
        self.connections = defaultdict(set)  # stream_id -> set of connection IDs
        self.queues = {}                     # connection_id -> asyncio.Queue of SSEEvent

    async def connect(self, stream_id: str) -> str:
        """Register a new connection on a stream and return its ID"""
        connection_id = str(uuid.uuid4())
        self.connections[stream_id].add(connection_id)
        self.queues[connection_id] = asyncio.Queue()
        return connection_id

    async def disconnect(self, connection_id: str, stream_id: str):
        """Unregister a connection (safe to call more than once)"""
        self.connections[stream_id].discard(connection_id)
        self.queues.pop(connection_id, None)
        if not self.connections[stream_id]:
            del self.connections[stream_id]

    def broadcast_to_stream(self, stream_id: str, event: SSEEvent):
        """Queue an event for every connection on a stream"""
        for connection_id in list(self.connections.get(stream_id, ())):
            queue = self.queues.get(connection_id)
            if queue is not None:
                queue.put_nowait(event)

    def get_active_connections(self, stream_id: str) -> int:
        """Number of active connections on a stream"""
        return len(self.connections.get(stream_id, ()))

# Global connection manager instance
sse_manager = SSEConnectionManager()

async def managed_sse_generator(stream_id: str):
    """SSE generator whose connection is tracked by sse_manager"""
    connection_id = await sse_manager.connect(stream_id)
    queue = sse_manager.queues[connection_id]
    try:
        # Confirm the connection to the client
        yield f"data: {json.dumps({'type': 'connected', 'connection_id': connection_id, 'stream_id': stream_id})}\n\n"

        for i in range(30):
            # Forward events broadcast to this stream since the last tick
            while not queue.empty():
                yield queue.get_nowait().to_sse_format()

            data = {
                "type": "stream_data",
                "stream_id": stream_id,
                "sequence": i,
                "timestamp": datetime.now().isoformat(),
                "active_connections": sse_manager.get_active_connections(stream_id)
            }

            yield f"data: {json.dumps(data)}\n\n"
            await asyncio.sleep(1)

    except asyncio.CancelledError:
        print(f"SSE connection cancelled for stream {stream_id}, connection {connection_id}")
        raise
    finally:
        # Runs on normal completion, cancellation, and generator close alike
        await sse_manager.disconnect(connection_id, stream_id)

@app.get("/sse-managed/{stream_id}")
async def sse_managed(stream_id: str):
    """Managed SSE endpoint"""
    return StreamingResponse(
        managed_sse_generator(stream_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
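When each SSE connection gets its own queue of pending events, an unbounded queue lets a slow client accumulate events without limit. One common policy is a bounded queue that drops the oldest pending event when full. A lagging client then loses stale updates instead of exhausting server memory.

This is an assumption, not something the code above does. publish_drop_oldest is a hypothetical name.

```python
import asyncio


def publish_drop_oldest(queue: asyncio.Queue, item) -> bool:
    """Put item on a bounded queue; if it is full, discard the oldest item first.

    Returns True if an older item was dropped. Call it on the event-loop thread.
    """
    dropped = False
    if queue.full():
        queue.get_nowait()       # discard the oldest pending event
        dropped = True
    queue.put_nowait(item)
    return dropped


async def demo():
    fast, slow = asyncio.Queue(maxsize=10), asyncio.Queue(maxsize=2)
    drops = 0
    for n in range(5):
        for q in (fast, slow):
            drops += publish_drop_oldest(q, f"event {n}")
    fast_items = [fast.get_nowait() for _ in range(fast.qsize())]
    slow_items = [slow.get_nowait() for _ in range(slow.qsize())]
    return fast_items, slow_items, drops


fast_items, slow_items, drops = asyncio.run(demo())
```

The client with room in its queue receives all five events. The one limited to two pending events keeps only the two most recent.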

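Streaming AI Chat Output

Basic AI Streaming Response

from fastapi import Request

async def ai_stream_generator(user_input: str):
    """Simulated AI streaming generator"""
    # Simulated AI answer
    ai_response = f"This is a detailed answer to your question '{user_input}'. Let me analyze it step by step. First, we need to consider the relevant factors and then examine them in depth. After weighing everything, I think the most suitable solution is..."

    # Typewriter effect: one character per event.
    # "accumulated" repeats the full text so far in every event, so the total
    # payload grows quadratically with the answer length; clients can usually
    # rebuild the text from "content" alone.
    accumulated_text = ""
    for i, char in enumerate(ai_response):
        accumulated_text += char

        chunk_data = {
            "type": "token",
            "content": char,
            "accumulated": accumulated_text,
            "index": i,
            "is_final": i == len(ai_response) - 1
        }

        yield f"data: {json.dumps(chunk_data)}\n\n"
        await asyncio.sleep(0.02)  # simulated generation speed

@app.post("/ai-stream")
async def ai_stream(request: Request):
    """Streaming AI chat endpoint"""
    body = await request.json()
    user_message = body.get("message", "")

    return StreamingResponse(
        ai_stream_generator(user_message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )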
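Each chunk above carries the full accumulated text, so the total payload grows roughly quadratically with the answer length. A client can rebuild the text from the content deltas alone.

The stdlib sketch below rebuilds the chunk format of ai_stream_generator locally instead of calling the endpoint. It reassembles the text and compares both payload sizes. make_chunks and reassemble are illustrative names.

```python
import json


def make_chunks(text: str, include_accumulated: bool):
    """Build SSE chunks in the format used by ai_stream_generator."""
    accumulated = ""
    for i, char in enumerate(text):
        accumulated += char
        payload = {"type": "token", "content": char, "index": i,
                   "is_final": i == len(text) - 1}
        if include_accumulated:
            payload["accumulated"] = accumulated
        yield f"data: {json.dumps(payload)}\n\n"


def reassemble(chunks) -> str:
    """Client side: concatenate the 'content' deltas in arrival order."""
    parts = []
    for chunk in chunks:
        for line in chunk.splitlines():
            if line.startswith("data: "):
                parts.append(json.loads(line[6:])["content"])
    return "".join(parts)


answer = "Streaming lets the client render text while it is still being generated. " * 4
with_acc = list(make_chunks(answer, include_accumulated=True))
deltas_only = list(make_chunks(answer, include_accumulated=False))

size_with_acc = sum(len(c) for c in with_acc)
size_deltas = sum(len(c) for c in deltas_only)
```

Both variants reassemble to the same text. For this answer of a few hundred characters, the accumulated variant is already several times larger.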

Integrating a Real AI Service

import os
import httpx
from typing import AsyncGenerator

async def openai_stream_generator(user_message: str, model: str = "gpt-4o-mini"):
    """Stream tokens from the OpenAI Chat Completions API over raw HTTP (httpx)"""
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
            # client.stream() exposes the body while it is still arriving;
            # client.post() would read the whole response before returning,
            # which defeats the purpose of streaming.
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": user_message}],
                    "stream": True,
                    "temperature": 0.7,
                },
            ) as response:
                response.raise_for_status()

                async for line in response.aiter_lines():
                    if not line.startswith("data: "):
                        continue  # skip blank separator lines
                    data = line[6:]  # strip the "data: " prefix
                    if data.strip() == "[DONE]":
                        break

                    try:
                        parsed = json.loads(data)
                    except json.JSONDecodeError:
                        continue

                    choices = parsed.get("choices") or []
                    if choices:
                        # Some chunks (e.g. role-only or final ones) carry no text
                        content = choices[0].get("delta", {}).get("content")
                        if content:
                            yield f"data: {json.dumps({'type': 'token', 'content': content})}\n\n"

    except Exception as e:
        error_data = {
            "type": "error",
            "message": f"AI service error: {str(e)}"
        }
        yield f"data: {json.dumps(error_data)}\n\n"

@app.post("/openai-stream")
async def openai_stream_endpoint(request: Request):
    """Streaming OpenAI chat endpoint"""
    body = await request.json()
    user_message = body.get("message", "")

    return StreamingResponse(
        openai_stream_generator(user_message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
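The line-handling logic inside openai_stream_generator can be separated from the HTTP call. That makes it unit-testable without network access.

extract_deltas below is a hypothetical helper. The sample lines are hand-written to mimic the shape handled above ("data: ..." lines, choices[0].delta.content, a final "data: [DONE]"). They were not captured from the real API.

```python
import json
from typing import Iterable, Iterator


def extract_deltas(lines: Iterable[str]) -> Iterator[str]:
    """Yield the text deltas from Chat-Completions-style SSE lines."""
    for line in lines:
        if not line.startswith("data: "):
            continue                       # blank separators and other fields
        data = line[len("data: "):].strip()
        if data == "[DONE]":
            return                         # end-of-stream sentinel
        try:
            parsed = json.loads(data)
        except json.JSONDecodeError:
            continue                       # skip malformed chunks
        if not isinstance(parsed, dict):
            continue
        choices = parsed.get("choices") or []
        if choices:
            content = (choices[0].get("delta") or {}).get("content")
            if content:                    # role-only or empty deltas carry no text
                yield content


sample = [
    'data: {"choices": [{"delta": {"role": "assistant"}}]}',
    "",
    'data: {"choices": [{"delta": {"content": "Hel"}}]}',
    'data: {"choices": [{"delta": {"content": "lo"}}]}',
    "data: not-json",
    "data: [DONE]",
    'data: {"choices": [{"delta": {"content": "ignored"}}]}',
]
text = "".join(extract_deltas(sample))
```

With this split, the generator only has to feed response.aiter_lines() into the helper and wrap each delta in its own event format.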

AI Streaming Optimizations

class AIStreamProcessor:
    """Wraps an AI chunk stream with a time budget and a token limit"""

    def __init__(self, max_tokens: int = 2000, timeout: int = 60):
        self.max_tokens = max_tokens
        self.timeout = timeout
        self.token_count = 0

    async def process_stream(self, source) -> AsyncGenerator[str, None]:
        """Forward chunks from source until it ends, times out, or reaches max_tokens"""
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        try:
            async for chunk in source:
                # Time budget. It is checked only when a chunk arrives, so a
                # source that stalls completely is not interrupted here.
                if loop.time() - start_time > self.timeout:
                    yield f"data: {json.dumps({'type': 'error', 'message': 'Request timeout'})}\n\n"
                    break

                # Token limit (each chunk counts as one token here)
                if self.token_count >= self.max_tokens:
                    yield f"data: {json.dumps({'type': 'warning', 'message': 'Max tokens reached'})}\n\n"
                    break

                self.token_count += 1
                yield chunk

        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"

        # Completion signal. This is deliberately not in a finally block:
        # when the client disconnects (CancelledError / GeneratorExit), nothing
        # can be sent, and yielding at that point would raise RuntimeError.
        yield f"data: {json.dumps({'type': 'done', 'message': 'Stream completed'})}\n\n"

async def enhanced_ai_stream(user_input: str):
    """Enhanced AI streaming response (the mock below ignores user_input)"""
    processor = AIStreamProcessor(max_tokens=1000, timeout=30)

    async def mock_ai_generator():
        """Simulated AI generator"""
        response_parts = [
            "This is a simulated AI response. ",
            "It demonstrates the streaming output effect. ",
            "Each part is generated step by step. ",
            "The user can watch the content appear in real time."
        ]

        accumulated = ""
        for i, part in enumerate(response_parts):
            for char in part:
                accumulated += char
                chunk_data = {
                    "type": "token",
                    "content": char,
                    "accumulated": accumulated,
                    "part_index": i,
                    "char_index": len(accumulated) - 1  # 0-based position in the full text
                }
                yield f"data: {json.dumps(chunk_data)}\n\n"
                await asyncio.sleep(0.01)

    async for chunk in processor.process_stream(mock_ai_generator()):
        yield chunk

@app.post("/enhanced-ai-stream")
async def enhanced_ai_stream_endpoint(request: Request):
    """Enhanced AI streaming endpoint"""
    body = await request.json()
    user_message = body.get("message", "")

    return StreamingResponse(
        enhanced_ai_stream(user_message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
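The timeout check in process_stream runs only when a chunk arrives. A source that stops sending entirely therefore keeps the request open indefinitely. One way to enforce a hard overall deadline is to await each next chunk with asyncio.wait_for, using the time left in the budget.

This is a sketch under that assumption. with_deadline is a hypothetical helper, and the stalled upstream is simulated.

```python
import asyncio
import json
from typing import AsyncIterator


async def with_deadline(source: AsyncIterator[str], timeout: float) -> AsyncIterator[str]:
    """Forward chunks from source; emit an error event once the deadline passes."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    iterator = source.__aiter__()
    try:
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                raise asyncio.TimeoutError
            try:
                # Wait for the next chunk, but never past the overall deadline
                chunk = await asyncio.wait_for(iterator.__anext__(), remaining)
            except StopAsyncIteration:
                return
            yield chunk
    except asyncio.TimeoutError:
        yield f"data: {json.dumps({'type': 'error', 'message': 'Request timeout'})}\n\n"
    finally:
        if hasattr(iterator, "aclose"):
            await iterator.aclose()        # make sure the upstream generator is closed


async def stalled_source():
    yield "data: first\n\n"
    await asyncio.sleep(10)                # simulated upstream stall
    yield "data: never sent\n\n"


async def quick_source():
    for i in range(3):
        yield f"data: {i}\n\n"


async def collect(gen) -> list:
    return [chunk async for chunk in gen]


chunks = asyncio.run(collect(with_deadline(stalled_source(), timeout=0.1)))
completed = asyncio.run(collect(with_deadline(quick_source(), timeout=1.0)))
```

The stalled stream ends after about 0.1 s with a timeout event rather than hanging for ten seconds. A fast source passes through unchanged.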

File Streaming

Streaming Large File Downloads

import aiofiles
from pathlib import Path
from fastapi import HTTPException

FILES_DIR = Path("/path/to/files")  # adjust to your environment

async def file_stream_generator(file_path: Path, chunk_size: int = 8192):
    """Stream a file's raw bytes in fixed-size chunks.

    Only file bytes are yielded. Mixing progress messages or error text into
    an application/octet-stream body would corrupt the downloaded file and
    contradict the declared Content-Length, so progress has to travel over a
    separate channel (for example an SSE endpoint) if you need it.
    """
    try:
        async with aiofiles.open(file_path, 'rb') as file:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                yield chunk
    except asyncio.CancelledError:
        print("File download cancelled by client")
        raise
    # Other errors propagate: once the headers are sent, the server can only
    # abort the connection, and the client sees an incomplete download.

@app.get("/download-file/{filename}")
async def download_file(filename: str):
    """Streaming file download endpoint"""
    # Keep only the last path component to block path traversal such as "../"
    file_path = FILES_DIR / Path(filename).name
    if not file_path.is_file():
        # Checked before streaming starts, while a real 404 can still be returned
        raise HTTPException(status_code=404, detail="File not found")

    # For files that already exist on disk, fastapi.responses.FileResponse
    # covers this case (including Content-Length) with less code
    return StreamingResponse(
        file_stream_generator(file_path),
        media_type="application/octet-stream",
        headers={
            # Non-ASCII file names need the RFC 6266 filename* form instead
            "Content-Disposition": f'attachment; filename="{file_path.name}"',
            "Content-Length": str(file_path.stat().st_size),
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )
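Download progress can be tracked on the server without touching the response bytes. One option is a callback invoked per chunk, which could feed a separate SSE progress endpoint. The sketch below uses plain synchronous file I/O so it stays self-contained. iter_file_chunks and on_progress are hypothetical names.

Starlette's StreamingResponse also accepts a synchronous iterator like this one and iterates it in a thread pool. A sync generator over a regular file object is therefore an option when aiofiles is not available.

```python
import os
import tempfile
from typing import Callable, Iterator, Optional


def iter_file_chunks(path: str, chunk_size: int = 8192,
                     on_progress: Optional[Callable[[int, int], None]] = None) -> Iterator[bytes]:
    """Yield raw file bytes; report (bytes_sent, total_size) via a callback, not in the body."""
    total = os.path.getsize(path)
    sent = 0
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            sent += len(chunk)
            if on_progress is not None:
                on_progress(sent, total)
            yield chunk


# Demo with a temporary 20 000-byte file
payload = os.urandom(20_000)
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(payload)
progress = []
body = b"".join(iter_file_chunks(tmp.name, chunk_size=8192,
                                 on_progress=lambda s, t: progress.append((s, t))))
os.remove(tmp.name)
```

The reassembled body is byte-for-byte identical to the file. The progress values never enter the download stream.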

Dynamically Generated File Streams

import csv
import io

async def dynamic_csv_generator(data_source_func):
    """Generate a CSV file row by row from an async data source"""
    # In-memory buffer, reused for every row
    output = io.StringIO()
    writer = None

    try:
        async for row_data in data_source_func():
            if writer is None:
                # Create the writer from the first row: its dict keys become the
                # header (sequences get column_0, column_1, ...)
                fieldnames = list(row_data.keys()) if isinstance(row_data, dict) else [f"column_{i}" for i in range(len(row_data))]
                writer = csv.DictWriter(output, fieldnames=fieldnames)
                writer.writeheader()
                yield output.getvalue().encode('utf-8')
                output.seek(0)
                output.truncate(0)

            # Write the data row
            if isinstance(row_data, dict):
                writer.writerow(row_data)
            else:
                # Convert sequences to the dict form DictWriter expects
                row_dict = {f"column_{i}": val for i, val in enumerate(row_data)}
                writer.writerow(row_dict)

            # Send the buffered row and reset the buffer
            chunk = output.getvalue()
            if chunk:
                yield chunk.encode('utf-8')
                output.seek(0)
                output.truncate(0)

            # Briefly yield control to the event loop between rows
            await asyncio.sleep(0.001)

    except Exception as e:
        # Headers are already sent, so the error can only be appended to the file
        yield f"Error occurred: {str(e)}\n".encode('utf-8')

async def sample_data_source():
    """Sample data source"""
    for i in range(100):
        yield {
            "id": i,
            "name": f"Item {i}",
            "value": i * 10,
            "timestamp": datetime.now().isoformat()
        }
        await asyncio.sleep(0.01)  # simulated data generation delay

@app.get("/download-dynamic-csv")
async def download_dynamic_csv():
    """Dynamic CSV download"""
    return StreamingResponse(
        dynamic_csv_generator(sample_data_source),
        media_type="text/csv",
        headers={
            "Content-Disposition": "attachment; filename=dynamic_data.csv",
            "Cache-Control": "no-cache"
        }
    )
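Every row goes through csv.DictWriter, so values containing commas, quotes or newlines are quoted correctly even though the file is sent in pieces. A quick check is to join the streamed chunks and parse them back with csv.DictReader.

The sketch reimplements the same buffer-reuse idea synchronously with the standard library instead of calling dynamic_csv_generator. csv_chunks is a hypothetical stand-in.

```python
import csv
import io
from typing import Dict, Iterable, Iterator


def csv_chunks(rows: Iterable[Dict[str, object]]) -> Iterator[bytes]:
    """Yield a UTF-8 CSV file one row at a time, reusing one StringIO buffer."""
    buffer = io.StringIO()
    writer = None
    for row in rows:
        if writer is None:
            writer = csv.DictWriter(buffer, fieldnames=list(row.keys()))
            writer.writeheader()
        writer.writerow(row)
        yield buffer.getvalue().encode("utf-8")
        buffer.seek(0)
        buffer.truncate(0)


rows = [
    {"id": 1, "name": "plain"},
    {"id": 2, "name": "has, comma"},
    {"id": 3, "name": 'has "quotes"'},
    {"id": 4, "name": "two\nlines"},
]
chunks = list(csv_chunks(rows))
parsed = list(csv.DictReader(io.StringIO(b"".join(chunks).decode("utf-8"), newline="")))
```

Every value survives the round trip, including the embedded newline. Note that DictReader returns all fields as strings.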

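Real-Time Log Push

Real-Time Log Stream

import logging

class _BroadcastHandler(logging.Handler):
    """logging.Handler that forwards every record to RealTimeLogStreamer.publish"""

    def __init__(self, streamer):
        super().__init__()
        self.streamer = streamer

    def emit(self, record):
        try:
            self.streamer.publish(record)
        except Exception:
            self.handleError(record)

class RealTimeLogStreamer:
    """Fans log records out to every connected SSE client"""

    def __init__(self, max_queue_size: int = 1000):
        # One queue per connected client, so every client sees every record
        self.clients = set()
        self.max_queue_size = max_queue_size
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Configure the logger: console output plus broadcast to clients"""
        logger = logging.getLogger("realtime_logs")
        logger.setLevel(logging.INFO)

        console = logging.StreamHandler()
        console.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
        logger.addHandler(console)
        logger.addHandler(_BroadcastHandler(self))
        logger.propagate = False  # avoid duplicate output via the root logger

        return logger

    def publish(self, record: logging.LogRecord):
        """Put a record on every client queue.

        asyncio.Queue is not thread-safe, so log to this logger only from
        the event-loop thread.
        """
        for client_queue in list(self.clients):
            try:
                client_queue.put_nowait(record)
            except asyncio.QueueFull:
                pass  # slow client: drop the record instead of blocking

    async def log_generator(self):
        """SSE generator for one client"""
        client_queue = asyncio.Queue(maxsize=self.max_queue_size)
        self.clients.add(client_queue)
        try:
            while True:
                try:
                    # Wait for the next log record
                    log_record = await asyncio.wait_for(client_queue.get(), timeout=1.0)
                except asyncio.TimeoutError:
                    # No logs for a while: send a heartbeat to keep the connection alive
                    heartbeat = {
                        "type": "heartbeat",
                        "timestamp": datetime.now().isoformat()
                    }
                    yield f"data: {json.dumps(heartbeat)}\n\n"
                    continue

                log_data = {
                    "timestamp": datetime.fromtimestamp(log_record.created).isoformat(),
                    "level": log_record.levelname,
                    "message": log_record.getMessage(),
                    "module": log_record.module,
                    "function": log_record.funcName,
                    "line": log_record.lineno
                }
                yield f"data: {json.dumps(log_data)}\n\n"

        except asyncio.CancelledError:
            print("Log streaming cancelled")
            raise
        except Exception as e:
            error_data = {
                "type": "error",
                "message": f"Log streaming error: {str(e)}"
            }
            yield f"data: {json.dumps(error_data)}\n\n"
        finally:
            # Always unregister this client's queue
            self.clients.discard(client_queue)

    def add_log(self, level: str, message: str):
        """Log a message; the broadcast handler delivers it to every client"""
        level_no = logging.getLevelName(level.upper())
        if not isinstance(level_no, int):
            level_no = logging.INFO  # unknown level names fall back to INFO
        self.logger.log(level_no, message)

# Global log streamer instance
log_streamer = RealTimeLogStreamer()

@app.get("/realtime-logs")
async def realtime_logs():
    """Real-time log stream endpoint"""
    return StreamingResponse(
        log_streamer.log_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )

@app.post("/log-message")
async def log_message(request: Request):
    """Log a message and push it to all connected clients"""
    body = await request.json()
    level = body.get("level", "INFO")
    message = body.get("message", "")

    log_streamer.add_log(level, message)

    return {"status": "logged", "message": message}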
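asyncio.Queue is not thread-safe, so a queue-fed log stream only works for records produced on the event-loop thread. Logs may also come from worker threads, for example code run through asyncio.to_thread or a thread pool. In that case, a handler can hand each record to the loop with loop.call_soon_threadsafe.

The sketch below shows that bridge. ThreadSafeQueueHandler is a hypothetical name.

```python
import asyncio
import logging
import threading


class ThreadSafeQueueHandler(logging.Handler):
    """Logging handler that delivers records to an asyncio.Queue from any thread."""

    def __init__(self, loop: asyncio.AbstractEventLoop, queue: asyncio.Queue):
        super().__init__()
        self.loop = loop
        self.queue = queue

    def emit(self, record: logging.LogRecord) -> None:
        try:
            # Schedule put_nowait on the loop thread instead of touching the queue here
            self.loop.call_soon_threadsafe(self.queue.put_nowait, record)
        except RuntimeError:
            pass  # the loop is already closed: drop the record


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    logger = logging.getLogger("bridge_demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = ThreadSafeQueueHandler(asyncio.get_running_loop(), queue)
    logger.addHandler(handler)

    def work():
        for i in range(3):
            logger.info("from worker %d", i)   # runs in a plain thread

    try:
        worker = threading.Thread(target=work)
        worker.start()
        await asyncio.to_thread(worker.join)  # wait without blocking the loop
        return [(await queue.get()).getMessage() for _ in range(3)]
    finally:
        logger.removeHandler(handler)


messages = asyncio.run(demo())
```

The records logged in the worker thread arrive in the queue in order. The queue itself is only ever touched on the event-loop thread.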

System Monitoring Stream

import psutil
import time

async def system_monitor_stream():
    """System monitoring stream generator"""
    start_time = time.time()

    # With interval=None, cpu_percent() returns immediately and reports usage
    # since the previous call; this first call only primes the counter.
    # interval=1 would instead block the whole event loop for one second
    # on every sample.
    psutil.cpu_percent(interval=None)

    try:
        while True:
            await asyncio.sleep(2)  # sample every 2 seconds

            # Collect system metrics
            metrics = {
                "timestamp": datetime.now().isoformat(),
                "uptime": time.time() - start_time,  # seconds since this stream started
                "cpu_percent": psutil.cpu_percent(interval=None),
                "memory_percent": psutil.virtual_memory().percent,
                "disk_percent": psutil.disk_usage('/').percent,
                "network_io": psutil.net_io_counters()._asdict(),
                "process_count": len(psutil.pids()),
                "load_average": psutil.getloadavg()
            }

            yield f"data: {json.dumps(metrics)}\n\n"

    except asyncio.CancelledError:
        print("System monitoring cancelled")
        raise
    except Exception as e:
        error_data = {
            "type": "error",
            "message": f"Monitoring error: {str(e)}"
        }
        yield f"data: {json.dumps(error_data)}\n\n"

@app.get("/system-monitor")
async def system_monitor():
    """System monitoring stream endpoint"""
    return StreamingResponse(
        system_monitor_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
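Any blocking call inside an async generator stalls every other request on the same event loop for its full duration. Examples include psutil.cpu_percent(interval=1) or a slow disk read.

The stdlib sketch below makes this visible. A ticker coroutine counts how often it gets to run while a 0.3 s blocking call executes directly on the loop. It then counts again while the same call is offloaded with asyncio.to_thread (Python 3.9+). time.sleep stands in for the blocking call, and the function names are illustrative.

```python
import asyncio
import time


async def ticker(stop: asyncio.Event, counter: list) -> None:
    """Increments counter[0] every ~50 ms for as long as it gets scheduled."""
    while not stop.is_set():
        counter[0] += 1
        await asyncio.sleep(0.05)


async def measure(offload: bool) -> int:
    stop, counter = asyncio.Event(), [0]
    task = asyncio.create_task(ticker(stop, counter))
    await asyncio.sleep(0)                # let the ticker run its first step
    start = counter[0]
    if offload:
        await asyncio.to_thread(time.sleep, 0.3)   # runs in a worker thread
    else:
        time.sleep(0.3)                   # blocks the event loop itself
    ticks = counter[0] - start
    stop.set()
    await task
    return ticks


blocking_ticks = asyncio.run(measure(offload=False))
offloaded_ticks = asyncio.run(measure(offload=True))
```

While the loop is blocked, the ticker never runs. With the call offloaded, it keeps ticking every 50 ms.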

Performance Optimization Strategies

Buffering Streamed Output

import time

class OptimizedStreamBuffer:
    """Coalesces many small chunks into fewer, larger writes"""

    def __init__(self, max_buffer_size: int = 65536, flush_interval: float = 0.1):
        self.buffer = []
        self.buffer_size = 0                     # running total, avoids re-joining on every add
        self.max_buffer_size = max_buffer_size   # measured in characters, not bytes
        self.flush_interval = flush_interval
        self.last_flush = time.monotonic()

    def add_chunk(self, chunk: str) -> str:
        """Add a chunk; return the coalesced data to send, or "" if nothing is due yet"""
        self.buffer.append(chunk)
        self.buffer_size += len(chunk)

        # Flush when the buffer is large enough or has been held long enough
        # (the time check only runs when a new chunk arrives)
        if (self.buffer_size >= self.max_buffer_size
                or time.monotonic() - self.last_flush >= self.flush_interval):
            return self.force_flush()
        return ""

    def force_flush(self) -> str:
        """Return everything buffered as a single string and empty the buffer"""
        data = "".join(self.buffer)
        self.buffer.clear()
        self.buffer_size = 0
        self.last_flush = time.monotonic()
        return data

async def optimized_stream_generator():
    """Streaming generator that uses the coalescing buffer"""
    buffer = OptimizedStreamBuffer(max_buffer_size=8192, flush_interval=0.05)

    for i in range(100):
        chunk = f"data: {json.dumps({'id': i, 'message': f'Message {i}'})}\n\n"

        # One yield per flush, carrying several SSE events at once
        data = buffer.add_chunk(chunk)
        if data:
            yield data

        await asyncio.sleep(0.01)

    # Send whatever is left at the end
    data = buffer.force_flush()
    if data:
        yield data
    # Nothing to do on cancellation: the client is gone, so the remaining
    # buffered data could not be delivered anyway

@app.get("/optimized-stream")
async def optimized_stream():
    """Optimized streaming endpoint"""
    return StreamingResponse(
        optimized_stream_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
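OptimizedStreamBuffer checks its flush conditions only inside add_chunk, that is, when a new chunk arrives. If the producer pauses, the last few chunks can sit in the buffer until the next one shows up. A variant that also flushes on a timer reads from a queue, with a timeout equal to the time left in the current window.

This is a sketch under those assumptions. batch_by_window is a hypothetical name, and None serves as the end-of-stream marker.

```python
import asyncio
from typing import AsyncIterator, Optional


async def batch_by_window(queue: "asyncio.Queue[Optional[str]]",
                          window: float) -> AsyncIterator[str]:
    """Yield joined batches; a batch is flushed when its window expires or input ends."""
    loop = asyncio.get_running_loop()
    while True:
        first = await queue.get()          # wait, without a deadline, for a batch to start
        if first is None:
            return
        batch, deadline = [first], loop.time() + window
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                item = await asyncio.wait_for(queue.get(), remaining)
            except asyncio.TimeoutError:
                break                      # window expired with no new input
            if item is None:
                yield "".join(batch)
                return
            batch.append(item)
        yield "".join(batch)


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()

    async def producer():
        for i in range(3):
            queue.put_nowait(f"a{i} ")     # burst of three chunks
        await asyncio.sleep(0.2)           # pause longer than the window
        queue.put_nowait("b0 ")
        queue.put_nowait(None)             # end of stream

    producer_task = asyncio.create_task(producer())
    batches = [b async for b in batch_by_window(queue, window=0.05)]
    await producer_task
    return batches


batches = asyncio.run(demo())
```

The burst is flushed as one batch when its 50 ms window expires, even though no further chunk arrives during the pause.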

Managing Concurrent Streams

from typing import Set
from fastapi import HTTPException

class StreamConcurrencyManager:
    """Caps the number of streams open at the same time"""

    def __init__(self, max_concurrent_streams: int = 100):
        self.max_concurrent_streams = max_concurrent_streams
        # A plain set: str objects cannot be weakly referenced, so
        # weakref.WeakSet().add(stream_id) would raise TypeError
        self.active_streams: Set[str] = set()
        self.stream_counter = 0

    def is_full(self) -> bool:
        """True when no more streams may be opened"""
        return len(self.active_streams) >= self.max_concurrent_streams

    def register_stream(self, stream_id: str) -> bool:
        """Register a new stream; return False if the limit is reached"""
        if self.is_full():
            return False

        self.active_streams.add(stream_id)
        self.stream_counter += 1
        return True

    def unregister_stream(self, stream_id: str):
        """Unregister a stream"""
        self.active_streams.discard(stream_id)

    def get_stats(self):
        """Current statistics"""
        return {
            "active_streams": len(self.active_streams),
            "total_streams_served": self.stream_counter,
            "max_allowed": self.max_concurrent_streams
        }

# Global concurrency manager
concurrency_manager = StreamConcurrencyManager(max_concurrent_streams=50)

async def managed_stream_generator(client_id: str):
    """Stream generator that holds a concurrency slot while it runs"""
    stream_id = f"stream_{client_id}_{uuid.uuid4()}"

    if not concurrency_manager.register_stream(stream_id):
        # Fallback for the race where the limit is reached after the endpoint's
        # check: the 200 status is already sent, so report it in the stream
        yield f"data: {json.dumps({'error': 'Too many concurrent streams'})}\n\n"
        return

    try:
        for i in range(50):
            data = {
                "client_id": client_id,
                "sequence": i,
                "timestamp": datetime.now().isoformat(),
                "concurrency_stats": concurrency_manager.get_stats()
            }
            yield f"data: {json.dumps(data)}\n\n"
            await asyncio.sleep(0.1)
    finally:
        # Release the slot on completion, error, or client disconnect
        concurrency_manager.unregister_stream(stream_id)

@app.get("/managed-stream/{client_id}")
async def managed_stream(client_id: str):
    """Managed streaming endpoint"""
    # Reject up front, while a proper status code can still be returned
    if concurrency_manager.is_full():
        raise HTTPException(status_code=429, detail="Too many concurrent streams")

    return StreamingResponse(
        managed_stream_generator(client_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
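An alternative to tracking stream IDs by hand is an asyncio.Semaphore. The generator holds a slot for its whole lifetime with async with, which also releases the slot when the client disconnects and the generator is closed. An endpoint could check sem.locked() to answer 429 up front.

The stdlib sketch below shows only the slot logic, with the HTTP layer omitted. limited_stream is a hypothetical name.

```python
import asyncio
from typing import AsyncIterator


async def limited_stream(sem: asyncio.Semaphore, name: str) -> AsyncIterator[str]:
    """Holds one semaphore slot for the stream's lifetime; released on any exit path."""
    async with sem:
        for i in range(3):
            yield f"{name}:{i}"
            await asyncio.sleep(0.01)


async def demo() -> dict:
    sem = asyncio.Semaphore(2)
    results = {}
    s1 = limited_stream(sem, "s1")
    s2 = limited_stream(sem, "s2")
    first1 = await s1.__anext__()                   # s1 takes the first slot
    first2 = await s2.__anext__()                   # s2 takes the second slot
    results["full_while_two_open"] = sem.locked()   # a third request would get 429
    await s1.aclose()                               # client 1 disconnects
    results["free_after_close"] = not sem.locked()
    rest2 = [chunk async for chunk in s2]           # s2 runs to completion
    results["chunks"] = [first1, first2] + rest2
    results["free_at_end"] = not sem.locked()
    return results


results = asyncio.run(demo())
```

The check in the endpoint and the acquisition in the generator are separate steps. A request that passes the check can therefore still wait briefly for a slot if another stream starts in between.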

Error Handling and Exceptions

Comprehensive Error Handling

from enum import Enum

class StreamErrorType(Enum):
    """Error categories for streaming responses"""
    CONNECTION_TIMEOUT = "connection_timeout"
    SERVER_ERROR = "server_error"
    CLIENT_DISCONNECTED = "client_disconnected"
    RESOURCE_EXHAUSTED = "resource_exhausted"
    VALIDATION_ERROR = "validation_error"

class StreamErrorHandler:
    """Turns exceptions into SSE error events"""

    @staticmethod
    async def handle_stream_exception(exc: BaseException, context: str = "") -> str:
        """Build an SSE error event from an exception"""
        error_info = {
            "type": "error",
            "error_type": type(exc).__name__,
            "message": str(exc),
            "context": context,
            "timestamp": datetime.now().isoformat()
        }

        if isinstance(exc, asyncio.TimeoutError):
            error_info["error_type_enum"] = StreamErrorType.CONNECTION_TIMEOUT.value
        elif isinstance(exc, asyncio.CancelledError):
            error_info["error_type_enum"] = StreamErrorType.CLIENT_DISCONNECTED.value
        else:
            error_info["error_type_enum"] = StreamErrorType.SERVER_ERROR.value

        return f"data: {json.dumps(error_info)}\n\n"

    @staticmethod
    def create_error_event(error_type: StreamErrorType, message: str, **kwargs) -> str:
        """Build an SSE error event from an error type and a message"""
        error_data = {
            "type": "error",
            "error_type": error_type.value,
            "message": message,
            "timestamp": datetime.now().isoformat(),
            **kwargs
        }
        return f"data: {json.dumps(error_data)}\n\n"

async def robust_stream_generator():
    """Stream generator that reports per-item errors without dropping the stream"""
    error_handler = StreamErrorHandler()
    
    try:
        for i in range(20):
            try:
                # Simulate an operation that may fail
                if i == 10:
                    # Inject an error on purpose
                    raise ValueError(f"Simulated error at iteration {i}")
                
                data = {
                    "id": i,
                    "status": "success",
                    "message": f"Processing item {i}"
                }
                yield f"data: {json.dumps(data)}\n\n"
                await asyncio.sleep(0.2)
                
            except ValueError as ve:
                error_event = error_handler.create_error_event(
                    StreamErrorType.VALIDATION_ERROR,
                    str(ve),
                    iteration=i
                )
                yield error_event
                # Recoverable: carry on with the next item
                continue
            except Exception as e:
                error_event = await error_handler.handle_stream_exception(
                    e, 
                    f"Error processing iteration {i}"
                )
                yield error_event
                break
    
    except asyncio.CancelledError:
        # The client disconnected. Nothing can be delivered any more, so do not
        # yield here: release resources (or log), then re-raise the cancellation.
        raise
    except Exception as e:
        # Unexpected fatal error: report it, then end the stream
        fatal_event = await error_handler.handle_stream_exception(
            e,
            "Fatal error in stream generator"
        )
        yield fatal_event

@app.get("/robust-stream")
async def robust_stream():
    """Robust streaming endpoint"""
    return StreamingResponse(
        robust_stream_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )

Production Deployment

Nginx Configuration

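# nginx-sse.conf - nginx settings for SSE / streaming responses
upstream gunicorn_backend {
    # Matches the Gunicorn bind port below; adjust to your deployment
    server 127.0.0.1:8000;
    keepalive 32;
}

server {
    listen 80;
    server_name your-domain.com;
    
    # Streaming endpoints. This is a prefix match (/stream, /stream-numbers, ...);
    # other streaming paths still stream because the app sends "X-Accel-Buffering: no"
    location /stream {
        proxy_pass http://gunicorn_backend;
        proxy_http_version 1.1;
        # SSE is plain HTTP, not an Upgrade: clear Connection so upstream keepalive works
        proxy_set_header Connection '';
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # Key point: disable nginx buffering so each chunk is forwarded immediately.
        # (X-Accel-Buffering is a response header set by the app, not a request header.)
        proxy_buffering off;
        proxy_cache off;
        
        # Timeouts: proxy_read_timeout is the maximum gap between two reads from
        # upstream, so idle streams should send heartbeats more often than this
        proxy_connect_timeout 60s;
        proxy_send_timeout 300s;
        proxy_read_timeout 300s;
    }
    
    # Regular API traffic
    location / {
        proxy_pass http://gunicorn_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # Keep default buffering for ordinary requests
        proxy_buffering on;
    }
}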

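Gunicorn Configuration

# gunicorn_streaming_config.py - Gunicorn settings for streaming responses
import multiprocessing

# Basics
bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
# Note: worker_connections only affects the gthread, eventlet and gevent
# worker types; UvicornWorker ignores it
worker_connections = 1000

# Streaming-related settings
# For async workers, timeout is a worker heartbeat timeout, not a per-request
# limit: long-lived streams are not killed by it while the event loop stays responsive
timeout = 300
graceful_timeout = 30  # on restart/shutdown, streams still open after this are cut off
keepalive = 5          # passed to Uvicorn as its keep-alive timeout

# Performance
preload_app = True
# Workers are recycled after this many requests; open streams on a recycled
# worker get at most graceful_timeout seconds to finish
max_requests = 1000
max_requests_jitter = 100

# Logging
accesslog = "/var/log/gunicorn/stream_access.log"
errorlog = "/var/log/gunicorn/stream_error.log"
loglevel = "info"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'

# Memory: keep the worker heartbeat files on tmpfs
worker_tmp_dir = "/dev/shm"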

Frontend Integration

JavaScript SSE Client

// sse-client.js - SSE client wrapper
class SSEClient {
    constructor(url, options = {}) {
        this.url = url;
        this.options = {
            withCredentials: false,
            onOpen: null,
            onMessage: null,
            onError: null,
            onClose: null,
            reconnectInterval: 3000,
            maxReconnectAttempts: 10,
            ...options
        };
        
        this.eventSource = null;
        this.reconnectAttempts = 0;
        this.reconnectTimer = null;
        this.isConnected = false;
    }
    
    connect() {
        // EventSource cannot send custom headers; for authentication use cookies
        // (withCredentials) or put a token in the URL query string
        try {
            this.eventSource = new EventSource(this.url, {
                withCredentials: this.options.withCredentials
            });
            
            this.eventSource.onopen = (event) => {
                this.isConnected = true;
                this.reconnectAttempts = 0;
                console.log('SSE connection opened');
                if (this.options.onOpen) {
                    this.options.onOpen(event);
                }
            };
            
            this.eventSource.onmessage = (event) => {
                try {
                    const data = JSON.parse(event.data);
                    if (this.options.onMessage) {
                        this.options.onMessage(data, event);
                    }
                } catch (e) {
                    // Not JSON: pass the raw text through
                    console.error('Error parsing SSE message:', e);
                    if (this.options.onMessage) {
                        this.options.onMessage(event.data, event);
                    }
                }
            };
            
            this.eventSource.onerror = (event) => {
                console.error('SSE error:', event);
                this.isConnected = false;
                
                if (this.options.onError) {
                    this.options.onError(event);
                }
                
                // EventSource retries on its own; close it first so this custom
                // reconnect policy (with an attempt limit) never opens a second connection
                this.eventSource.close();
                
                if (this.reconnectAttempts < this.options.maxReconnectAttempts) {
                    this.reconnectAttempts++;
                    this.reconnectTimer = setTimeout(() => this.connect(), this.options.reconnectInterval);
                } else if (this.options.onClose) {
                    this.options.onClose();
                }
            };
            
        } catch (error) {
            console.error('Failed to create SSE connection:', error);
            if (this.options.onError) {
                this.options.onError(error);
            }
        }
    }
    
    disconnect() {
        // Cancel any pending reconnect, then close the stream
        clearTimeout(this.reconnectTimer);
        this.reconnectTimer = null;
        if (this.eventSource) {
            this.eventSource.close();
            this.eventSource = null;
        }
        this.isConnected = false;
    }
    
    sendMessage(message) {
        // SSE is one-way: the client cannot send over this connection.
        // Use a regular HTTP request instead (this assumes the server accepts POST on the same URL)
        return fetch(this.url, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
            },
            body: JSON.stringify(message)
        });
    }
}

// Usage example
const sseClient = new SSEClient('/api/realtime-updates', {
    onOpen: (event) => {
        console.log('Connected to real-time updates');
    },
    onMessage: (data, event) => {
        console.log('Received update:', data);
        // Update the UI
        updateUI(data);
    },
    onError: (error) => {
        console.error('SSE Error:', error);
    }
});

function updateUI(data) {
    const output = document.getElementById('output');
    const div = document.createElement('div');
    div.textContent = `[${new Date().toLocaleTimeString()}] ${JSON.stringify(data)}`;
    output.appendChild(div);
}

// Start the connection
sseClient.connect();

AI Chat UI Integration

// ai-chat-integration.js - AI chat UI integration
class AIChatInterface {
    constructor(endpoint, outputElement) {
        this.endpoint = endpoint;
        this.outputElement = outputElement;
        this.isStreaming = false;
        this.currentMessage = '';
    }
    
    async sendMessage(message) {
        if (this.isStreaming) {
            throw new Error('Already streaming a response');
        }
        
        // Show the user's message
        this.displayMessage(message, 'user');
        
        // Show a loading indicator
        const loadingElement = this.createLoadingElement();
        this.outputElement.appendChild(loadingElement);
        
        this.isStreaming = true;
        this.currentMessage = '';  // start every reply from scratch
        
        try {
            const response = await fetch(this.endpoint, {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({ message })
            });
            
            if (!response.ok) {
                throw new Error(`HTTP error! status: ${response.status}`);
            }
            
            const reader = response.body.getReader();
            const decoder = new TextDecoder();
            
            // Remove the loading indicator
            loadingElement.remove();
            
            // Container for the AI reply
            const aiMessageElement = this.createAIMessageElement();
            this.outputElement.appendChild(aiMessageElement);
            
            let buffer = '';       // partial line left over from the previous chunk
            let finished = false;  // set by a 'done' or 'error' event
            
            while (!finished) {
                const { done, value } = await reader.read();
                if (done) {
                    break;
                }
                
                buffer += decoder.decode(value, { stream: true });
                
                // A chunk can end mid-line: parse only complete lines, keep the rest
                const lines = buffer.split('\n');
                buffer = lines.pop();
                
                for (const line of lines) {
                    if (!line.startsWith('data: ')) {
                        continue;
                    }
                    const dataStr = line.substring(6).trim(); // strip the "data: " prefix
                    if (!dataStr) {
                        continue;
                    }
                    try {
                        const data = JSON.parse(dataStr);
                        
                        if (data.type === 'token' && data.content) {
                            this.currentMessage += data.content;
                            aiMessageElement.textContent = this.currentMessage;
                            // Scroll to the bottom
                            this.outputElement.scrollTop = this.outputElement.scrollHeight;
                        } else if (data.type === 'error') {
                            aiMessageElement.innerHTML = `<span class="error">${this.escapeHtml(data.message)}</span>`;
                            finished = true;
                            break;
                        } else if (data.type === 'done') {
                            finished = true;
                            break;
                        }
                    } catch (e) {
                        console.error('Error parsing SSE data:', e);
                    }
                }
            }
            
            if (finished) {
                // Stop reading so the connection is released instead of drained
                await reader.cancel();
            }
        } catch (error) {
            console.error('Streaming error:', error);
            const errorElement = document.createElement('div');
            errorElement.className = 'message error';
            errorElement.textContent = `Error: ${error.message}`;
            this.outputElement.appendChild(errorElement);
        } finally {
            loadingElement.remove();  // no-op if already removed
            this.isStreaming = false;
        }
    }
    
    displayMessage(content, sender) {
        const messageElement = document.createElement('div');
        messageElement.className = `message ${sender}`;
        messageElement.textContent = content;
        this.outputElement.appendChild(messageElement);
        
        this.outputElement.scrollTop = this.outputElement.scrollHeight;
    }
    
    createLoadingElement() {
        const element = document.createElement('div');
        element.className = 'message ai loading';
        element.innerHTML = '<div class="loading-dots">●●●</div>';
        return element;
    }
    
    createAIMessageElement() {
        const element = document.createElement('div');
        element.className = 'message ai';
        element.style.whiteSpace = 'pre-wrap';
        return element;
    }
    
    escapeHtml(text) {
        const div = document.createElement('div');
        div.textContent = text;
        return div.innerHTML;
    }
}

// Usage example
const chatInterface = new AIChatInterface('/ai-stream', document.getElementById('chat-output'));

document.getElementById('send-button').addEventListener('click', async () => {
    const input = document.getElementById('message-input');
    const message = input.value.trim();
    
    if (message) {
        input.value = '';
        await chatInterface.sendMessage(message);
    }
});

// Send on Enter (Shift+Enter is left alone; ignored while an IME composition is active)
document.getElementById('message-input').addEventListener('keydown', (e) => {
    if (e.key === 'Enter' && !e.shiftKey && !e.isComposing) {
        e.preventDefault();
        document.getElementById('send-button').click();
    }
});

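Frequently Asked Questions

Q1: How do I handle client disconnects?

A: Depending on the Starlette version and ASGI server, a disconnect reaches the generator either as asyncio.CancelledError raised at its current await, or as the generator being closed after a write fails. Put cleanup (closing database connections, releasing resources, unregistering the stream) in a finally block so it runs in both cases, and never yield after a disconnect; if you catch CancelledError, re-raise it. When there are long pauses between chunks, you can also poll await request.is_disconnected() and stop early.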
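A minimal sketch of that pattern follows. The names disconnect_aware_stream and release are hypothetical: release stands in for whatever resource your stream holds. The disconnect check is passed in as a callable so the generator can be tested without a server. In a FastAPI endpoint you would pass request.is_disconnected, which is a Starlette Request method.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def disconnect_aware_stream(
    is_disconnected: Callable[[], Awaitable[bool]],
    release: Callable[[], None],
    total: int = 100,
) -> AsyncIterator[str]:
    """Yield SSE events until done or the client goes away; always release resources."""
    try:
        for i in range(total):
            # Cooperative check: stop producing data nobody will read
            if await is_disconnected():
                break
            yield f"data: {i}\n\n"
            await asyncio.sleep(0.01)  # stands in for real work between events
    finally:
        # Runs on normal completion, break, CancelledError and aclose() alike.
        # Never yield from here: the client may already be gone.
        release()
```

Wiring it up might look like StreamingResponse(disconnect_aware_stream(request.is_disconnected, release=free_slot), media_type="text/event-stream"), where free_slot is again a placeholder for your own cleanup function.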

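Q2: How does an SSE connection reconnect automatically?

A: The browser's EventSource reconnects by itself after a dropped connection, and the server can suggest the delay with a retry: <milliseconds> field. If each event carries an id: field, the browser sends the last one back in the Last-Event-ID request header when it reconnects, so the server can resume from there. Custom logic (listening for onerror and reconnecting after a delay, as SSEClient does) is only needed for policies such as an attempt limit, or after the browser has given up (readyState is CLOSED, for example after a non-200 response).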
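Below is a sketch of the server side. The helper format_sse (a hypothetical name) writes the retry:, id: and event: fields in the wire format EventSource expects. The helper resume_events replays only events newer than the client's Last-Event-ID. It assumes events are numbered 0, 1, 2, … in a sequence that can be replayed.

```python
import json
from typing import Any, Iterable, Iterator, Optional


def format_sse(data: Any, event: Optional[str] = None,
               event_id: Optional[int] = None, retry_ms: Optional[int] = None) -> str:
    """Serialize one event in the text/event-stream format."""
    lines = []
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")   # suggested reconnect delay
    if event_id is not None:
        lines.append(f"id: {event_id}")      # sent back as Last-Event-ID on reconnect
    if event is not None:
        lines.append(f"event: {event}")      # delivered to addEventListener(event, ...)
    lines.append(f"data: {json.dumps(data, ensure_ascii=False)}")
    return "\n".join(lines) + "\n\n"         # a blank line terminates the event


def resume_events(items: Iterable[Any], last_event_id: Optional[str]) -> Iterator[str]:
    """Skip everything the client already received, based on Last-Event-ID."""
    start = int(last_event_id) + 1 if last_event_id and last_event_id.isdigit() else 0
    for seq, item in enumerate(items):
        if seq < start:
            continue
        # Send the retry hint once, with the first event of this connection
        yield format_sse(item, event_id=seq, retry_ms=3000 if seq == start else None)
```

In FastAPI the header could be read with a parameter such as last_event_id: Optional[str] = Header(default=None). FastAPI maps the underscores to the hyphenated Last-Event-ID header name.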

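Q3: How can streaming performance be improved?

A: Some options: 1) batch small pieces into fewer, larger writes; 2) set sensible timeouts; 3) cap the number of concurrent connections; 4) keep data serialization cheap; 5) use connection pools for database access.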
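Option 1 can be sketched as a wrapper around any async text source. The name batch_chunks and its thresholds are illustrative assumptions. For SSE, apply it to raw tokens before framing them, or to whole frames; it must never split a frame. The time threshold is only checked when a new piece arrives. A purely timer-driven flush would need a separate task or asyncio.wait_for.

```python
import time
from typing import AsyncIterator, List


async def batch_chunks(
    source: AsyncIterator[str],
    max_chars: int = 64,
    max_delay: float = 0.05,
) -> AsyncIterator[str]:
    """Coalesce small pieces into larger writes.

    Flushes when the buffer reaches max_chars, or when max_delay seconds have
    passed since the last flush (checked only when a new piece arrives).
    """
    buffer: List[str] = []
    size = 0
    last_flush = time.monotonic()
    async for piece in source:
        buffer.append(piece)
        size += len(piece)
        if size >= max_chars or time.monotonic() - last_flush >= max_delay:
            yield "".join(buffer)
            buffer.clear()
            size = 0
            last_flush = time.monotonic()
    if buffer:  # flush the remainder when the source ends
        yield "".join(buffer)
```

The trade-off is latency against write count. A small max_delay keeps the typewriter effect smooth while still cutting the number of tiny network writes.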

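Q4: How should a streaming response handle errors?

A: Catch exceptions inside the generator with try/except, format the error as an SSE event for the client, then either keep streaming (recoverable, per-item errors) or end the stream (fatal errors). Once the response has started, the HTTP status code has already been sent, so errors can only be reported in-band.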

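Q5: How do I deploy streaming responses in production?

A: Pay particular attention to the reverse proxy (e.g. nginx): disable buffering so data reaches the client in real time, raise timeouts to accommodate long-lived connections, and make sure the load balancer supports long-lived connections.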

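Summary

FastAPI's StreamingResponse provides flexible streaming data transfer that fits a range of scenarios:

  1. Real-time communication: Server-Sent Events (SSE) push data from server to client as it happens
  2. AI chat: ChatGPT-style typewriter output for a better user experience
  3. File transfer: large files sent in chunks, avoiding out-of-memory errors
  4. Log monitoring: logs pushed in real time for system monitoring
  5. Progress reporting: live progress updates for long-running tasks

💡 Key point: streaming is built on async generators that yield data incrementally. Combined with proper error handling and connection management, they make for efficient, reliable real-time data delivery.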



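Frequently Asked Questions (FAQ)

Q1: How does StreamingResponse differ from WebSocket?

A: StreamingResponse is for one-way push from server to client over ordinary HTTP. WebSocket supports two-way communication and requires a persistent connection. StreamingResponse is simpler and suits server-push scenarios; WebSocket is more capable and suits applications that need two-way communication.

Q2: How do I handle a large number of concurrent streaming connections?

A: Cap the maximum number of concurrent connections, manage resources with connection pools, keep per-connection memory low, consider a message queue to spread the load, and tune server parameters for high concurrency.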
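Capping concurrent streams can be sketched with a plain counter. StreamLimiter is a hypothetical helper, not a FastAPI API. The counter is safe within one event loop because nothing awaits between the check and the increment. The limit is per worker process, so the effective total is workers × max_streams.

```python
from typing import AsyncIterator


class StreamLimiter:
    """Cap the number of concurrently open streams in one worker process."""

    def __init__(self, max_streams: int) -> None:
        self._max = max_streams
        self._active = 0

    @property
    def active(self) -> int:
        return self._active

    def try_acquire(self) -> bool:
        # Called before the response starts, so a refusal can still become a 503
        if self._active >= self._max:
            return False
        self._active += 1
        return True

    def release(self) -> None:
        self._active -= 1

    async def guard(self, stream: AsyncIterator[str]) -> AsyncIterator[str]:
        """Pass chunks through and free the slot however the stream ends."""
        try:
            async for chunk in stream:
                yield chunk
        finally:
            self.release()
```

In an endpoint, you would first call try_acquire(), raise HTTPException(status_code=503) if it returns False, and otherwise return StreamingResponse(limiter.guard(gen()), ...). One caveat: if the response body is never iterated, the guard's finally never runs. A production version would need a fallback, such as expiring stale slots.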

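Q3: How can a streaming response ensure data integrity?

A: Add sequence numbers and checksums to the stream and validate them on the client, implement reconnect-and-resume, and send heartbeats to detect dead connections.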
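Sequence numbers and heartbeats can be combined in one generator, sketched below. The queue source and the None end-of-stream sentinel are assumptions of this sketch. Lines starting with ":" are SSE comments. EventSource ignores them, but they keep proxies such as nginx (proxy_read_timeout) from closing an idle connection.

```python
import asyncio
import json
from typing import AsyncIterator


async def sequenced_with_heartbeat(queue: "asyncio.Queue[object]",
                                   heartbeat_every: float = 15.0) -> AsyncIterator[str]:
    """Number every event and emit an SSE comment whenever the source is idle."""
    seq = 0
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=heartbeat_every)
        except asyncio.TimeoutError:
            yield ": heartbeat\n\n"  # comment line: ignored by EventSource
            continue
        if item is None:  # sentinel chosen for this sketch: end of stream
            return
        seq += 1
        # The id: field lets the client detect gaps and resume via Last-Event-ID
        yield f"id: {seq}\ndata: {json.dumps({'seq': seq, 'payload': item})}\n\n"
```

On the client, a jump in seq signals lost events, and a missing heartbeat over several intervals signals a dead connection.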

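Q4: How should the frontend handle SSE connections?

A: Open the connection with the EventSource API, listen for the message, error and open events, handle named event types with addEventListener, add a reconnect policy where needed, and close the connection when done to free resources. When the request needs POST or custom headers (as in the chat example), use fetch() with a stream reader instead, which means parsing the SSE format yourself.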
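With fetch() and a stream reader, a network chunk can end anywhere inside an event, so the parser has to buffer. Below is a sketch of that buffering logic in Python, for example for a test client. SSEParser and SSEEvent are illustrative names. The sketch is simplified: retry: fields are ignored and only \n and \r\n line endings are handled. As in the spec, the event type defaults to "message" and the last event id carries over between events.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SSEEvent:
    event: str
    data: str
    id: Optional[str] = None


class SSEParser:
    """Incremental SSE parser: feed text chunks of any size, get complete events."""

    def __init__(self) -> None:
        self._buffer = ""
        self._last_id: Optional[str] = None  # persists across events
        self._reset()

    def _reset(self) -> None:
        self._event = "message"  # default type when no 'event:' field is sent
        self._data: List[str] = []

    def feed(self, chunk: str) -> List[SSEEvent]:
        self._buffer += chunk
        # Only complete lines are processed; a trailing partial line waits for more input
        *lines, self._buffer = self._buffer.split("\n")
        events: List[SSEEvent] = []
        for raw in lines:
            line = raw.rstrip("\r")
            if line == "":  # blank line: dispatch the pending event
                if self._data:
                    events.append(SSEEvent(self._event, "\n".join(self._data), self._last_id))
                self._reset()
            elif line.startswith(":"):  # comment, e.g. a heartbeat
                continue
            else:
                field, _, value = line.partition(":")
                if value.startswith(" "):
                    value = value[1:]
                if field == "data":
                    self._data.append(value)
                elif field == "event":
                    self._event = value
                elif field == "id":
                    self._last_id = value
                # 'retry' and unknown fields are ignored in this sketch
        return events
```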

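Q5: What should I watch out for with streaming responses in production?

A: Disable reverse-proxy buffering, adjust timeouts, implement connection management and monitoring, handle exceptions and error cases, optimize memory and CPU usage, and keep the endpoints secure.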

