用 Claude Code 构建队列系统:异步处理实战指南
从生产者、重试、DLQ到监控,讲清Claude Code队列系统的实战设计。
用 Claude Code 开发 Web 服务时,很容易把所有逻辑都塞进一个请求里:表单提交后立刻发邮件,图片上传后马上压缩,支付回调里同时更新订单、发票、CRM 和通知。演示环境里这样看起来很快,但生产环境会遇到外部 API 超时、用户重复点击、部署重启、网络抖动、服务商限流等问题。
队列系统的作用,是把“现在不适合在请求里完成,但必须可靠完成”的工作先放进队列,再由独立 worker 慢慢处理。Claude Code 可以帮你生成代码,但提示词不能只写“做一个队列”。你需要明确 producer(生产者,负责把任务放入队列)、consumer(消费者,负责取出任务并执行)、message payload(消息载荷,也就是 worker 需要读取的数据)、visibility timeout(可见性超时,任务被某个 worker 领取后的临时隐藏时间)、retry(重试)、dead-letter queue/DLQ(死信队列,反复失败任务的隔离区)、idempotency(幂等性,同一任务执行两次也不会产生两次业务结果)、backpressure(背压,系统忙时限制入口流量)和 monitoring(监控)。
本文的示例全部是无依赖 Node.js 脚本,不需要 Redis、AWS 或 RabbitMQ。你可以直接复制运行,先理解队列在失败场景下应该如何工作,再去选择 SQS、RabbitMQ 或 BullMQ。
队列系统全貌
队列不是单纯的“后台执行”。它同时承担解耦、限流、失败隔离、重试控制和可观测性的职责。API 只负责接收请求和记录业务事实,worker 负责执行慢任务,DLQ 负责保存不能继续自动重试的任务。
flowchart LR
A["Producer<br/>API, cron, webhook"] --> B["Queue<br/>message payload"]
B --> C["Consumer<br/>worker process"]
C --> D["External service<br/>mail, image, billing"]
C -- "retryable failure" --> B
C -- "poison message" --> E["DLQ<br/>manual review"]
C --> F["Metrics<br/>logs and alerts"]
| 概念 | 通俗解释 | 设计时要决定 |
|---|---|---|
| Producer | 把任务放入队列的 API、定时任务或 webhook | payload 格式、校验、优先级、去重键 |
| Consumer | 从队列取任务并执行的 worker | 并发数、超时、失败处理 |
| Message payload | worker 执行任务需要的数据 | ID、任务类型、schema version,不能放 secrets |
| Visibility timeout | worker 处理期间,任务对其他 worker 不可见的时间 | 比 p95 处理时间略长 |
| Retry | 对临时失败再次执行 | 最大次数、backoff、jitter、失败原因 |
| DLQ | 不应继续自动重试的失败任务 | 谁负责看、何时告警、如何重新投递 |
| Idempotency | 重复任务只产生一次业务结果 | 唯一键、处理记录表、服务商幂等键 |
| Backpressure | worker 来不及时限制入口 | concurrency、rate limit、队列深度阈值 |
| Monitoring | 判断队列是否健康的证据 | 队列深度、最旧任务年龄、失败率、DLQ 数 |
把这张表交给 Claude Code,比只说“用 BullMQ 做队列”更有效。它能让生成结果覆盖失败路径,而不是只有 happy path。
常见用例
第一个用例是邮件发送队列。欢迎邮件、密码重置、账单失败提醒、咨询回复都不应该阻塞用户请求。相关实现可以继续看邮件自动化和SendGrid 邮件发送。payload 里应该放 deliveryId、templateId、userId,不要放 SendGrid API key、完整邮件正文或访问令牌。
第二个用例是图片和视频处理。上传后生成缩略图、转 WebP、扫描病毒、生成字幕、剪预览片段,都可能消耗 CPU 或花很久。队列可以让页面先返回“已接收”,再用 worker 控制并发处理。这里最大的坑是无界并发:上传一多,CPU、内存和磁盘 IO 会一起被打爆。
第三个用例是账单重试。支付服务商、银行卡网络、发票系统都会出现临时失败。重试队列可以恢复这种失败,但必须是有限重试。无限重试可能导致重复扣款、触发服务商限流,甚至掩盖真正的权限或 schema 问题。
第四个用例是线索补全和报表生成。咨询表单提交后,系统可以异步补全公司信息、写入 CRM、生成销售报表、通知 Slack。事件整体设计可以参考事件驱动架构,排查和告警可以结合日志与监控,payload 安全则要结合安全最佳实践一起设计。
示例1:无依赖内存队列
这个脚本演示 producer、consumer、message payload、visibility timeout 和 backpressure。保存为 queue-basic-demo.mjs 后运行 node queue-basic-demo.mjs。它不是生产级队列,因为数据只存在内存里,但非常适合理解基本生命周期。
// queue-basic-demo.mjs
let nextJobId = 1;
class InMemoryQueue {
constructor({ visibilityTimeoutMs = 800, maxInFlight = 2 } = {}) {
this.visibilityTimeoutMs = visibilityTimeoutMs;
this.maxInFlight = maxInFlight;
this.ready = [];
this.inFlight = new Map();
}
enqueue(type, payload) {
const job = {
id: `job-${nextJobId++}`,
type,
payload,
attempts: 0,
visibleAt: 0,
lockedBy: null,
};
this.ready.push(job);
return job.id;
}
receive(workerId) {
this.requeueExpired();
if (this.inFlight.size >= this.maxInFlight) {
return null;
}
const job = this.ready.shift();
if (!job) return null;
job.attempts += 1;
job.lockedBy = workerId;
job.visibleAt = Date.now() + this.visibilityTimeoutMs;
this.inFlight.set(job.id, job);
return {
id: job.id,
type: job.type,
payload: job.payload,
attempts: job.attempts,
};
}
ack(jobId) {
this.inFlight.delete(jobId);
}
requeueExpired(now = Date.now()) {
for (const [jobId, job] of this.inFlight.entries()) {
if (job.visibleAt <= now) {
this.inFlight.delete(jobId);
job.lockedBy = null;
this.ready.push(job);
}
}
}
stats() {
this.requeueExpired();
return {
ready: this.ready.length,
inFlight: this.inFlight.size,
};
}
}
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
function produce(queue) {
queue.enqueue("email.send", {
deliveryId: "mail-1001",
templateId: "welcome",
userId: "user-42",
});
queue.enqueue("image.resize", {
assetId: "asset-9001",
sizes: [320, 768, 1280],
});
queue.enqueue("report.generate", {
reportId: "weekly-2026-06-02",
accountId: "acct-7",
});
}
async function consume(queue, workerId) {
for (let step = 0; step < 8; step += 1) {
const job = queue.receive(workerId);
if (!job) {
console.log(`${workerId}: no job or backpressure`, queue.stats());
await sleep(120);
continue;
}
console.log(`${workerId}: started ${job.id}`, job.payload);
await sleep(job.type === "image.resize" ? 300 : 90);
queue.ack(job.id);
console.log(`${workerId}: acked ${job.id}`, queue.stats());
}
}
async function main() {
const queue = new InMemoryQueue({
visibilityTimeoutMs: 500,
maxInFlight: 2,
});
produce(queue);
await Promise.all([consume(queue, "worker-a"), consume(queue, "worker-b")]);
console.log("final stats", queue.stats());
}
void main();
生产环境中,ready 数组会被 SQS、RabbitMQ、Redis 等持久化服务替代。但状态模型不变:任务要么等待中,要么处理中,要么被确认完成,要么因为可见性超时而回到队列。
示例2:worker 幂等性保护
多数队列保证的是 at-least-once delivery,也就是至少投递一次,而不是业务上“只执行一次”。如果 worker 没有幂等性保护,重复投递会带来双重邮件、双重扣款、重复积分或重复 CRM 记录。
// idempotent-worker-demo.mjs
const idempotencyStore = new Map();
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
async function withIdempotency(key, work) {
const current = idempotencyStore.get(key);
if (current?.status === "done") {
return { skipped: true, result: current.result };
}
if (current?.status === "processing") {
return { skipped: true, reason: "already processing" };
}
idempotencyStore.set(key, { status: "processing" });
try {
const result = await work();
idempotencyStore.set(key, { status: "done", result });
return { skipped: false, result };
} catch (error) {
idempotencyStore.delete(key);
throw error;
}
}
async function fakeSendEmail(payload) {
await sleep(50);
return {
providerMessageId: `sg_${payload.deliveryId}`,
sentToUserId: payload.userId,
};
}
async function handleEmailJob(job) {
const key = job.payload.idempotencyKey;
if (!key) throw new Error("missing idempotencyKey");
return withIdempotency(key, () => fakeSendEmail(job.payload));
}
async function main() {
const original = {
id: "job-1",
payload: {
idempotencyKey: "email:welcome:user-42",
deliveryId: "mail-1001",
userId: "user-42",
},
};
console.log(await handleEmailJob(original));
console.log(await handleEmailJob({ ...original, id: "job-1-redelivery" }));
}
void main();
真实系统里不要用 Map 保存幂等状态,而要用数据库唯一索引、Redis SETNX 或支付服务商提供的 idempotency key。提示 Claude Code 时,要明确“外部副作用成功后才标记完成”“失败时释放处理锁”“payload 不放 secrets”。
示例3:重试和 DLQ
重试适合临时网络错误,不适合无效 payload、已删除用户、权限错误或错误的 provider 配置。poison message 指的是“自动重试也不会成功”的任务。如果它一直回到主队列,worker 会浪费资源,真正的问题也会被淹没。
// retry-dlq-demo.mjs
let nextRetryJobId = 1;
class RetryQueue {
constructor({ maxAttempts = 3 } = {}) {
this.maxAttempts = maxAttempts;
this.ready = [];
this.delayed = [];
this.dead = [];
this.completed = [];
}
enqueue(payload) {
this.ready.push({
id: `retry-job-${nextRetryJobId++}`,
payload,
attempts: 0,
runAt: Date.now(),
lastError: null,
});
}
moveReadyJobs(now = Date.now()) {
const stillDelayed = [];
for (const job of this.delayed) {
if (job.runAt <= now) {
this.ready.push(job);
} else {
stillDelayed.push(job);
}
}
this.delayed = stillDelayed;
}
retryOrDeadLetter(job, error) {
job.lastError = error.message;
if (job.attempts >= this.maxAttempts) {
this.dead.push(job);
return;
}
const delayMs = 50 * 2 ** (job.attempts - 1);
job.runAt = Date.now() + delayMs;
this.delayed.push(job);
}
async drain(handler) {
let idleRounds = 0;
while (this.ready.length > 0 || this.delayed.length > 0) {
this.moveReadyJobs();
const job = this.ready.shift();
if (!job) {
idleRounds += 1;
if (idleRounds > 100) throw new Error("drain timeout");
await sleep(20);
continue;
}
idleRounds = 0;
job.attempts += 1;
try {
const result = await handler(job);
this.completed.push({ id: job.id, result });
} catch (error) {
this.retryOrDeadLetter(job, error);
}
}
return {
completed: this.completed.length,
dead: this.dead.length,
};
}
}
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
async function handler(job) {
if (job.payload.kind === "poison") {
throw new Error("invalid payload schema");
}
if (job.payload.kind === "flaky" && job.attempts < 2) {
throw new Error("temporary provider timeout");
}
return `processed ${job.payload.kind}`;
}
async function main() {
const queue = new RetryQueue({ maxAttempts: 3 });
queue.enqueue({ kind: "normal" });
queue.enqueue({ kind: "flaky" });
queue.enqueue({ kind: "poison" });
console.log(await queue.drain(handler));
console.log(
"dead letters",
queue.dead.map((job) => ({
id: job.id,
attempts: job.attempts,
lastError: job.lastError,
payload: job.payload,
}))
);
}
void main();
DLQ 不是垃圾箱,而是需要有人负责的运维入口。必须记录失败原因、告警、负责人、修复方式和重新投递条件,否则 DLQ 只是安静的数据丢失。
运维检查清单
- payload 包含
jobId、type、schemaVersion、业务 ID 和 idempotency key。 - payload 不包含 API key、OAuth token、银行卡信息、完整邮件正文或大量个人信息。
- producer 在入队前校验 payload,避免坏任务进入系统。
- visibility timeout 比 p95 处理时间略长;过长任务要拆分或上报进度。
- retry 次数、backoff、jitter、DLQ 条件在上线前写清楚。
- consumer concurrency 根据数据库连接数、外部 API 限流、CPU 和内存决定。
- 监控队列深度、最旧任务年龄、active 数、失败率、DLQ 数、p95 处理时间。
- DLQ 的查看、重放、删除、客户沟通都写进 runbook。
- 邮件、扣费、积分、CRM 写入都按重复投递来设计。
- Claude Code 的 review 要检查失败路径,而不只是成功路径。
visibility timeout 特别容易设错。太短会让同一任务在第一个 worker 还没完成时被第二个 worker 重新领取,太长会让宕机 worker 的任务长时间不可见。先测真实 p95,再给出余量,是最实际的做法。
给 Claude Code 的提示词模式
请把失败契约也写进提示词,而不是只写库名:
给这个仓库添加邮件发送队列。API 先保存请求,再只把
deliveryId和templateId入队。worker 必须用 idempotency key 防止重复发送,对临时 provider 错误最多重试 3 次并使用指数 backoff,连续失败后写入 DLQ 表。payload 不得包含 API key、邮件正文或个人信息。请暴露队列深度、最旧任务年龄、失败率、DLQ 数,并添加重复投递、poison message、visibility timeout 的测试。
这样的提示词能让 Claude Code 输出更接近可 review 的生产代码,而不是只能演示的最小样例。
官方文档和本番选型
如果系统主要运行在 AWS 上,先看 Amazon SQS Developer Guide。如果需要复杂路由、exchange、pub/sub 或自管消息拓扑,可以看 RabbitMQ documentation。如果 Node.js 服务已经使用 Redis,并且想快速实现延迟任务、重复任务和 worker 管理,可以从 BullMQ documentation 开始。
不要先选库再补设计。payload、幂等性、retry、DLQ、监控、权限和成本,才是决定技术方案的核心。
常见失败
重复处理是第一类问题。队列通常保证至少投递一次,而不是业务效果只发生一次。网络断开、ack 失败、worker 重启、visibility timeout 过短都会导致重复投递。
poison message 是第二类问题。schema 错误、缺少用户、权限错误、旧版本 payload 都不会因为重试而变好。要早校验、记录原因、进入 DLQ,并在修复后受控重放。
无限重试是第三类问题。服务商故障时立刻大量重试,会让恢复更慢。有限次数、指数 backoff、jitter 和 producer 侧 backpressure 是基本要求。
payload 里放 secrets 是第四类问题。队列内容会进入日志、DLQ、管理后台和排障工具。payload 应该只放引用 ID,worker 再从有权限的数据源读取敏感数据。
培训与咨询
队列系统的难点不在代码行数,而在失败和运维设计。ClaudeCodeLab 可以把 Claude Code 提示词、CLAUDE.md 规则、payload schema、DLQ runbook、监控指标和 CI review 整理成团队流程。团队导入可以从Claude Code 培训与咨询开始;个人开发者也可以把本文清单贴到 PR 模板里,每次改队列都逐项确认。
总结
任务队列是生产基础设施,不只是“把慢任务放到后台”。它要控制慢任务、隔离失败、防止重复业务结果、限制并发,并留下可调查证据。让 Claude Code 实现队列时,请从第一条提示词就写清 producer、consumer、payload、visibility timeout、retry、DLQ、idempotency、backpressure 和 monitoring。
Masa 的亲测结果:我在本地直接运行了三个无依赖 Node.js 示例,确认了基础入队和消费、重复投递保护、poison message 进入 DLQ 的行为。幂等性示例尤其适合作为提示词素材,因为同一个邮件任务被再次投递时,日志会显示第二次复用了已保存结果,而不是再次发送邮件。
免费 PDF: Claude Code 速查表
输入邮箱即可获取一页 PDF,整理常用命令、审查习惯和安全工作流。
我们会妥善保护你的信息,不发送垃圾邮件。
把 Claude Code 变成真正能带来结果的工作流
先领取中文说明的免费 PDF,再进入英文商品页选择合适的教材。如果你需要团队落地、流程设计或内容变现支持,也可以直接咨询。
关于作者
Masa
专注 Claude Code 实务流程、团队导入和内容转化的工程师。
相关文章
Claude Code Permission Receipt Pattern:记录权限、证据和回滚方式
Claude Code 权限 receipt:记录允许动作、需要批准的边界、验证命令、回滚说明,以及 Gumroad 和咨询 CTA 检查。
Claude Code/Codex 安全 Agent Harness 实战:权限、验证与回滚
用权限策略、执行计划、验证脚本和回滚日志,为 Claude Code 与 Codex 搭建更安全的 AI Agent 工作流。
Claude Code 子代理实战指南:安全委派并行文章与代码工作
用 Claude Code 子代理安全拆分文章和代码工作:委派规则、提示词模板、失败模式与检查清单。