#WebSocket 实时通信:构建 AI 实时聊天室
📂 所属阶段:第六阶段 — 2026 特色专题(AI 集成篇)
🔗 相关章节:流式响应 StreamingResponse · OAuth2 与 JWT 鉴权
#1. WebSocket vs HTTP
#1.1 核心区别
| HTTP | WebSocket | |
|---|---|---|
| 连接方式 | 请求-响应(短连接) | 双向持久连接 |
| 谁先发消息 | 只能是客户端 | 双方都可以 |
| 实时性 | 轮询/长轮询(延迟高) | 真正的实时推送 |
| 适用场景 | REST API、文件上传 | 聊天、游戏、协作 |
HTTP: 客户端 → [请求] → 服务器 → [响应] → 断开 ❌
WebSocket:
客户端 ←→ [持久连接,双向通信] ↔ 服务器 ✅#1.2 HTTP 升级为 WebSocket
客户端发起 HTTP 请求(带 Upgrade 头):
GET /ws HTTP/1.1
Upgrade: websocket
Connection: Upgrade
服务器同意升级:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: upgrade#2. FastAPI WebSocket 基础
#2.1 基础 WebSocket 端点
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import asyncio
app = FastAPI()
# HTML 页面(用于测试)
html = """
<!DOCTYPE html>
<html>
<head><title>WebSocket 测试</title></head>
<body>
<h1>WebSocket 客户端</h1>
<input id="msg" placeholder="输入消息">
<button onclick="send()">发送</button>
<ul id="messages"></ul>
<script>
const ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = (e) => {
const li = document.createElement("li");
li.textContent = e.data;
document.getElementById("messages").appendChild(li);
};
function send() {
ws.send(document.getElementById("msg").value);
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""最简单的 WebSocket 端点"""
await websocket.accept()
try:
while True:
# 接收客户端消息
data = await websocket.receive_text()
print(f"收到: {data}")
# 发送回复
await websocket.send_text(f"服务器收到: {data}")
except WebSocketDisconnect:
print("客户端断开了")#2.2 运行测试
uvicorn main:app --reload
# 打开浏览器访问 http://localhost:8000#3. 连接管理器(多人聊天室核心)
#3.1 ConnectionManager
# managers/connection_manager.py
from fastapi import WebSocket
from typing import Dict, List
import json
from datetime import datetime
class ConnectionManager:
"""管理所有 WebSocket 连接,支持广播、群聊、私聊"""
def __init__(self):
# 在线用户:user_id → WebSocket
self.active_connections: Dict[str, WebSocket] = {}
# 用户信息:user_id → {name, avatar, room}
self.user_info: Dict[str, dict] = {}
async def connect(self, websocket: WebSocket, user_id: str, user_data: dict):
"""用户连接"""
await websocket.accept()
self.active_connections[user_id] = websocket
self.user_info[user_id] = {**user_data, "joined_at": datetime.utcnow().isoformat()}
# 广播:用户上线
await self.broadcast_system(f"用户 {user_data.get('name', user_id)} 上线了")
async def disconnect(self, user_id: str):
"""用户断开"""
if user_id in self.active_connections:
del self.active_connections[user_id]
user_name = self.user_info.get(user_id, {}).get("name", user_id)
if user_id in self.user_info:
del self.user_info[user_id]
await self.broadcast_system(f"用户 {user_name} 离开了")
async def send_personal(self, user_id: str, message: dict):
"""发送私信"""
if user_id in self.active_connections:
await self.active_connections[user_id].send_json(message)
async def broadcast(self, message: dict, exclude: list[str] = None):
"""广播消息(可排除指定用户)"""
disconnected = []
for user_id, ws in self.active_connections.items():
if exclude and user_id in exclude:
continue
try:
await ws.send_json(message)
except Exception:
disconnected.append(user_id)
# 清理断开的连接
for uid in disconnected:
await self.disconnect(uid)
async def broadcast_system(self, text: str):
"""广播系统消息"""
await self.broadcast({
"type": "system",
"content": text,
"timestamp": datetime.utcnow().isoformat(),
})
# 全局管理器实例
manager = ConnectionManager()#4. 完整聊天室实现
#4.1 WebSocket 路由
# routers/websocket_chat.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, Query
from fastapi.responses import HTMLResponse
from managers.connection_manager import manager
from auth.jwt import decode_token
from datetime import datetime
import json
router = APIRouter()
@router.get("/chat-room")
async def get_chat_page():
"""聊天室页面"""
return HTMLResponse(open("templates/chat.html").read())
@router.websocket("/ws/chat")
async def chat_websocket(
websocket: WebSocket,
token: str = Query(...), # URL 参数传递 token
):
"""
聊天 WebSocket 端点
客户端连接:new WebSocket("ws://localhost:8000/ws/chat?token=xxx")
"""
# 1. 验证用户身份
try:
payload = decode_token(token)
user_id = str(payload["sub"])
user_name = payload.get("email", user_id)
except Exception:
await websocket.close(code=4001, reason="认证失败")
return
# 2. 建立连接
await manager.connect(websocket, user_id, {"name": user_name})
try:
while True:
# 3. 接收消息
raw_data = await websocket.receive_text()
message = json.loads(raw_data)
msg_type = message.get("type", "text")
if msg_type == "text":
# 文本消息:广播给所有人
chat_msg = {
"type": "message",
"user_id": user_id,
"user_name": user_name,
"content": message["content"],
"timestamp": datetime.utcnow().isoformat(),
}
await manager.broadcast(chat_msg)
elif msg_type == "private":
# 私聊消息
target_user = message["to"]
private_msg = {
"type": "private",
"from": user_id,
"from_name": user_name,
"content": message["content"],
"timestamp": datetime.utcnow().isoformat(),
}
await manager.send_personal(target_user, private_msg)
# 发送给自己确认
await manager.send_personal(user_id, {
**private_msg, "to": target_user, "to_name": "你"
})
elif msg_type == "ping":
# 心跳:回复 pong
await manager.send_personal(user_id, {"type": "pong"})
except WebSocketDisconnect:
await manager.disconnect(user_id)
@router.get("/online-users")
async def get_online_users():
"""查询当前在线用户"""
return {
"count": len(manager.active_connections),
"users": [
{"user_id": uid, **info}
for uid, info in manager.user_info.items()
]
}#4.2 前端聊天室
<!-- templates/chat.html -->
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<title>AI 实时聊天室</title>
<style>
body { max-width: 600px; margin: 0 auto; padding: 20px; font-family: sans-serif; }
#messages { list-style: none; padding: 0; height: 400px; overflow-y: auto; border: 1px solid #ccc; border-radius: 8px; padding: 10px; }
#messages li { padding: 8px; margin: 4px 0; border-radius: 4px; }
#messages li.system { background: #f0f0f0; color: #666; font-size: 0.9em; text-align: center; }
#messages li.mine { background: #dcf8c6; margin-left: 20%; }
#messages li.others { background: #fff; border: 1px solid #ddd; margin-right: 20%; }
.meta { font-size: 0.75em; color: #888; margin-top: 2px; }
.input-area { display: flex; gap: 8px; margin-top: 10px; }
.input-area input { flex: 1; padding: 10px; border-radius: 20px; border: 1px solid #ccc; }
.input-area button { padding: 10px 20px; border-radius: 20px; border: none; background: #34b7f1; color: white; cursor: pointer; }
</style>
</head>
<body>
<h2>💬 AI 聊天室</h2>
<div id="status">连接中...</div>
<ul id="messages"></ul>
<div class="input-area">
<input id="msg" placeholder="输入消息,按回车发送" autocomplete="off">
<button onclick="send()">发送</button>
</div>
<script>
const token = "你的JWT令牌"; // 实际使用时从后端获取
const ws = new WebSocket(`ws://localhost:8000/ws/chat?token=${token}`);
const messagesEl = document.getElementById("messages");
const statusEl = document.getElementById("status");
ws.onopen = () => {
statusEl.textContent = "🟢 已连接";
statusEl.style.color = "green";
// 启动心跳
setInterval(() => ws.send(JSON.stringify({type: "ping"})), 30000);
};
ws.onmessage = (e) => {
const msg = JSON.parse(e.data);
if (msg.type === "system") {
addMessage(msg.content, "system");
} else {
addMessage(`${msg.user_name}: ${msg.content}`, msg.type === "private" ? "mine" : "others");
}
};
ws.onclose = () => {
statusEl.textContent = "🔴 连接已断开";
statusEl.style.color = "red";
};
function addMessage(text, type) {
const li = document.createElement("li");
li.textContent = text;
li.className = type;
messagesEl.appendChild(li);
messagesEl.scrollTop = messagesEl.scrollHeight;
}
function send() {
const input = document.getElementById("msg");
if (!input.value.trim()) return;
ws.send(JSON.stringify({type: "text", content: input.value}));
input.value = "";
}
document.getElementById("msg").addEventListener("keypress", (e) => {
if (e.key === "Enter") send();
});
</script>
</body>
</html>#5. AI 实时助手(WebSocket + AI)
#5.1 AI 对话 WebSocket
# routers/websocket_ai.py
@router.websocket("/ws/ai")
async def ai_chat_websocket(websocket: WebSocket):
"""AI 实时对话 WebSocket"""
await websocket.accept()
openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
try:
# 接收初始化消息(包含对话历史)
init = await websocket.receive_json()
messages = init.get("history", [])
while True:
# 1. 接收用户消息
user_msg = await websocket.receive_json()
user_content = user_msg["content"]
# 2. 添加用户消息到历史
messages.append({"role": "user", "content": user_content})
# 3. 流式调用 AI
stream_resp = await openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
stream=True,
)
# 4. 流式转发 AI 回复
full_response = ""
for chunk in stream_resp:
token = chunk.choices[0].delta.content
if token:
full_response += token
# 逐 token 发送给前端
await websocket.send_json({
"type": "token",
"content": token,
})
# 5. 保存 AI 回复到历史
messages.append({"role": "assistant", "content": full_response})
# 6. 发送完成信号
await websocket.send_json({"type": "done"})
except WebSocketDisconnect:
print("AI 聊天会话结束")
except Exception as e:
await websocket.send_json({"type": "error", "message": str(e)})
await websocket.close()#6. 生产注意事项
⚠️ WebSocket 生产环境注意事项:
1. Nginx WebSocket 支持(必须配置)
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
2. 心跳保活
→ 客户端每 30 秒发送一次 ping
→ 服务端定期检测超时未响应的连接
3. Redis Pub/Sub 集群
→ 多 Worker 进程间共享连接状态
→ 使用 redis.asyncio 管理(aioredis)
4. 消息持久化
→ 将聊天记录存入数据库
→ 支持历史消息查询
5. 连接数限制
→ 每个用户一个连接
→ 防止滥用(IP 限流)#7. 小结
# WebSocket 速查
# 服务端
@app.websocket("/ws")
async def chat(websocket: WebSocket):
await websocket.accept()
await websocket.send_text("Hello")
data = await websocket.receive_text()
await websocket.close()
# 客户端
const ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = (e) => console.log(e.data);
ws.send("Hello");
# 工具库
# fastapi-WebSocket-Manager: github.com/testdrivenai/fastapi-websocket-manager
# websockets: asyncio 原生 WebSocket 库💡 记住:WebSocket 是持久连接,与无状态的 HTTP 有本质不同。需要额外的连接管理、鉴权传递(通常用 URL 参数或首次消息携带 token)和心跳机制。
🔗 扩展阅读

