WebSocket 实时通信:构建 AI 实时聊天室

📂 所属阶段:第六阶段 — 2026 特色专题(AI 集成篇)
🔗 相关章节:流式响应 StreamingResponse · OAuth2 与 JWT 鉴权


1. WebSocket vs HTTP

1.1 核心区别

HTTPWebSocket
连接方式请求-响应(短连接)双向持久连接
谁先发消息只能是客户端双方都可以
实时性轮询/长轮询(延迟高)真正的实时推送
适用场景REST API、文件上传聊天、游戏、协作
HTTP:  客户端 → [请求] → 服务器 → [响应] → 断开 ❌

WebSocket:
客户端 ←→ [持久连接,双向通信] ↔ 服务器 ✅

1.2 HTTP 升级为 WebSocket

客户端发起 HTTP 请求(带 Upgrade 头):
GET /ws HTTP/1.1
Upgrade: websocket
Connection: Upgrade

服务器同意升级:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: upgrade

2. FastAPI WebSocket 基础

2.1 基础 WebSocket 端点

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import asyncio

app = FastAPI()

# HTML 页面(用于测试)
html = """
<!DOCTYPE html>
<html>
<head><title>WebSocket 测试</title></head>
<body>
  <h1>WebSocket 客户端</h1>
  <input id="msg" placeholder="输入消息">
  <button onclick="send()">发送</button>
  <ul id="messages"></ul>
  <script>
    const ws = new WebSocket("ws://localhost:8000/ws");
    ws.onmessage = (e) => {
      const li = document.createElement("li");
      li.textContent = e.data;
      document.getElementById("messages").appendChild(li);
    };
    function send() {
      ws.send(document.getElementById("msg").value);
    }
  </script>
</body>
</html>
"""

@app.get("/")
async def get():
    return HTMLResponse(html)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """最简单的 WebSocket 端点"""
    await websocket.accept()

    try:
        while True:
            # 接收客户端消息
            data = await websocket.receive_text()
            print(f"收到: {data}")

            # 发送回复
            await websocket.send_text(f"服务器收到: {data}")
    except WebSocketDisconnect:
        print("客户端断开了")

2.2 运行测试

uvicorn main:app --reload
# 打开浏览器访问 http://localhost:8000

3. 连接管理器(多人聊天室核心)

3.1 ConnectionManager

# managers/connection_manager.py
from fastapi import WebSocket
from typing import Dict, List
import json
from datetime import datetime

class ConnectionManager:
    """管理所有 WebSocket 连接,支持广播、群聊、私聊"""

    def __init__(self):
        # 在线用户:user_id → WebSocket
        self.active_connections: Dict[str, WebSocket] = {}
        # 用户信息:user_id → {name, avatar, room}
        self.user_info: Dict[str, dict] = {}

    async def connect(self, websocket: WebSocket, user_id: str, user_data: dict):
        """用户连接"""
        await websocket.accept()
        self.active_connections[user_id] = websocket
        self.user_info[user_id] = {**user_data, "joined_at": datetime.utcnow().isoformat()}
        # 广播:用户上线
        await self.broadcast_system(f"用户 {user_data.get('name', user_id)} 上线了")

    async def disconnect(self, user_id: str):
        """用户断开"""
        if user_id in self.active_connections:
            del self.active_connections[user_id]
        user_name = self.user_info.get(user_id, {}).get("name", user_id)
        if user_id in self.user_info:
            del self.user_info[user_id]
        await self.broadcast_system(f"用户 {user_name} 离开了")

    async def send_personal(self, user_id: str, message: dict):
        """发送私信"""
        if user_id in self.active_connections:
            await self.active_connections[user_id].send_json(message)

    async def broadcast(self, message: dict, exclude: list[str] = None):
        """广播消息(可排除指定用户)"""
        disconnected = []
        for user_id, ws in self.active_connections.items():
            if exclude and user_id in exclude:
                continue
            try:
                await ws.send_json(message)
            except Exception:
                disconnected.append(user_id)

        # 清理断开的连接
        for uid in disconnected:
            await self.disconnect(uid)

    async def broadcast_system(self, text: str):
        """广播系统消息"""
        await self.broadcast({
            "type": "system",
            "content": text,
            "timestamp": datetime.utcnow().isoformat(),
        })

