Claude Code WebSocket 实时实现:认证、重连、心跳与房间路由
用Claude Code构建可靠的WebSocket实时功能,覆盖认证、重连、心跳、房间路由、消息验证与评审。
当页面必须在服务器状态变化的瞬间更新时,WebSocket 很有用。例如 Claude Code 任务进入“测试中”、评审室出现新评论、团队成员加入同一个 room,或运维告警需要立即显示。Claude Code 可以很快生成初版,但真正能上线的实时功能,不能只停留在 onmessage 能跑。
容易出问题的地方通常是认证握手、重连风暴、手机网络断线、非法 JSON、慢消费者、room 越权以及缺少审计日志。本文给出一套可以复制运行的 Node.js ws 示例,并配套浏览器客户端、消息 schema 验证、heartbeat、测试客户端和 Claude Code 评审 prompt。
基础 API 建议同时阅读 MDN WebSocket、连接状态的 readyState 以及关闭连接的 close。Claude Code 的工作流可以参考 Claude Code common workflows。
什么时候该用 WebSocket
WebSocket 是一条长期存在的双向通道。它不替代 HTTP API、数据库写入、队列、计费流程或审计日志。它适合低延迟协作,关键业务动作仍然要落到可靠的服务端路径。
| 方案 | 适合场景 | 不适合场景 | 给 Claude Code 的指示 |
|---|---|---|---|
| WebSocket | 聊天、协作界面、任务进度、room 广播 | 偶尔出现的通知、需要搜索的历史记录 | 包含认证、重连、heartbeat、backpressure |
| Server-Sent Events | 服务器到浏览器的一方向通知 | 浏览器频繁向服务器发送操作 | 设计事件 ID 和事件类型 |
| 轮询 | 管理后台每隔几十秒刷新状态 | 低延迟体验 | 加缓存和限流 |
| HTTP API | 保存、计费、审计、管理操作 | 输入状态、在线状态等高频事件 | 复用同一套授权检查 |
实际用例至少有三类:第一,长时间 Claude Code 任务的实时进度,例如“分析中”“测试中”“生成 diff”。第二,代码评审 room 中的评论、已读状态和成员在线状态。第三,运维看板按客户或团队广播告警。培训场景也可以用它把每位学员的练习状态实时推到讲师屏幕。
架构图
flowchart LR
Browser["Browser client"] -->|"auth message + join room"| Server["Node ws server"]
Server --> Validator["JSON schema validator"]
Server --> Rooms["room registry"]
Server --> Audit["audit log / database"]
Rooms -->|"broadcast"| Browser
Server -->|"ping"| Browser
Browser -->|"auto pong / reconnect"| Server
Claude["Claude Code review loop"] --> Server
backpressure 指接收方处理不过来,发送缓冲不断变大的状态。MDN 提醒,经典 WebSocket API 不会自动处理 backpressure,所以实现里必须检查 bufferedAmount,并对高风险连接降速或关闭。
先把失败模式写进 prompt
不要只说“做一个实时聊天”。一开始就把约束告诉 Claude Code。
Create a Node.js ws realtime WebSocket foundation.
Requirements:
- Browser sends an auth message after connect; join/chat are forbidden before auth
- Clients join by roomId and broadcasts only go to the same room
- Validate JSON messages by type and close invalid messages with 1008
- Implement heartbeat with server ping and pong tracking
- Close slow consumers with 1013 when bufferedAmount is too high
- Browser client uses exponential backoff with jitter
- Do not put secrets in URL query strings
- Do not treat WebSocket delivery as an audit log replacement
可运行的 Node ws 服务器
本地运行步骤如下:
npm init -y
npm install ws
node server.mjs
创建 server.mjs。生产环境不要使用固定 DEV_TOKEN,应替换成短期票据、session cookie,或 API gateway 已经验证过的身份。
import { createServer } from "node:http";
import { randomUUID } from "node:crypto";
import { WebSocket, WebSocketServer } from "ws";
const PORT = Number(process.env.PORT || 8080);
const DEV_TOKEN = process.env.DEV_TOKEN || "dev-token";
const AUTH_TIMEOUT_MS = 5000;
const HEARTBEAT_INTERVAL_MS = 30000;
const MAX_BUFFERED_BYTES = 1024 * 1024;
const rooms = new Map();
const server = createServer((req, res) => {
res.writeHead(200, { "content-type": "text/plain" });
res.end("WebSocket server is running\n");
});
const wss = new WebSocketServer({ server });
function isRoomId(value) {
return typeof value === "string" && /^[a-zA-Z0-9:_-]{1,64}$/.test(value);
}
function validateClientMessage(value) {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return { ok: false, error: "message must be an object" };
}
if (typeof value.type !== "string") {
return { ok: false, error: "type is required" };
}
if (value.type === "auth") {
if (typeof value.token !== "string" || value.token.length < 8) {
return { ok: false, error: "auth.token is invalid" };
}
return { ok: true, value: { type: "auth", token: value.token } };
}
if (value.type === "join") {
if (!isRoomId(value.roomId)) return { ok: false, error: "roomId is invalid" };
return { ok: true, value: { type: "join", roomId: value.roomId } };
}
if (value.type === "chat") {
if (!isRoomId(value.roomId)) return { ok: false, error: "roomId is invalid" };
if (typeof value.text !== "string" || value.text.length < 1 || value.text.length > 1000) {
return { ok: false, error: "text must be 1-1000 chars" };
}
return {
ok: true,
value: {
type: "chat",
roomId: value.roomId,
requestId: typeof value.requestId === "string" ? value.requestId.slice(0, 100) : randomUUID(),
text: value.text,
},
};
}
if (value.type === "ping") return { ok: true, value: { type: "ping" } };
return { ok: false, error: `unsupported type: ${value.type}` };
}
function decodeMessage(raw) {
if (raw.length > 8192) return { ok: false, error: "message too large" };
try {
return validateClientMessage(JSON.parse(raw.toString("utf8")));
} catch {
return { ok: false, error: "invalid JSON" };
}
}
function sendJson(ws, payload) {
if (ws.readyState !== WebSocket.OPEN) return;
if (ws.bufferedAmount > MAX_BUFFERED_BYTES) {
ws.close(1013, "slow consumer");
return;
}
ws.send(JSON.stringify(payload));
}
function joinRoom(ws, roomId) {
if (!rooms.has(roomId)) rooms.set(roomId, new Set());
rooms.get(roomId).add(ws);
ws.rooms.add(roomId);
}
function leaveAllRooms(ws) {
for (const roomId of ws.rooms) {
const members = rooms.get(roomId);
if (!members) continue;
members.delete(ws);
if (members.size === 0) rooms.delete(roomId);
}
ws.rooms.clear();
}
function broadcast(roomId, payload) {
const members = rooms.get(roomId);
if (!members) return;
for (const client of members) sendJson(client, payload);
}
wss.on("connection", (ws) => {
ws.id = randomUUID();
ws.user = null;
ws.rooms = new Set();
ws.isAlive = true;
const authTimer = setTimeout(() => {
if (!ws.user) ws.close(4401, "auth required");
}, AUTH_TIMEOUT_MS);
ws.on("pong", () => {
ws.isAlive = true;
});
ws.on("message", (raw) => {
const decoded = decodeMessage(raw);
if (!decoded.ok) {
ws.close(1008, decoded.error);
return;
}
const msg = decoded.value;
if (!ws.user) {
if (msg.type !== "auth" || msg.token !== DEV_TOKEN) {
ws.close(4403, "auth failed");
return;
}
ws.user = { id: `user-${ws.id.slice(0, 8)}` };
clearTimeout(authTimer);
sendJson(ws, { type: "auth_ok", connectionId: ws.id, userId: ws.user.id });
return;
}
if (msg.type === "join") {
joinRoom(ws, msg.roomId);
sendJson(ws, { type: "joined", roomId: msg.roomId });
return;
}
if (msg.type === "chat") {
if (!ws.rooms.has(msg.roomId)) {
ws.close(1008, "join room before chat");
return;
}
const event = {
type: "chat",
id: randomUUID(),
requestId: msg.requestId,
roomId: msg.roomId,
userId: ws.user.id,
text: msg.text,
sentAt: new Date().toISOString(),
};
console.log(JSON.stringify({ audit: true, event }));
broadcast(msg.roomId, event);
return;
}
if (msg.type === "ping") sendJson(ws, { type: "pong", sentAt: new Date().toISOString() });
});
ws.on("close", () => {
clearTimeout(authTimer);
leaveAllRooms(ws);
});
});
setInterval(() => {
for (const ws of wss.clients) {
if (ws.isAlive === false) {
leaveAllRooms(ws);
ws.terminate();
continue;
}
ws.isAlive = false;
ws.ping();
}
}, HEARTBEAT_INTERVAL_MS).unref();
server.listen(PORT, () => {
console.log(`WebSocket server listening on ws://localhost:${PORT}`);
});
浏览器客户端:重连与 backoff
浏览器的 WebSocket 构造函数不能附加任意 Authorization header。浏览器应用应先通过 HTTPS 获取短期票据,再在 wss 连接后的第一条 auth 消息里发送。不要把 secret 放进 query string,因为代理、日志和分析工具都可能记录它。
<!doctype html>
<html lang="zh-CN">
<body>
<form id="form">
<input id="text" autocomplete="off" placeholder="message" />
<button>send</button>
</form>
<pre id="log"></pre>
<script>
const roomId = "room:demo";
const token = "dev-token";
const log = document.querySelector("#log");
const form = document.querySelector("#form");
const input = document.querySelector("#text");
let socket;
let attempt = 0;
let stopped = false;
function write(line) {
log.textContent += `${line}\n`;
}
function reconnectDelay() {
const base = Math.min(1000 * 2 ** attempt, 10000);
attempt += 1;
return base + Math.floor(Math.random() * 300);
}
function send(payload) {
if (!socket || socket.readyState !== WebSocket.OPEN) return false;
if (socket.bufferedAmount > 512 * 1024) {
write("send skipped: browser buffer is high");
return false;
}
socket.send(JSON.stringify(payload));
return true;
}
function connect() {
socket = new WebSocket("ws://localhost:8080");
write("connecting");
socket.addEventListener("open", () => {
attempt = 0;
send({ type: "auth", token });
send({ type: "join", roomId });
});
socket.addEventListener("message", (event) => {
const msg = JSON.parse(event.data);
write(`${msg.type}: ${JSON.stringify(msg)}`);
});
socket.addEventListener("close", (event) => {
write(`closed code=${event.code} reason=${event.reason}`);
if (!stopped) window.setTimeout(connect, reconnectDelay());
});
socket.addEventListener("error", () => {
write("socket error");
});
}
form.addEventListener("submit", (event) => {
event.preventDefault();
const text = input.value.trim();
if (!text) return;
send({ type: "chat", roomId, requestId: crypto.randomUUID(), text });
input.value = "";
});
window.addEventListener("beforeunload", () => {
stopped = true;
if (socket && socket.readyState === WebSocket.OPEN) socket.close(1000, "page unload");
});
connect();
</script>
</body>
</html>
单独评审消息 schema
先让 Claude Code 评审 schema,再评审整个服务器。schema 模糊时,巨大 payload、未知 type 和 room 伪造最容易混进去。
export function validateClientMessage(value) {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return { ok: false, error: "message must be an object" };
}
if (value.type === "join" && /^[a-zA-Z0-9:_-]{1,64}$/.test(value.roomId)) {
return { ok: true, value: { type: "join", roomId: value.roomId } };
}
if (value.type === "chat" && /^[a-zA-Z0-9:_-]{1,64}$/.test(value.roomId)) {
const text = typeof value.text === "string" ? value.text.trim() : "";
if (text.length >= 1 && text.length <= 1000) {
return { ok: true, value: { type: "chat", roomId: value.roomId, text } };
}
}
return { ok: false, error: "invalid realtime message" };
}
Heartbeat 与慢消费者
heartbeat 用来确认连接是否还活着。ws 服务器发送 ping,浏览器会自动返回协议级 pong。
setInterval(() => {
for (const ws of wss.clients) {
if (ws.isAlive === false) {
ws.terminate();
continue;
}
ws.isAlive = false;
ws.ping();
}
}, 30000);
慢消费者是指连接还在,但接收方处理不过来。每次发送前检查 bufferedAmount,过高时用 1013 关闭,让客户端稍后重连。
测试客户端
不用再开一个浏览器,也能模拟第二个参与者。
import { WebSocket } from "ws";
const ws = new WebSocket("ws://localhost:8080");
const roomId = "room:demo";
ws.on("open", () => {
ws.send(JSON.stringify({ type: "auth", token: "dev-token" }));
ws.send(JSON.stringify({ type: "join", roomId }));
setInterval(() => {
ws.send(JSON.stringify({
type: "chat",
roomId,
requestId: crypto.randomUUID(),
text: `hello ${new Date().toISOString()}`,
}));
}, 3000);
});
ws.on("message", (data) => {
console.log(data.toString());
});
运行 node test-client.mjs,然后重启服务器,确认浏览器会按 backoff 重连。
常见坑
- 把 secret 放在
ws://example.com?token=...,导致日志和截图泄露。 - 只做
JSON.parse,不验证 type、大小、room 和文本长度。 - 不检查
readyState就调用send。 - 没有 heartbeat,导致手机断线后死连接长期存在。
- 把 WebSocket 广播当作审计日志。重要事件仍要写入持久化存储。
- 只让 Claude Code “做个聊天”,没有要求认证、重连、backpressure、close code 和测试。
Claude Code 评审 prompt
Review this WebSocket implementation before release.
Scope:
- server.mjs
- browser client
- message schema validator
Check:
1. Can a client join/chat before auth?
2. Can secrets leak into URLs, logs, or error messages?
3. Can a client spoof roomId and send to another room?
4. Can reconnect/backoff create a connection storm?
5. Does heartbeat remove dead connections?
6. Are slow consumers handled with bufferedAmount?
7. Is there an audit log outside WebSocket delivery?
Return:
- P0/P1/P2 findings first
- Reproduction steps
- Minimal patch
- Missing tests
相关主题可以继续阅读 Claude Code 安全最佳实践、代码评审清单 和 Webhook 实现指南。
培训、模板与咨询
本文代码足够做本地 PoC。团队落地时,还需要统一 CLAUDE.md、评审规则、close code 策略、审计存储和回滚流程。ClaudeCodeLab 提供 Claude Code 培训与咨询 以及 模板和产品指南,适合把实时自动化真正纳入团队工作流。
总结
WebSocket 很适合 Claude Code 进度流、评审 room、协作工具和运维告警。但能连上不代表能安全运行。请把认证握手、room routing、消息验证、重连、heartbeat、backpressure 和审计日志作为同一个设计处理。
把本文实现放到本地试运行后,auth 之前的 chat 会被拒绝,服务器重启后浏览器能按指数 backoff 重连,room 广播也保持在同一个 room 内。仍然需要按团队调整的是运维策略,例如缓冲阈值、close code 文案和审计事件的存储位置。Masa 的实际体会是,让 Claude Code 审查失败模式,比只让它生成正常路径更能提升质量。
免费 PDF: Claude Code 速查表
输入邮箱即可获取一页 PDF,整理常用命令、审查习惯和安全工作流。
我们会妥善保护你的信息,不发送垃圾邮件。
把 Claude Code 变成真正能带来结果的工作流
先领取中文说明的免费 PDF,再进入英文商品页选择合适的教材。如果你需要团队落地、流程设计或内容变现支持,也可以直接咨询。
关于作者
Masa
专注 Claude Code 实务流程、团队导入和内容转化的工程师。
相关文章
从Obsidian到CLAUDE.md的Claude Code流程:不再反复解释上下文
把 Obsidian 工作笔记整理成 CLAUDE.md 运行说明,让 Claude Code 每次都带着正确上下文开始。
Claude Code 收入 CTA 路由:从文章分流到 PDF、Gumroad 与咨询
用 Claude Code 按读者意图把文章流量分到免费 PDF、Gumroad 教材或咨询入口。
Claude Code 团队交接规则: 把审查证据、权限、回滚和收入路径一起交付
面向团队的 Claude Code 交接格式: 证据、权限、回滚、免费 PDF、Gumroad 与咨询路径都要可审查。