# FastAPI Streaming Responses: The Complete Guide to StreamingResponse
📂 Stage: Stage 6 — 2026 Special Topics (AI Integration)
🔗 Related chapters: Real-Time Communication with WebSocket · Deep Dive into Async Programming
## Table of Contents
- Streaming response fundamentals
- Core StreamingResponse implementation
- Server-Sent Events (SSE)
- Streaming AI chat output
- File streaming
- Real-time log push
- Performance optimization strategies
- Error and exception handling
- Production deployment
- Front-end integration
- FAQ
- Summary
## Streaming Response Fundamentals
### What Is a Streaming Response?
A streaming response is an HTTP response mode in which the server sends data to the client incrementally instead of delivering the complete response in one shot. The two modes compare as follows:

Traditional response:
┌─────────────────────────────────────────────────────┐
│ Client → request → server → process → full response → client │
│ (client waits until all data is processed) │
└─────────────────────────────────────────────────────┘

Streaming response:
┌─────────────────────────────────────────────────────┐
│ Client → request → server → send while processing → client │
│ (client receives data chunks in real time) │
└─────────────────────────────────────────────────────┘

### Core Advantages of Streaming
| Advantage | Description | Typical use |
|---|---|---|
| Real-time delivery | Each chunk is sent as soon as it is ready | AI chat, live logs |
| Memory efficiency | Avoids accumulating large payloads on the server | Large file transfers |
| User experience | Immediate feedback reduces perceived waiting | Typewriter effect, progress display |
| Bandwidth economy | Data is transferred on demand | Video and audio streams |
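The "receive chunks in real time" behaviour can be seen with a plain async generator, independent of HTTP: the consumer handles each chunk the moment the producer yields it, rather than waiting for the producer to finish. A minimal sketch:

```python
import asyncio

async def produce():
    """Simulated server: emits three chunks with a pause between them."""
    for i in range(3):
        await asyncio.sleep(0.01)
        yield f"chunk-{i}"

async def consume():
    received = []
    # Each chunk is processed as soon as it is yielded — the consumer
    # never waits for the whole sequence, which is the essence of streaming.
    async for chunk in produce():
        received.append(chunk)
    return received

print(asyncio.run(consume()))  # → ['chunk-0', 'chunk-1', 'chunk-2']
```

StreamingResponse applies exactly this pattern over an HTTP connection: FastAPI drives the generator and writes each yielded chunk to the socket.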
### Technology Comparison
| Technology | Best for | Complexity | Strengths |
|---|---|---|---|
| StreamingResponse | Simple one-way data streams | Low | Easy to implement, lightweight |
| SSE (Server-Sent Events) | Server-initiated push | Medium | Standardized, native browser support, persistent connection with automatic reconnection |
| WebSocket | Bidirectional communication | High | Full duplex, low latency |
### Typical Use Cases
- AI chat systems: the typewriter effect seen in ChatGPT, Claude, and similar tools
- File downloads: large files transferred in chunks, generated and downloaded simultaneously
- Live logs: real-time server log push
- Video streaming: frame-by-frame delivery
- Progress reporting: live progress updates for long-running tasks
- Stock tickers: real-time financial data push
- Chat applications: real-time message delivery
## Core StreamingResponse Implementation
### A Basic Streaming Response
Let's start with the simplest possible streaming response:

```python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json
from typing import AsyncGenerator

app = FastAPI()

async def number_generator() -> AsyncGenerator[str, None]:
    """Number generator — a minimal streaming example."""
    for i in range(1, 11):
        yield f"item {i}\n"
        await asyncio.sleep(0.5)  # simulate processing time

@app.get("/stream-numbers")
async def stream_numbers():
    """Stream a sequence of numbers."""
    return StreamingResponse(
        number_generator(),
        media_type="text/plain",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
```

### Advanced Streaming Configuration
```python
async def advanced_stream_generator():
    """Advanced streaming generator with error handling and a completion signal."""
    try:
        for i in range(100):
            data = {
                "id": i,
                "timestamp": asyncio.get_event_loop().time(),
                "data": f"Processing item {i}",
                "status": "processing",
            }
            yield f"data: {json.dumps(data)}\n\n"
            await asyncio.sleep(0.1)
            # Simulate a recoverable error partway through
            if i == 50:
                error_data = {
                    "id": i,
                    "error": "Processing error occurred",
                    "status": "error",
                }
                yield f"data: {json.dumps(error_data)}\n\n"
        # Completion signal — sent on the normal path, not from `finally`:
        # yielding from `finally` after a cancellation raises RuntimeError.
        complete_data = {
            "status": "completed",
            "message": "Stream completed successfully",
        }
        yield f"data: {json.dumps(complete_data)}\n\n"
    except asyncio.CancelledError:
        # Cleanup when the client disconnects
        print("Client disconnected")
        raise
    except Exception as e:
        error_response = {"error": str(e), "status": "error"}
        yield f"data: {json.dumps(error_response)}\n\n"

@app.get("/advanced-stream")
async def advanced_stream():
    """Advanced streaming endpoint."""
    return StreamingResponse(
        advanced_stream_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable Nginx buffering
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "Cache-Control",
        },
    )
```

### A Custom Streaming Response Class
```python
from fastapi.responses import StreamingResponse
from typing import Any, Optional

class CustomStreamingResponse(StreamingResponse):
    """Streaming response with sensible streaming defaults and extra headers."""

    def __init__(
        self,
        content: Any,
        status_code: int = 200,
        headers: Optional[dict] = None,
        media_type: Optional[str] = None,
        background=None,
        custom_headers: Optional[dict] = None,
    ):
        super().__init__(content, status_code, headers, media_type, background)
        # Merge in caller-supplied headers
        if custom_headers:
            self.headers.update(custom_headers)
        # Default headers for streaming
        self.headers.setdefault("Cache-Control", "no-cache")
        self.headers.setdefault("Connection", "keep-alive")
        self.headers.setdefault("X-Accel-Buffering", "no")

async def custom_stream_generator():
    """Custom stream generator."""
    for i in range(10):
        yield f"Custom data {i}\n"
        await asyncio.sleep(0.2)

@app.get("/custom-stream")
async def custom_stream():
    """Use the custom streaming response class."""
    return CustomStreamingResponse(
        custom_stream_generator(),
        media_type="text/plain",
        custom_headers={
            "X-Stream-Id": "custom-stream-123",
            "X-Data-Type": "numbers",
        },
    )
```

## Server-Sent Events (SSE)
### Basic SSE Implementation

```python
from datetime import datetime
import time

async def sse_generator():
    """Generator producing SSE-formatted messages."""
    for i in range(10):
        data = {
            "id": i,
            "timestamp": datetime.now().isoformat(),
            "message": f"SSE message {i}",
            "server_time": time.time(),
        }
        # SSE format: an `event` field followed by a `data` field
        yield f"event: message\ndata: {json.dumps(data)}\n\n"
        await asyncio.sleep(1)

@app.get("/sse-basic")
async def sse_basic():
    """Basic SSE endpoint."""
    return StreamingResponse(
        sse_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```

### Advanced SSE Implementation
```python
class SSEEvent:
    """Represents a single SSE event."""

    def __init__(self, data=None, event_type=None, event_id=None, retry=None):
        self.data = data
        self.event_type = event_type
        self.event_id = event_id
        self.retry = retry

    def to_sse_format(self):
        """Serialize to the SSE wire format."""
        lines = []
        if self.event_id is not None:
            lines.append(f"id: {self.event_id}")
        if self.event_type is not None:
            lines.append(f"event: {self.event_type}")
        if self.data is not None:
            # Dicts are serialized as JSON
            if isinstance(self.data, dict):
                data_str = json.dumps(self.data, ensure_ascii=False)
            else:
                data_str = str(self.data)
            # A multi-line payload must be split into one `data:` field per line
            for line in data_str.split('\n'):
                lines.append(f"data: {line}")
        if self.retry is not None:
            lines.append(f"retry: {self.retry}")
        # An event is terminated by a blank line (two newlines)
        return '\n'.join(lines) + '\n\n'

async def advanced_sse_generator():
    """Advanced SSE generator emitting several event types."""
    try:
        # Suggest a client reconnection interval
        yield "retry: 10000\n\n"
        for i in range(20):
            if i % 5 == 0:
                # Heartbeat event
                event = SSEEvent(
                    data={"type": "heartbeat", "timestamp": datetime.now().isoformat()},
                    event_type="heartbeat",
                    event_id=str(i),
                )
            elif i % 3 == 0:
                # Status update event
                event = SSEEvent(
                    data={
                        "type": "status",
                        "message": f"Processing batch {i//3 + 1}",
                        "progress": i * 5,
                        "total": 100,
                    },
                    event_type="status",
                    event_id=str(i),
                )
            else:
                # Plain data event
                event = SSEEvent(
                    data={
                        "type": "data",
                        "id": i,
                        "content": f"Real-time data point {i}",
                        "timestamp": datetime.now().isoformat(),
                    },
                    event_type="data",
                    event_id=str(i),
                )
            # Yield whichever event this iteration built
            yield event.to_sse_format()
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        print("SSE connection cancelled by client")
        raise
    except Exception as e:
        error_event = SSEEvent(
            data={"type": "error", "message": str(e)},
            event_type="error",
        )
        yield error_event.to_sse_format()

@app.get("/sse-advanced")
async def sse_advanced():
    """Advanced SSE endpoint."""
    return StreamingResponse(
        advanced_sse_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
            "Access-Control-Allow-Origin": "*",
        },
    )
```

### SSE Connection Management
```python
from collections import defaultdict
from typing import Callable
import uuid

class SSEConnectionManager:
    """Tracks SSE connections per stream."""

    def __init__(self):
        self.connections = defaultdict(set)
        self.active_streams = {}

    async def connect(self, stream_id: str, send_func: Callable):
        """Register a new connection on a stream."""
        connection_id = str(uuid.uuid4())
        self.connections[stream_id].add(connection_id)
        self.active_streams[connection_id] = send_func
        return connection_id

    async def disconnect(self, connection_id: str, stream_id: str):
        """Remove a connection from a stream."""
        self.connections[stream_id].discard(connection_id)
        self.active_streams.pop(connection_id, None)

    def broadcast_to_stream(self, stream_id: str, event: SSEEvent):
        """Broadcast an event to every connection on a stream."""
        for connection_id in list(self.connections[stream_id]):
            send_func = self.active_streams.get(connection_id)
            if send_func is not None:
                # Delivery is delegated to the registered send callback,
                # typically a queue.put_nowait bound to that connection.
                send_func(event)

    def get_active_connections(self, stream_id: str):
        """Number of active connections on a stream."""
        return len(self.connections[stream_id])

# Global connection manager instance
sse_manager = SSEConnectionManager()

async def managed_sse_generator(stream_id: str):
    """SSE generator whose lifetime is tracked by the manager."""
    connection_id = None
    try:
        # Simulated connection registration
        connection_id = str(uuid.uuid4())
        # Confirm the connection to the client
        yield f"data: {json.dumps({'type': 'connected', 'connection_id': connection_id, 'stream_id': stream_id})}\n\n"
        for i in range(30):
            data = {
                "type": "stream_data",
                "stream_id": stream_id,
                "sequence": i,
                "timestamp": datetime.now().isoformat(),
                "active_connections": sse_manager.get_active_connections(stream_id),
            }
            yield f"data: {json.dumps(data)}\n\n"
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print(f"SSE connection cancelled for stream {stream_id}, connection {connection_id}")
        raise
    finally:
        # Runs for both normal completion and cancellation
        if connection_id and stream_id:
            await sse_manager.disconnect(connection_id, stream_id)

@app.get("/sse-managed/{stream_id}")
async def sse_managed(stream_id: str):
    """Managed SSE endpoint."""
    return StreamingResponse(
        managed_sse_generator(stream_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```

## Streaming AI Chat Output
### Basic AI Streaming

```python
async def ai_stream_generator(user_input: str):
    """Simulated AI streaming generator."""
    # Simulated model output
    ai_response = (
        f"Here is a detailed answer to your question '{user_input}'. "
        "Let me walk through the relevant aspects step by step. First we "
        "consider the key factors, then examine them in depth. After weighing "
        "everything, the most suitable solution is..."
    )
    # Typewriter effect: emit one character at a time
    accumulated_text = ""
    for i, char in enumerate(ai_response):
        accumulated_text += char
        chunk_data = {
            "type": "token",
            "content": char,
            "accumulated": accumulated_text,
            "index": i,
            "is_final": i == len(ai_response) - 1,
        }
        yield f"data: {json.dumps(chunk_data)}\n\n"
        await asyncio.sleep(0.02)  # simulate generation speed

@app.post("/ai-stream")
async def ai_stream(request: Request):
    """Streaming AI chat endpoint."""
    body = await request.json()
    user_message = body.get("message", "")
    return StreamingResponse(
        ai_stream_generator(user_message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```

### Integrating a Real AI Service
```python
import os

import httpx

async def openai_stream_generator(user_message: str, model: str = "gpt-4o-mini"):
    """Stream tokens from the OpenAI chat completions API."""
    try:
        # `client.stream(...)` keeps the body unread so it can be iterated
        # line by line; a plain `client.post(...)` would consume it eagerly
        # and `aiter_lines()` would then fail.
        async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": user_message}],
                    "stream": True,
                    "temperature": 0.7,
                },
                follow_redirects=True,
            ) as response:
                async for chunk in response.aiter_lines():
                    if chunk.startswith("data: "):
                        data = chunk[6:]  # strip the "data: " prefix
                        if data.strip() == "[DONE]":
                            break
                        try:
                            parsed = json.loads(data)
                            if parsed.get("choices"):
                                delta = parsed["choices"][0].get("delta", {})
                                content = delta.get("content", "")
                                if content:
                                    yield f"data: {json.dumps({'type': 'token', 'content': content})}\n\n"
                        except json.JSONDecodeError:
                            continue
    except Exception as e:
        error_data = {
            "type": "error",
            "message": f"AI service error: {str(e)}",
        }
        yield f"data: {json.dumps(error_data)}\n\n"

@app.post("/openai-stream")
async def openai_stream_endpoint(request: Request):
    """OpenAI streaming chat endpoint."""
    body = await request.json()
    user_message = body.get("message", "")
    return StreamingResponse(
        openai_stream_generator(user_message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```

### Optimizing the AI Stream
```python
class AIStreamProcessor:
    """Wraps an AI stream with timeout and token limits."""

    def __init__(self, max_tokens: int = 2000, timeout: int = 60):
        self.max_tokens = max_tokens
        self.timeout = timeout
        self.token_count = 0

    async def process_stream(self, generator) -> AsyncGenerator[str, None]:
        """Forward chunks from an AI stream while enforcing limits."""
        start_time = asyncio.get_event_loop().time()
        try:
            async for chunk in generator:
                # Enforce the overall timeout
                if asyncio.get_event_loop().time() - start_time > self.timeout:
                    yield f"data: {json.dumps({'type': 'error', 'message': 'Request timeout'})}\n\n"
                    break
                # Enforce the token limit
                if self.token_count >= self.max_tokens:
                    yield f"data: {json.dumps({'type': 'warning', 'message': 'Max tokens reached'})}\n\n"
                    break
                # Count each forwarded chunk toward the limit
                self.token_count += 1
                yield chunk
            # Completion signal on the normal path (yielding from `finally`
            # after a cancellation would raise RuntimeError)
            yield f"data: {json.dumps({'type': 'done', 'message': 'Stream completed'})}\n\n"
        except asyncio.CancelledError:
            raise
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"

async def enhanced_ai_stream(user_input: str):
    """Enhanced AI stream that runs through the processor."""
    processor = AIStreamProcessor(max_tokens=1000, timeout=30)

    async def mock_ai_generator():
        """Mock AI generator."""
        response_parts = [
            "This is a simulated AI response. ",
            "It demonstrates the effect of streaming output. ",
            "Each part is generated incrementally. ",
            "The user sees the content appear in real time.",
        ]
        accumulated = ""
        for i, part in enumerate(response_parts):
            for char in part:
                accumulated += char
                chunk_data = {
                    "type": "token",
                    "content": char,
                    "accumulated": accumulated,
                    "part_index": i,
                    "char_index": len(accumulated),
                }
                yield f"data: {json.dumps(chunk_data)}\n\n"
                await asyncio.sleep(0.01)

    async for chunk in processor.process_stream(mock_ai_generator()):
        yield chunk

@app.post("/enhanced-ai-stream")
async def enhanced_ai_stream_endpoint(request: Request):
    """Enhanced AI streaming endpoint."""
    body = await request.json()
    user_message = body.get("message", "")
    return StreamingResponse(
        enhanced_ai_stream(user_message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```

## File Streaming
### Streaming Large File Downloads

```python
import aiofiles
from pathlib import Path
from fastapi import HTTPException

async def file_stream_generator(file_path: str, chunk_size: int = 8192):
    """Stream a file from disk in fixed-size chunks."""
    path = Path(file_path)
    file_size = path.stat().st_size
    bytes_sent = 0
    try:
        async with aiofiles.open(path, 'rb') as file:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                # Emit raw bytes only — interleaving JSON progress messages
                # into a binary download would corrupt the file. Report
                # progress out of band (e.g. via a separate SSE endpoint).
                yield chunk
                bytes_sent += len(chunk)
    except asyncio.CancelledError:
        print(f"File download cancelled after {bytes_sent}/{file_size} bytes")
        raise

@app.get("/download-file/{filename}")
async def download_file(filename: str):
    """Streaming file download endpoint."""
    file_path = Path("/path/to/files") / filename  # adjust to your storage location
    # Validate before streaming so the client gets a proper status code
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return StreamingResponse(
        file_stream_generator(str(file_path)),
        media_type="application/octet-stream",
        headers={
            "Content-Disposition": f"attachment; filename={filename}",
            "Content-Length": str(file_path.stat().st_size),
            "Cache-Control": "no-cache",
        },
    )
```

### Generating Files on the Fly
```python
import csv
import io

async def dynamic_csv_generator(data_source_func):
    """Generate a CSV file row by row from an async data source."""
    # In-memory buffer, reused for each emitted chunk
    output = io.StringIO()
    writer = None
    try:
        async for row_data in data_source_func():
            if writer is None:
                # Initialize the writer; the first row's keys become the header
                fieldnames = (
                    list(row_data.keys())
                    if isinstance(row_data, dict)
                    else [f"column_{i}" for i in range(len(row_data))]
                )
                writer = csv.DictWriter(output, fieldnames=fieldnames)
                writer.writeheader()
                yield output.getvalue().encode('utf-8')
                output.seek(0)
                output.truncate(0)
            # Write the data row
            if isinstance(row_data, dict):
                writer.writerow(row_data)
            else:
                # Non-dict rows are mapped onto generated column names
                writer.writerow({f"column_{i}": val for i, val in enumerate(row_data)})
            chunk = output.getvalue()
            if chunk:
                yield chunk.encode('utf-8')
                output.seek(0)
                output.truncate(0)
            # Small delay to avoid hogging resources
            await asyncio.sleep(0.001)
    except Exception as e:
        yield f"Error occurred: {str(e)}".encode('utf-8')

async def sample_data_source():
    """Example data source."""
    for i in range(100):
        yield {
            "id": i,
            "name": f"Item {i}",
            "value": i * 10,
            "timestamp": datetime.now().isoformat(),
        }
        await asyncio.sleep(0.01)  # simulate data generation latency

@app.get("/download-dynamic-csv")
async def download_dynamic_csv():
    """Dynamic CSV download endpoint."""
    return StreamingResponse(
        dynamic_csv_generator(sample_data_source),
        media_type="text/csv",
        headers={
            "Content-Disposition": "attachment; filename=dynamic_data.csv",
            "Cache-Control": "no-cache",
        },
    )
```

## Real-Time Log Push
### Streaming Live Logs

```python
import logging

class RealTimeLogStreamer:
    """Pushes log records to SSE clients through an asyncio queue."""

    def __init__(self):
        self.log_queue = asyncio.Queue()
        self.clients = set()
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Configure the logger."""
        logger = logging.getLogger("realtime_logs")
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
        logger.addHandler(handler)
        logger.propagate = False  # avoid duplicate records
        return logger

    async def log_generator(self):
        """SSE generator that drains the log queue."""
        try:
            while True:
                try:
                    # Wait for the next log record
                    log_record = await asyncio.wait_for(self.log_queue.get(), timeout=1.0)
                    log_data = {
                        "timestamp": datetime.now().isoformat(),
                        "level": log_record.levelname,
                        "message": log_record.getMessage(),
                        "module": log_record.module,
                        "function": log_record.funcName,
                        "line": log_record.lineno,
                    }
                    yield f"data: {json.dumps(log_data)}\n\n"
                except asyncio.TimeoutError:
                    # Send a heartbeat to keep the connection alive
                    heartbeat = {
                        "type": "heartbeat",
                        "timestamp": datetime.now().isoformat(),
                    }
                    yield f"data: {json.dumps(heartbeat)}\n\n"
                    continue
        except asyncio.CancelledError:
            print("Log streaming cancelled")
            raise
        except Exception as e:
            error_data = {
                "type": "error",
                "message": f"Log streaming error: {str(e)}",
            }
            yield f"data: {json.dumps(error_data)}\n\n"

    def add_log(self, level: str, message: str):
        """Enqueue a log record for streaming."""
        record = logging.LogRecord(
            name="realtime_logs",
            level=getattr(logging, level.upper()),
            pathname="",
            lineno=0,
            msg=message,
            args=(),
            exc_info=None,
        )
        # Safe when called from the event loop thread; from other threads
        # use loop.call_soon_threadsafe instead.
        self.log_queue.put_nowait(record)

# Global log streamer instance
log_streamer = RealTimeLogStreamer()

@app.get("/realtime-logs")
async def realtime_logs():
    """Real-time log stream endpoint."""
    return StreamingResponse(
        log_streamer.log_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )

@app.post("/log-message")
async def log_message(request: Request):
    """Record a log message."""
    body = await request.json()
    level = body.get("level", "INFO")
    message = body.get("message", "")
    log_streamer.add_log(level, message)
    return {"status": "logged", "message": message}
```

### System Monitoring Stream
```python
import psutil
import time

async def system_monitor_stream():
    """Generator streaming system metrics."""
    start_time = time.time()
    try:
        while True:
            metrics = {
                "timestamp": datetime.now().isoformat(),
                "uptime": time.time() - start_time,
                # interval=None returns immediately using the delta since the
                # previous call; interval=1 would block the event loop for a second
                "cpu_percent": psutil.cpu_percent(interval=None),
                "memory_percent": psutil.virtual_memory().percent,
                "disk_percent": psutil.disk_usage('/').percent,
                "network_io": psutil.net_io_counters()._asdict(),
                "process_count": len(psutil.pids()),
                "load_average": psutil.getloadavg(),  # not available on Windows
            }
            yield f"data: {json.dumps(metrics)}\n\n"
            await asyncio.sleep(2)  # update every 2 seconds
    except asyncio.CancelledError:
        print("System monitoring cancelled")
        raise
    except Exception as e:
        error_data = {
            "type": "error",
            "message": f"Monitoring error: {str(e)}",
        }
        yield f"data: {json.dumps(error_data)}\n\n"

@app.get("/system-monitor")
async def system_monitor():
    """System monitoring stream endpoint."""
    return StreamingResponse(
        system_monitor_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```

## Performance Optimization Strategies
### Buffered Streaming

```python
class OptimizedStreamBuffer:
    """Batches small chunks to reduce per-write overhead."""

    def __init__(self, max_buffer_size: int = 65536, flush_interval: float = 0.1):
        self.buffer = []
        self.buffered_bytes = 0
        self.max_buffer_size = max_buffer_size
        self.flush_interval = flush_interval
        # time.monotonic() works outside a running event loop, unlike
        # asyncio.get_event_loop().time()
        self.last_flush = time.monotonic()

    def add_chunk(self, chunk: str) -> list:
        """Add a chunk; return the chunks to send if a flush is due."""
        self.buffer.append(chunk)
        self.buffered_bytes += len(chunk)
        now = time.monotonic()
        should_flush = (
            self.buffered_bytes >= self.max_buffer_size
            or now - self.last_flush >= self.flush_interval
        )
        if should_flush:
            chunks_to_send = self.buffer.copy()
            self.buffer.clear()
            self.buffered_bytes = 0
            self.last_flush = now
            return chunks_to_send
        return []

    def force_flush(self) -> list:
        """Flush whatever is currently buffered."""
        chunks_to_send = self.buffer.copy()
        self.buffer.clear()
        self.buffered_bytes = 0
        return chunks_to_send

async def optimized_stream_generator():
    """Streaming generator that batches output through the buffer."""
    buffer = OptimizedStreamBuffer(max_buffer_size=8192, flush_interval=0.05)
    for i in range(100):
        chunk = f"data: {json.dumps({'id': i, 'message': f'Message {i}'})}\n\n"
        for flush_chunk in buffer.add_chunk(chunk):
            yield flush_chunk
        await asyncio.sleep(0.01)
    # Final flush. (On cancellation the client is gone, so yielding the
    # leftover chunks would be pointless — and raises RuntimeError.)
    for chunk in buffer.force_flush():
        yield chunk

@app.get("/optimized-stream")
async def optimized_stream():
    """Buffered streaming endpoint."""
    return StreamingResponse(
        optimized_stream_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```

### Managing Concurrent Streams
```python
from typing import Set

class StreamConcurrencyManager:
    """Caps the number of concurrent streaming responses."""

    def __init__(self, max_concurrent_streams: int = 100):
        self.max_concurrent_streams = max_concurrent_streams
        # A plain set: strings cannot be weakly referenced, so a
        # weakref.WeakSet would raise TypeError here.
        self.active_streams: Set[str] = set()
        self.stream_counter = 0

    def register_stream(self, stream_id: str) -> bool:
        """Register a new stream; returns False if the limit is reached."""
        if len(self.active_streams) >= self.max_concurrent_streams:
            return False
        self.active_streams.add(stream_id)
        self.stream_counter += 1
        return True

    def unregister_stream(self, stream_id: str):
        """Unregister a stream."""
        self.active_streams.discard(stream_id)

    def get_stats(self):
        """Current statistics."""
        return {
            "active_streams": len(self.active_streams),
            "total_streams_served": self.stream_counter,
            "max_allowed": self.max_concurrent_streams,
        }

# Global concurrency manager
concurrency_manager = StreamConcurrencyManager(max_concurrent_streams=50)

async def managed_stream_generator(client_id: str):
    """Generator gated by the concurrency manager."""
    stream_id = f"stream_{client_id}_{uuid.uuid4()}"
    if not concurrency_manager.register_stream(stream_id):
        yield f"data: {json.dumps({'error': 'Too many concurrent streams'})}\n\n"
        return
    try:
        for i in range(50):
            data = {
                "client_id": client_id,
                "sequence": i,
                "timestamp": datetime.now().isoformat(),
                "concurrency_stats": concurrency_manager.get_stats(),
            }
            yield f"data: {json.dumps(data)}\n\n"
            await asyncio.sleep(0.1)
    finally:
        concurrency_manager.unregister_stream(stream_id)

@app.get("/managed-stream/{client_id}")
async def managed_stream(client_id: str):
    """Concurrency-managed streaming endpoint."""
    return StreamingResponse(
        managed_stream_generator(client_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```

## Error and Exception Handling
### Comprehensive Error Handling

```python
from enum import Enum

class StreamErrorType(Enum):
    """Streaming error categories."""
    CONNECTION_TIMEOUT = "connection_timeout"
    SERVER_ERROR = "server_error"
    CLIENT_DISCONNECTED = "client_disconnected"
    RESOURCE_EXHAUSTED = "resource_exhausted"
    VALIDATION_ERROR = "validation_error"

class StreamErrorHandler:
    """Formats streaming errors as SSE events."""

    @staticmethod
    async def handle_stream_exception(exc: Exception, context: str = "") -> str:
        """Format an exception as an SSE error event."""
        error_info = {
            "type": "error",
            "error_type": type(exc).__name__,
            "message": str(exc),
            "context": context,
            "timestamp": datetime.now().isoformat(),
        }
        if isinstance(exc, asyncio.TimeoutError):
            error_info["error_type_enum"] = StreamErrorType.CONNECTION_TIMEOUT.value
        elif isinstance(exc, asyncio.CancelledError):
            error_info["error_type_enum"] = StreamErrorType.CLIENT_DISCONNECTED.value
        else:
            error_info["error_type_enum"] = StreamErrorType.SERVER_ERROR.value
        return f"data: {json.dumps(error_info)}\n\n"

    @staticmethod
    def create_error_event(error_type: StreamErrorType, message: str, **kwargs) -> str:
        """Build an SSE error event."""
        error_data = {
            "type": "error",
            "error_type": error_type.value,
            "message": message,
            "timestamp": datetime.now().isoformat(),
            **kwargs,
        }
        return f"data: {json.dumps(error_data)}\n\n"

async def robust_stream_generator():
    """Streaming generator with layered error handling."""
    error_handler = StreamErrorHandler()
    try:
        for i in range(20):
            try:
                # Simulate an operation that can fail
                if i == 10:
                    raise ValueError(f"Simulated error at iteration {i}")
                data = {
                    "id": i,
                    "status": "success",
                    "message": f"Processing item {i}",
                }
                yield f"data: {json.dumps(data)}\n\n"
                await asyncio.sleep(0.2)
            except ValueError as ve:
                # Recoverable error: report it and keep going
                yield error_handler.create_error_event(
                    StreamErrorType.VALIDATION_ERROR,
                    str(ve),
                    iteration=i,
                )
                continue
            except Exception as e:
                # Unexpected error: report it and stop
                yield await error_handler.handle_stream_exception(
                    e, f"Error processing iteration {i}"
                )
                break
    except asyncio.CancelledError:
        # The client is gone, so there is no one left to send an event to —
        # just clean up and re-raise.
        print("Client disconnected from stream")
        raise
    except Exception as e:
        # Fatal error outside the per-item handling
        yield await error_handler.handle_stream_exception(
            e, "Fatal error in stream generator"
        )

@app.get("/robust-stream")
async def robust_stream():
    """Robust streaming endpoint."""
    return StreamingResponse(
        robust_stream_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```

## Production Deployment
### Nginx Configuration

```nginx
# nginx-sse.conf — SSE-friendly configuration
server {
    listen 80;
    server_name your-domain.com;

    # SSE-specific settings (SSE runs over plain HTTP/1.1, so no
    # WebSocket-style Upgrade headers are needed)
    location /stream {
        proxy_pass http://gunicorn_backend;
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Key: disable buffering so events reach the client immediately.
        # (X-Accel-Buffering is a *response* header the application sets,
        # not a header to add to the proxied request.)
        proxy_buffering off;
        proxy_cache off;

        # Timeouts sized for long-lived connections
        proxy_connect_timeout 60s;
        proxy_send_timeout 300s;
        proxy_read_timeout 300s;
        keepalive_timeout 300s;
    }

    # General API settings
    location / {
        proxy_pass http://gunicorn_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        # Keep default buffering for ordinary requests
        proxy_buffering on;
    }
}
```

### Gunicorn Configuration
```python
# gunicorn_streaming_config.py — Gunicorn settings for streaming workloads
import multiprocessing

# Basics
bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000

# Streaming-specific settings
timeout = 300  # longer timeout for long-lived streams
graceful_timeout = 30
keepalive = 5

# Performance
preload_app = True
max_requests = 1000
max_requests_jitter = 100

# Logging
accesslog = "/var/log/gunicorn/stream_access.log"
errorlog = "/var/log/gunicorn/stream_error.log"
loglevel = "info"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'

# Memory
worker_tmp_dir = "/dev/shm"  # use an in-memory temp dir
```

## Front-End Integration
### JavaScript SSE Client

```javascript
// sse-client.js — SSE client implementation
class SSEClient {
    constructor(url, options = {}) {
        this.url = url;
        this.options = {
            withCredentials: false,
            headers: {},
            onOpen: null,
            onMessage: null,
            onError: null,
            onClose: null,
            reconnectInterval: 3000,
            maxReconnectAttempts: 10,
            ...options
        };
        this.eventSource = null;
        this.reconnectAttempts = 0;
        this.isConnected = false;
    }

    connect() {
        try {
            this.eventSource = new EventSource(this.url, {
                withCredentials: this.options.withCredentials
            });

            // Note: EventSource does not support custom request headers.
            // For authenticated streams, pass the token as a URL query
            // parameter or rely on cookies.

            this.eventSource.onopen = (event) => {
                this.isConnected = true;
                this.reconnectAttempts = 0;
                console.log('SSE connection opened');
                if (this.options.onOpen) {
                    this.options.onOpen(event);
                }
            };

            this.eventSource.onmessage = (event) => {
                try {
                    const data = JSON.parse(event.data);
                    if (this.options.onMessage) {
                        this.options.onMessage(data, event);
                    }
                } catch (e) {
                    console.error('Error parsing SSE message:', e);
                    if (this.options.onMessage) {
                        this.options.onMessage(event.data, event);
                    }
                }
            };

            this.eventSource.onerror = (event) => {
                console.error('SSE error:', event);
                this.isConnected = false;
                if (this.options.onError) {
                    this.options.onError(event);
                }
                // Automatic reconnection
                if (this.reconnectAttempts < this.options.maxReconnectAttempts) {
                    setTimeout(() => {
                        this.reconnectAttempts++;
                        this.connect();
                    }, this.options.reconnectInterval);
                } else if (this.options.onClose) {
                    this.options.onClose();
                }
            };
        } catch (error) {
            console.error('Failed to create SSE connection:', error);
            if (this.options.onError) {
                this.options.onError(error);
            }
        }
    }

    disconnect() {
        if (this.eventSource) {
            this.eventSource.close();
            this.isConnected = false;
        }
    }

    sendMessage(message) {
        // SSE is one-way: the client cannot send data over the event stream.
        // Use an ordinary HTTP request instead.
        return fetch(this.url, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
            },
            body: JSON.stringify(message)
        });
    }
}

// Usage example
const sseClient = new SSEClient('/api/realtime-updates', {
    onOpen: (event) => {
        console.log('Connected to real-time updates');
    },
    onMessage: (data, event) => {
        console.log('Received update:', data);
        // Update the UI
        updateUI(data);
    },
    onError: (error) => {
        console.error('SSE Error:', error);
    }
});

function updateUI(data) {
    const output = document.getElementById('output');
    const div = document.createElement('div');
    div.textContent = `[${new Date().toLocaleTimeString()}] ${JSON.stringify(data)}`;
    output.appendChild(div);
}

// Start the connection
sseClient.connect();
```

### AI Chat Interface Integration
```javascript
// ai-chat-integration.js — AI chat interface integration
class AIChatInterface {
    constructor(endpoint, outputElement) {
        this.endpoint = endpoint;
        this.outputElement = outputElement;
        this.isStreaming = false;
        this.currentMessage = '';
    }

    async sendMessage(message) {
        if (this.isStreaming) {
            throw new Error('Already streaming a response');
        }
        // Reset the accumulator so replies don't run together
        this.currentMessage = '';
        // Show the user's message
        this.displayMessage(message, 'user');
        // Show a loading indicator
        const loadingElement = this.createLoadingElement();
        this.outputElement.appendChild(loadingElement);
        this.isStreaming = true;
        try {
            const response = await fetch(this.endpoint, {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({ message })
            });
            if (!response.ok) {
                throw new Error(`HTTP error! status: ${response.status}`);
            }
            const reader = response.body.getReader();
            const decoder = new TextDecoder();
            // Remove the loading indicator
            if (loadingElement.parentNode) {
                loadingElement.remove();
            }
            // Container for the AI message
            const aiMessageElement = this.createAIMessageElement();
            this.outputElement.appendChild(aiMessageElement);
            // Buffer for partial lines: a network chunk can end mid-line,
            // so keep the trailing fragment for the next read.
            let lineBuffer = '';
            while (true) {
                const { done, value } = await reader.read();
                if (done) {
                    break;
                }
                lineBuffer += decoder.decode(value, { stream: true });
                const lines = lineBuffer.split('\n');
                lineBuffer = lines.pop(); // last element may be incomplete
                for (const line of lines) {
                    if (line.startsWith('data: ')) {
                        const dataStr = line.substring(6); // strip the "data: " prefix
                        if (dataStr.trim()) {
                            try {
                                const data = JSON.parse(dataStr);
                                if (data.type === 'token' && data.content) {
                                    this.currentMessage += data.content;
                                    aiMessageElement.textContent = this.currentMessage;
                                    // Keep scrolled to the bottom
                                    this.outputElement.scrollTop = this.outputElement.scrollHeight;
                                } else if (data.type === 'error') {
                                    aiMessageElement.innerHTML = `<span class="error">${this.escapeHtml(data.message)}</span>`;
                                } else if (data.type === 'done') {
                                    // Stream finished
                                }
                            } catch (e) {
                                console.error('Error parsing SSE data:', e);
                            }
                        }
                    }
                }
            }
        } catch (error) {
            console.error('Streaming error:', error);
            const errorElement = document.createElement('div');
            errorElement.className = 'message error';
            errorElement.textContent = `Error: ${error.message}`;
            this.outputElement.appendChild(errorElement);
        } finally {
            this.isStreaming = false;
        }
    }

    displayMessage(content, sender) {
        const messageElement = document.createElement('div');
        messageElement.className = `message ${sender}`;
        messageElement.textContent = content;
        this.outputElement.appendChild(messageElement);
        this.outputElement.scrollTop = this.outputElement.scrollHeight;
    }

    createLoadingElement() {
        const element = document.createElement('div');
        element.className = 'message ai loading';
        element.innerHTML = '<div class="loading-dots">●●●</div>';
        return element;
    }

    createAIMessageElement() {
        const element = document.createElement('div');
        element.className = 'message ai';
        element.style.whiteSpace = 'pre-wrap';
        return element;
    }

    escapeHtml(text) {
        const div = document.createElement('div');
        div.textContent = text;
        return div.innerHTML;
    }
}

// Usage example
const chatInterface = new AIChatInterface('/ai-stream', document.getElementById('chat-output'));

document.getElementById('send-button').addEventListener('click', async () => {
    const input = document.getElementById('message-input');
    const message = input.value.trim();
    if (message) {
        input.value = '';
        await chatInterface.sendMessage(message);
    }
});

// Send on Enter
document.getElementById('message-input').addEventListener('keypress', (e) => {
    if (e.key === 'Enter' && !e.shiftKey) {
        e.preventDefault();
        document.getElementById('send-button').click();
    }
});
```

## FAQ
### Q1: How do I handle client disconnects?
A: Catch asyncio.CancelledError inside the streaming generator — it is raised there when the client disconnects. Do any necessary cleanup in the except block (closing database connections, releasing resources, and so on), then re-raise so cancellation propagates.
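A minimal sketch of this pattern — the `cleanup_ran` list stands in for whatever resource release you need, and cancelling the consuming task simulates the client disconnecting:

```python
import asyncio

cleanup_ran = []  # stands in for closing a DB connection, file handle, etc.

async def stream_with_cleanup():
    """Generator that cleans up when the consuming task is cancelled."""
    try:
        for i in range(1000):
            yield f"data: {i}\n\n"
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        # Raised at the await point when the client disconnects
        cleanup_ran.append(True)
        raise  # always re-raise so cancellation propagates

async def drive():
    async for _ in stream_with_cleanup():
        pass

async def main():
    task = asyncio.create_task(drive())
    await asyncio.sleep(0.05)
    task.cancel()  # simulate the client going away mid-stream
    try:
        await task
    except asyncio.CancelledError:
        pass
    return bool(cleanup_ran)

print(asyncio.run(main()))  # → True
```

FastAPI cancels the task driving your generator in exactly this way when the connection drops, which is why the cleanup branch fires.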
### Q2: How does an SSE connection reconnect automatically?
A: The browser's EventSource reconnects automatically after a dropped connection; the client can also listen to the onerror event and re-create the connection after a delay. The server can send a `retry: <milliseconds>` field in the stream to suggest the reconnection interval.
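On the wire, the retry hint is just another SSE field. A sketch of emitting it and parsing it the way an EventSource-style client would (`format_retry` and `parse_retry` are illustrative helper names, not part of any library):

```python
def format_retry(ms: int) -> str:
    """Serialize an SSE retry hint: how long the client should wait
    before reconnecting after the connection drops."""
    return f"retry: {ms}\n\n"

def parse_retry(field: str, default_ms: int = 3000) -> int:
    """Parse a `retry:` line; fall back to the default on anything malformed,
    mirroring how browsers ignore invalid retry values."""
    if field.startswith("retry:"):
        value = field.split(":", 1)[1].strip()
        if value.isdigit():
            return int(value)
    return default_ms

print(parse_retry(format_retry(10000).strip()))  # → 10000
```

The server in this guide emits exactly this field at the top of its advanced SSE stream (`yield "retry: 10000\n\n"`).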
### Q3: How do I optimize streaming performance?
A: Several levers help: 1) batch data with a buffer before sending; 2) set sensible timeouts; 3) cap the number of concurrent connections; 4) optimize data serialization; 5) use connection pools for database access.
### Q4: How should streaming responses handle errors?
A: Catch exceptions inside the generator with try/except, format the error as an SSE event and send it to the client, then decide whether to continue streaming or terminate the connection.
### Q5: How do I deploy streaming responses in production?
A: Pay particular attention to the reverse proxy (e.g. Nginx): disable buffering so data flows in real time, raise timeouts to accommodate long-lived connections, and make sure the load balancer supports them.
## Summary
FastAPI's StreamingResponse provides powerful streaming capabilities that fit many scenarios:
- Real-time communication: Server-Sent Events push data from server to client as it happens
- AI chat: a ChatGPT-style typewriter effect that improves the user experience
- File transfer: chunked delivery of large files without exhausting memory
- Log monitoring: real-time log push for easier system observation
- Progress reporting: live progress updates for long-running tasks

💡 Key takeaway: streaming responses hinge on async generators — data is returned incrementally with yield, and combined with proper error handling and connection management this gives you an efficient, reliable real-time data pipeline.
## More FAQs
### Q1: How does StreamingResponse differ from WebSocket?
A: StreamingResponse pushes data one way, from server to client, over plain HTTP; WebSocket supports bidirectional communication over a persistent connection. StreamingResponse is simpler and suits server-push scenarios; WebSocket is more powerful and suits applications that need two-way traffic.
### Q2: How do I handle a large number of concurrent streaming connections?
A: Cap the maximum number of concurrent connections, manage resources with connection pools, keep memory usage in check, consider a message queue to spread the load, and tune server parameters for high concurrency.
### Q3: How can a stream guarantee data integrity?
A: Add sequence numbers and checksums to the stream so the client can validate what it receives, implement reconnection with resume, and use heartbeats to detect dead connections.
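A sketch of the sequence-number-plus-checksum idea — the field names (`seq`, `payload`, `checksum`) and the truncated SHA-256 digest are illustrative choices, not a fixed protocol:

```python
import hashlib
import json

def make_event(seq: int, payload: dict) -> str:
    """Server side: wrap a payload with a sequence number and a checksum."""
    body = json.dumps(payload, sort_keys=True)
    checksum = hashlib.sha256(body.encode()).hexdigest()[:16]
    return json.dumps({"seq": seq, "payload": payload, "checksum": checksum})

def verify_event(raw: str, expected_seq: int) -> bool:
    """Client side: check both ordering and content integrity."""
    event = json.loads(raw)
    body = json.dumps(event["payload"], sort_keys=True)
    checksum = hashlib.sha256(body.encode()).hexdigest()[:16]
    return event["seq"] == expected_seq and event["checksum"] == checksum

event = make_event(0, {"price": 101.5})
print(verify_event(event, expected_seq=0))  # → True
```

A gap in the sequence numbers tells the client it missed events and should resync; a checksum mismatch flags a corrupted or tampered payload.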
### Q4: How do I handle SSE correctly on the front end?
A: Use the EventSource API: listen for the message, error, and open events, implement automatic reconnection, handle each SSE event type, and close the connection to release resources when done.
### Q5: What should I watch out for in production?
A: Disable buffering in the reverse proxy, adjust timeouts, add connection management and monitoring, handle exceptions and errors, keep memory and CPU usage in check, and lock down security.
🔗 Related tutorials
- Real-Time Communication with WebSocket — bidirectional real-time messaging
- Deep Dive into Async Programming — async fundamentals
- Production Deployment with Nginx and Gunicorn — production setup
- Containerized Deployment with Docker — containerization strategies
- Multi-Environment Configuration with Pydantic Settings — configuration management

🏷️ Tags: FastAPI streaming · StreamingResponse · SSE · real-time push · AI chat · Server-Sent Events · data streams · real-time communication · performance optimization · error handling