# 全局管理器实例
manager = ConnectionManager()

4. 完整聊天室实现

4.1 WebSocket 路由

# routers/websocket_chat.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, Query
from fastapi.responses import HTMLResponse
from managers.connection_manager import manager
from auth.jwt import decode_token
from datetime import datetime
import json

router = APIRouter()

@router.get("/chat-room")
async def get_chat_page():
    """聊天室页面"""
    return HTMLResponse(open("templates/chat.html").read())


@router.websocket("/ws/chat")
async def chat_websocket(
    websocket: WebSocket,
    token: str = Query(...),  # URL 参数传递 token
):
    """
    聊天 WebSocket 端点
    客户端连接:new WebSocket("ws://localhost:8000/ws/chat?token=xxx")
    """
    # 1. 验证用户身份
    try:
        payload = decode_token(token)
        user_id = str(payload["sub"])
        user_name = payload.get("email", user_id)
    except Exception:
        await websocket.close(code=4001, reason="认证失败")
        return

    # 2. 建立连接
    await manager.connect(websocket, user_id, {"name": user_name})

    try:
        while True:
            # 3. 接收消息
            raw_data = await websocket.receive_text()
            message = json.loads(raw_data)
            msg_type = message.get("type", "text")

            if msg_type == "text":
                # 文本消息:广播给所有人
                chat_msg = {
                    "type": "message",
                    "user_id": user_id,
                    "user_name": user_name,
                    "content": message["content"],
                    "timestamp": datetime.utcnow().isoformat(),
                }
                await manager.broadcast(chat_msg)

            elif msg_type == "private":
                # 私聊消息
                target_user = message["to"]
                private_msg = {
                    "type": "private",
                    "from": user_id,
                    "from_name": user_name,
                    "content": message["content"],
                    "timestamp": datetime.utcnow().isoformat(),
                }
                await manager.send_personal(target_user, private_msg)
                # 发送给自己确认
                await manager.send_personal(user_id, {
                    **private_msg, "to": target_user, "to_name": "你"
                })

            elif msg_type == "ping":
                # 心跳:回复 pong
                await manager.send_personal(user_id, {"type": "pong"})

    except WebSocketDisconnect:
        await manager.disconnect(user_id)


@router.get("/online-users")
async def get_online_users():
    """查询当前在线用户"""
    return {
        "count": len(manager.active_connections),
        "users": [
            {"user_id": uid, **info}
            for uid, info in manager.user_info.items()
        ]
    }

4.2 前端聊天室

