Advanced (更新: 2026/6/2)

用 Claude Code 构建队列系统:异步处理实战指南

从生产者、重试、DLQ到监控,讲清Claude Code队列系统的实战设计。

用 Claude Code 构建队列系统:异步处理实战指南

用 Claude Code 开发 Web 服务时,很容易把所有逻辑都塞进一个请求里:表单提交后立刻发邮件,图片上传后马上压缩,支付回调里同时更新订单、发票、CRM 和通知。演示环境里这样看起来很快,但生产环境会遇到外部 API 超时、用户重复点击、部署重启、网络抖动、服务商限流等问题。

队列系统的作用,是把“现在不适合在请求里完成,但必须可靠完成”的工作先放进队列,再由独立 worker 慢慢处理。Claude Code 可以帮你生成代码,但提示词不能只写“做一个队列”。你需要明确 producer(生产者,负责把任务放入队列)、consumer(消费者,负责取出任务并执行)、message payload(消息载荷,也就是 worker 需要读取的数据)、visibility timeout(可见性超时,任务被某个 worker 领取后的临时隐藏时间)、retry(重试)、dead-letter queue/DLQ(死信队列,反复失败任务的隔离区)、idempotency(幂等性,同一任务执行两次也不会产生两次业务结果)、backpressure(背压,系统忙时限制入口流量)和 monitoring(监控)。

本文的示例全部是无依赖 Node.js 脚本,不需要 Redis、AWS 或 RabbitMQ。你可以直接复制运行,先理解队列在失败场景下应该如何工作,再去选择 SQS、RabbitMQ 或 BullMQ。

队列系统全貌

队列不是单纯的“后台执行”。它同时承担解耦、限流、失败隔离、重试控制和可观测性的职责。API 只负责接收请求和记录业务事实,worker 负责执行慢任务,DLQ 负责保存不能继续自动重试的任务。

flowchart LR
  A["Producer<br/>API, cron, webhook"] --> B["Queue<br/>message payload"]
  B --> C["Consumer<br/>worker process"]
  C --> D["External service<br/>mail, image, billing"]
  C -- "retryable failure" --> B
  C -- "poison message" --> E["DLQ<br/>manual review"]
  C --> F["Metrics<br/>logs and alerts"]
概念通俗解释设计时要决定
Producer把任务放入队列的 API、定时任务或 webhookpayload 格式、校验、优先级、去重键
Consumer从队列取任务并执行的 worker并发数、超时、失败处理
Message payloadworker 执行任务需要的数据ID、任务类型、schema version,不能放 secrets
Visibility timeoutworker 处理期间,任务对其他 worker 不可见的时间比 p95 处理时间略长
Retry对临时失败再次执行最大次数、backoff、jitter、失败原因
DLQ不应继续自动重试的失败任务谁负责看、何时告警、如何重新投递
Idempotency重复任务只产生一次业务结果唯一键、处理记录表、服务商幂等键
Backpressureworker 来不及时限制入口concurrency、rate limit、队列深度阈值
Monitoring判断队列是否健康的证据队列深度、最旧任务年龄、失败率、DLQ 数

把这张表交给 Claude Code,比只说“用 BullMQ 做队列”更有效。它能让生成结果覆盖失败路径,而不是只有 happy path。

常见用例

第一个用例是邮件发送队列。欢迎邮件、密码重置、账单失败提醒、咨询回复都不应该阻塞用户请求。相关实现可以继续看邮件自动化SendGrid 邮件发送。payload 里应该放 deliveryIdtemplateIduserId,不要放 SendGrid API key、完整邮件正文或访问令牌。

第二个用例是图片和视频处理。上传后生成缩略图、转 WebP、扫描病毒、生成字幕、剪预览片段,都可能消耗 CPU 或花很久。队列可以让页面先返回“已接收”,再用 worker 控制并发处理。这里最大的坑是无界并发:上传一多,CPU、内存和磁盘 IO 会一起被打爆。

第三个用例是账单重试。支付服务商、银行卡网络、发票系统都会出现临时失败。重试队列可以恢复这种失败,但必须是有限重试。无限重试可能导致重复扣款、触发服务商限流,甚至掩盖真正的权限或 schema 问题。

第四个用例是线索补全和报表生成。咨询表单提交后,系统可以异步补全公司信息、写入 CRM、生成销售报表、通知 Slack。事件整体设计可以参考事件驱动架构,排查和告警可以结合日志与监控,payload 安全则要结合安全最佳实践一起设计。

示例1:无依赖内存队列

这个脚本演示 producer、consumer、message payload、visibility timeout 和 backpressure。保存为 queue-basic-demo.mjs 后运行 node queue-basic-demo.mjs。它不是生产级队列,因为数据只存在内存里,但非常适合理解基本生命周期。

// queue-basic-demo.mjs
let nextJobId = 1;

class InMemoryQueue {
  constructor({ visibilityTimeoutMs = 800, maxInFlight = 2 } = {}) {
    this.visibilityTimeoutMs = visibilityTimeoutMs;
    this.maxInFlight = maxInFlight;
    this.ready = [];
    this.inFlight = new Map();
  }

  enqueue(type, payload) {
    const job = {
      id: `job-${nextJobId++}`,
      type,
      payload,
      attempts: 0,
      visibleAt: 0,
      lockedBy: null,
    };
    this.ready.push(job);
    return job.id;
  }

  receive(workerId) {
    this.requeueExpired();

    if (this.inFlight.size >= this.maxInFlight) {
      return null;
    }

    const job = this.ready.shift();
    if (!job) return null;

    job.attempts += 1;
    job.lockedBy = workerId;
    job.visibleAt = Date.now() + this.visibilityTimeoutMs;
    this.inFlight.set(job.id, job);

    return {
      id: job.id,
      type: job.type,
      payload: job.payload,
      attempts: job.attempts,
    };
  }

  ack(jobId) {
    this.inFlight.delete(jobId);
  }

  requeueExpired(now = Date.now()) {
    for (const [jobId, job] of this.inFlight.entries()) {
      if (job.visibleAt <= now) {
        this.inFlight.delete(jobId);
        job.lockedBy = null;
        this.ready.push(job);
      }
    }
  }