<!-- templates/chat.html -->
<!DOCTYPE html>
<html lang="zh">
<head>
  <meta charset="UTF-8">
  <title>AI 实时聊天室</title>
  <style>
    body { max-width: 600px; margin: 0 auto; padding: 20px; font-family: sans-serif; }
    #messages { list-style: none; padding: 0; height: 400px; overflow-y: auto; border: 1px solid #ccc; border-radius: 8px; padding: 10px; }
    #messages li { padding: 8px; margin: 4px 0; border-radius: 4px; }
    #messages li.system { background: #f0f0f0; color: #666; font-size: 0.9em; text-align: center; }
    #messages li.mine { background: #dcf8c6; margin-left: 20%; }
    #messages li.others { background: #fff; border: 1px solid #ddd; margin-right: 20%; }
    .meta { font-size: 0.75em; color: #888; margin-top: 2px; }
    .input-area { display: flex; gap: 8px; margin-top: 10px; }
    .input-area input { flex: 1; padding: 10px; border-radius: 20px; border: 1px solid #ccc; }
    .input-area button { padding: 10px 20px; border-radius: 20px; border: none; background: #34b7f1; color: white; cursor: pointer; }
  </style>
</head>
<body>
  <h2>💬 AI 聊天室</h2>
  <div id="status">连接中...</div>
  <ul id="messages"></ul>
  <div class="input-area">
    <input id="msg" placeholder="输入消息,按回车发送" autocomplete="off">
    <button onclick="send()">发送</button>
  </div>

  <script>
    const token = "你的JWT令牌";  // 实际使用时从后端获取
    const ws = new WebSocket(`ws://localhost:8000/ws/chat?token=${token}`);

    const messagesEl = document.getElementById("messages");
    const statusEl = document.getElementById("status");

    ws.onopen = () => {
      statusEl.textContent = "🟢 已连接";
      statusEl.style.color = "green";
      // 启动心跳
      setInterval(() => ws.send(JSON.stringify({type: "ping"})), 30000);
    };

    ws.onmessage = (e) => {
      const msg = JSON.parse(e.data);
      if (msg.type === "system") {
        addMessage(msg.content, "system");
      } else {
        addMessage(`${msg.user_name}: ${msg.content}`, msg.type === "private" ? "mine" : "others");
      }
    };

    ws.onclose = () => {
      statusEl.textContent = "🔴 连接已断开";
      statusEl.style.color = "red";
    };

    function addMessage(text, type) {
      const li = document.createElement("li");
      li.textContent = text;
      li.className = type;
      messagesEl.appendChild(li);
      messagesEl.scrollTop = messagesEl.scrollHeight;
    }

    function send() {
      const input = document.getElementById("msg");
      if (!input.value.trim()) return;
      ws.send(JSON.stringify({type: "text", content: input.value}));
      input.value = "";
    }

    document.getElementById("msg").addEventListener("keypress", (e) => {
      if (e.key === "Enter") send();
    });
  </script>
</body>
</html>

5. AI 实时助手(WebSocket + AI)

5.1 AI 对话 WebSocket

# routers/websocket_ai.py
@router.websocket("/ws/ai")
async def ai_chat_websocket(websocket: WebSocket):
    """AI 实时对话 WebSocket"""
    await websocket.accept()
    openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)

    try:
        # 接收初始化消息(包含对话历史)
        init = await websocket.receive_json()
        messages = init.get("history", [])

        while True:
            # 1. 接收用户消息
            user_msg = await websocket.receive_json()
            user_content = user_msg["content"]

            # 2. 添加用户消息到历史
            messages.append({"role": "user", "content": user_content})

            # 3. 流式调用 AI
            stream_resp = await openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                stream=True,
            )

            # 4. 流式转发 AI 回复
            full_response = ""
            for chunk in stream_resp:
                token = chunk.choices[0].delta.content
                if token:
                    full_response += token
                    # 逐 token 发送给前端
                    await websocket.send_json({
                        "type": "token",
                        "content": token,
                    })

            # 5. 保存 AI 回复到历史
            messages.append({"role": "assistant", "content": full_response})

            # 6. 发送完成信号
            await websocket.send_json({"type": "done"})

    except WebSocketDisconnect:
        print("AI 聊天会话结束")
    except Exception as e:
        await websocket.send_json({"type": "error", "message": str(e)})
        await websocket.close()

6. 生产注意事项

⚠️ WebSocket 生产环境注意事项:

1. Nginx WebSocket 支持(必须配置)
   proxy_http_version 1.1;
   proxy_set_header Upgrade $http_upgrade;
   proxy_set_header Connection "upgrade";

2. 心跳保活
   → 客户端每 30 秒发送一次 ping
   → 服务端定期检测超时未响应的连接

3. Redis Pub/Sub 集群
   → 多 Worker 进程间共享连接状态
   → 使用 redis.asyncio 管理(aioredis)

4. 消息持久化
   → 将聊天记录存入数据库
   → 支持历史消息查询

5. 连接数限制
   → 每个用户一个连接
   → 防止滥用(IP 限流)

7. 小结

# WebSocket 速查

# 服务端
@app.websocket("/ws")
async def chat(websocket: WebSocket):
    await websocket.accept()
    await websocket.send_text("Hello")
    data = await websocket.receive_text()
    await websocket.close()

# 客户端
const ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = (e) => console.log(e.data);
ws.send("Hello");

# 工具库
# fastapi-WebSocket-Manager: github.com/testdrivenai/fastapi-websocket-manager
# websockets: asyncio 原生 WebSocket 库

💡 记住:WebSocket 是持久连接,与无状态的 HTTP 有本质不同。需要额外的连接管理、鉴权传递(通常用 URL 参数或首次消息携带 token)和心跳机制。


🔗 扩展阅读