  stats() {
    this.requeueExpired();
    return {
      ready: this.ready.length,
      inFlight: this.inFlight.size,
    };
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

function produce(queue) {
  queue.enqueue("email.send", {
    deliveryId: "mail-1001",
    templateId: "welcome",
    userId: "user-42",
  });
  queue.enqueue("image.resize", {
    assetId: "asset-9001",
    sizes: [320, 768, 1280],
  });
  queue.enqueue("report.generate", {
    reportId: "weekly-2026-06-02",
    accountId: "acct-7",
  });
}

async function consume(queue, workerId) {
  for (let step = 0; step < 8; step += 1) {
    const job = queue.receive(workerId);

    if (!job) {
      console.log(`${workerId}: no job or backpressure`, queue.stats());
      await sleep(120);
      continue;
    }

    console.log(`${workerId}: started ${job.id}`, job.payload);
    await sleep(job.type === "image.resize" ? 300 : 90);
    queue.ack(job.id);
    console.log(`${workerId}: acked ${job.id}`, queue.stats());
  }
}

async function main() {
  const queue = new InMemoryQueue({
    visibilityTimeoutMs: 500,
    maxInFlight: 2,
  });

  produce(queue);
  await Promise.all([consume(queue, "worker-a"), consume(queue, "worker-b")]);
  console.log("final stats", queue.stats());
}

void main();

生产环境中,ready 数组会被 SQS、RabbitMQ、Redis 等持久化服务替代。但状态模型不变:任务要么等待中,要么处理中,要么被确认完成,要么因为可见性超时而回到队列。

示例2:worker 幂等性保护

多数队列保证的是 at-least-once delivery,也就是至少投递一次,而不是业务上“只执行一次”。如果 worker 没有幂等性保护,重复投递会带来双重邮件、双重扣款、重复积分或重复 CRM 记录。

// idempotent-worker-demo.mjs
const idempotencyStore = new Map();
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function withIdempotency(key, work) {
  const current = idempotencyStore.get(key);

  if (current?.status === "done") {
    return { skipped: true, result: current.result };
  }

  if (current?.status === "processing") {
    return { skipped: true, reason: "already processing" };
  }

  idempotencyStore.set(key, { status: "processing" });

  try {
    const result = await work();
    idempotencyStore.set(key, { status: "done", result });
    return { skipped: false, result };
  } catch (error) {
    idempotencyStore.delete(key);
    throw error;
  }
}

async function fakeSendEmail(payload) {
  await sleep(50);
  return {
    providerMessageId: `sg_${payload.deliveryId}`,
    sentToUserId: payload.userId,
  };
}

async function handleEmailJob(job) {
  const key = job.payload.idempotencyKey;
  if (!key) throw new Error("missing idempotencyKey");

  return withIdempotency(key, () => fakeSendEmail(job.payload));
}

async function main() {
  const original = {
    id: "job-1",
    payload: {
      idempotencyKey: "email:welcome:user-42",
      deliveryId: "mail-1001",
      userId: "user-42",
    },
  };

  console.log(await handleEmailJob(original));
  console.log(await handleEmailJob({ ...original, id: "job-1-redelivery" }));
}

void main();

真实系统里不要用 Map 保存幂等状态,而要用数据库唯一索引、Redis SETNX 或支付服务商提供的 idempotency key。提示 Claude Code 时,要明确“外部副作用成功后才标记完成”“失败时释放处理锁”“payload 不放 secrets”。

示例3:重试和 DLQ

重试适合临时网络错误,不适合无效 payload、已删除用户、权限错误或错误的 provider 配置。poison message 指的是“自动重试也不会成功”的任务。如果它一直回到主队列,worker 会浪费资源,真正的问题也会被淹没。

// retry-dlq-demo.mjs
let nextRetryJobId = 1;

class RetryQueue {
  constructor({ maxAttempts = 3 } = {}) {
    this.maxAttempts = maxAttempts;
    this.ready = [];
    this.delayed = [];
    this.dead = [];
    this.completed = [];
  }

  enqueue(payload) {
    this.ready.push({
      id: `retry-job-${nextRetryJobId++}`,
      payload,
      attempts: 0,
      runAt: Date.now(),
      lastError: null,
    });
  }

  moveReadyJobs(now = Date.now()) {
    const stillDelayed = [];
    for (const job of this.delayed) {
      if (job.runAt <= now) {
        this.ready.push(job);
      } else {
        stillDelayed.push(job);
      }
    }
    this.delayed = stillDelayed;
  }

  retryOrDeadLetter(job, error) {
    job.lastError = error.message;

    if (job.attempts >= this.maxAttempts) {
      this.dead.push(job);
      return;
    }

    const delayMs = 50 * 2 ** (job.attempts - 1);
    job.runAt = Date.now() + delayMs;
    this.delayed.push(job);
  }

  async drain(handler) {
    let idleRounds = 0;

    while (this.ready.length > 0 || this.delayed.length > 0) {
      this.moveReadyJobs();
      const job = this.ready.shift();

      if (!job) {
        idleRounds += 1;
        if (idleRounds > 100) throw new Error("drain timeout");
        await sleep(20);
        continue;
      }

      idleRounds = 0;
      job.attempts += 1;

      try {
        const result = await handler(job);
        this.completed.push({ id: job.id, result });
      } catch (error) {
        this.retryOrDeadLetter(job, error);
      }
    }

    return {
      completed: this.completed.length,
      dead: this.dead.length,
    };
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function handler(job) {
  if (job.payload.kind === "poison") {
    throw new Error("invalid payload schema");
  }

  if (job.payload.kind === "flaky" && job.attempts < 2) {
    throw new Error("temporary provider timeout");
  }

  return `processed ${job.payload.kind}`;
}

async function main() {
  const queue = new RetryQueue({ maxAttempts: 3 });
  queue.enqueue({ kind: "normal" });
  queue.enqueue({ kind: "flaky" });
  queue.enqueue({ kind: "poison" });

  console.log(await queue.drain(handler));
  console.log(
    "dead letters",
    queue.dead.map((job) => ({
      id: job.id,
      attempts: job.attempts,
      lastError: job.lastError,
      payload: job.payload,
    }))
  );
}

void main();

DLQ 不是垃圾箱,而是需要有人负责的运维入口。必须记录失败原因、告警、负责人、修复方式和重新投递条件,否则 DLQ 只是安静的数据丢失。

运维检查清单

  • payload 包含 jobIdtypeschemaVersion、业务 ID 和 idempotency key。
  • payload 不包含 API key、OAuth token、银行卡信息、完整邮件正文或大量个人信息。
  • producer 在入队前校验 payload,避免坏任务进入系统。
  • visibility timeout 比 p95 处理时间略长;过长任务要拆分或上报进度。
  • retry 次数、backoff、jitter、DLQ 条件在上线前写清楚。
  • consumer concurrency 根据数据库连接数、外部 API 限流、CPU 和内存决定。
  • 监控队列深度、最旧任务年龄、active 数、失败率、DLQ 数、p95 处理时间。
  • DLQ 的查看、重放、删除、客户沟通都写进 runbook。
  • 邮件、扣费、积分、CRM 写入都按重复投递来设计。
  • Claude Code 的 review 要检查失败路径,而不只是成功路径。

visibility timeout 特别容易设错。太短会让同一任务在第一个 worker 还没完成时被第二个 worker 重新领取,太长会让宕机 worker 的任务长时间不可见。先测真实 p95,再给出余量,是最实际的做法。

给 Claude Code 的提示词模式

请把失败契约也写进提示词,而不是只写库名:

给这个仓库添加邮件发送队列。API 先保存请求,再只把 deliveryIdtemplateId 入队。worker 必须用 idempotency key 防止重复发送,对临时 provider 错误最多重试 3 次并使用指数 backoff,连续失败后写入 DLQ 表。payload 不得包含 API key、邮件正文或个人信息。请暴露队列深度、最旧任务年龄、失败率、DLQ 数,并添加重复投递、poison message、visibility timeout 的测试。

这样的提示词能让 Claude Code 输出更接近可 review 的生产代码,而不是只能演示的最小样例。

官方文档和本番选型

如果系统主要运行在 AWS 上,先看 Amazon SQS Developer Guide。如果需要复杂路由、exchange、pub/sub 或自管消息拓扑,可以看 RabbitMQ documentation。如果 Node.js 服务已经使用 Redis,并且想快速实现延迟任务、重复任务和 worker 管理,可以从 BullMQ documentation 开始。

不要先选库再补设计。payload、幂等性、retry、DLQ、监控、权限和成本,才是决定技术方案的核心。

常见失败

重复处理是第一类问题。队列通常保证至少投递一次,而不是业务效果只发生一次。网络断开、ack 失败、worker 重启、visibility timeout 过短都会导致重复投递。

poison message 是第二类问题。schema 错误、缺少用户、权限错误、旧版本 payload 都不会因为重试而变好。要早校验、记录原因、进入 DLQ,并在修复后受控重放。

无限重试是第三类问题。服务商故障时立刻大量重试,会让恢复更慢。有限次数、指数 backoff、jitter 和 producer 侧 backpressure 是基本要求。

payload 里放 secrets 是第四类问题。队列内容会进入日志、DLQ、管理后台和排障工具。payload 应该只放引用 ID,worker 再从有权限的数据源读取敏感数据。

培训与咨询

队列系统的难点不在代码行数,而在失败和运维设计。ClaudeCodeLab 可以把 Claude Code 提示词、CLAUDE.md 规则、payload schema、DLQ runbook、监控指标和 CI review 整理成团队流程。团队导入可以从Claude Code 培训与咨询开始;个人开发者也可以把本文清单贴到 PR 模板里,每次改队列都逐项确认。

总结

任务队列是生产基础设施,不只是“把慢任务放到后台”。它要控制慢任务、隔离失败、防止重复业务结果、限制并发,并留下可调查证据。让 Claude Code 实现队列时,请从第一条提示词就写清 producer、consumer、payload、visibility timeout、retry、DLQ、idempotency、backpressure 和 monitoring。

Masa 的亲测结果:我在本地直接运行了三个无依赖 Node.js 示例,确认了基础入队和消费、重复投递保护、poison message 进入 DLQ 的行为。幂等性示例尤其适合作为提示词素材,因为同一个邮件任务被再次投递时,日志会显示第二次复用了已保存结果,而不是再次发送邮件。

#Claude Code #任务队列 #异步处理 #BullMQ #Redis
免费

免费 PDF: Claude Code 速查表

输入邮箱即可获取一页 PDF,整理常用命令、审查习惯和安全工作流。

我们会妥善保护你的信息,不发送垃圾邮件。

把 Claude Code 变成真正能带来结果的工作流

先领取中文说明的免费 PDF,再进入英文商品页选择合适的教材。如果你需要团队落地、流程设计或内容变现支持,也可以直接咨询。

Masa

关于作者

Masa

专注 Claude Code 实务流程、团队导入和内容转化的工程师。