Advanced (更新: 2026/6/2)

用 Claude Code 构建生产级通知系统

用 Claude Code 设计生产通知系统: DB、API、React、未读、幂等、重试与测试。

用 Claude Code 构建生产级通知系统

通知系统不是放一个铃铛图标就结束。 真正会在生产环境出问题的地方,是重复发送、未读数不一致、邮件轰炸、外部服务失败后无法重试,以及把隐私信息写进通知内容。 本文把 Claude Code 的任务边界写清楚,用可复制的 DB schema、Next.js API、通知服务、React 通知中心和测试,搭出一个可运行的基础版本。

核心原则是: 应用内通知是事实来源,email 和 push 只是后续配送渠道。先写入数据库,再根据用户偏好、重要级别、批处理策略和速率限制,把外部配送任务放进队列。这样即使邮件服务短暂失败,用户回到产品里仍然能看到通知。

官方资料建议同时参考 MDN Notifications APINext.js Route HandlersPostgreSQL CREATE TABLE。如果以后接入邮件,可看 Resend Next.js 官方指南,但不要在 API 请求里直接发邮件,应从队列 worker 里发送。Claude Code 的基本用法可看 Claude Code 官方文档

系统边界

生产通知系统可以拆成四层。

职责常见事故
事件源支付失败、评论、任务完成等事件同一个 webhook 到达多次
通知服务保存、去重、未读、偏好、限流规则散落在 UI 里
配送队列email、push、webhook 的重试外部服务失败后通知丢失
UI通知中心、未读徽标、浏览器提醒多标签页和多设备未读数不一致

不要一开始就把 push 当主角。浏览器通知需要权限和安全上下文,移动端常常还需要 Service Worker。邮件也有退订、投诉、成本和送达率问题。更稳的边界是: 先留下应用内通知,再决定是否把 email 或 push 放入配送队列。

给 Claude Code 的提示词可以这样写:

Build a notification system for Next.js App Router and PostgreSQL.
In-app notifications are the source of truth. Email and push only go into a delivery queue.
Requirements: unread state, idempotency_key, batching, user preferences, rate limit, retry queue, privacy-safe payload.
Implement DB schema, service, route handler, React notification center, and Vitest tests.

这里有几个术语。幂等性表示同一个操作执行多次,结果不会重复增加。通知场景里,同一笔支付失败 webhook 到达两次,也只应该生成一条通知。批处理表示把短时间内的多条事件合并,例如把十条评论整理成一封 digest 邮件。重试队列表示外部发送失败后,任务仍然保存在数据库里,worker 可以稍后再试。

先确定用例

第一个用例是 SaaS 运维告警。账单失败、同步失败、后台任务失败可能需要应用内通知和邮件。但如果一个任务一分钟失败一百次,不能发一百封邮件。可以让 critical 即时发送,其他 warning 进入五分钟批处理。

第二个用例是团队协作。评论、mention、review 请求需要准确的未读状态。用户会期待“标记已读”“全部已读”和稳定的未读数字。外部通知里不要放完整评论内容,只放摘要和登录后的目标页面。

第三个用例是内容和商业化运营。对 ClaudeCodeLab 这类网站,通知可以用于发布前检查、CTA 链接异常、免费 PDF 交付、产品购买后的 onboarding 和咨询提醒。通知应该服务转化,而不是制造噪音。可以继续阅读站内的 Claude Code 效率技巧Claude Code 表单验证

第四个用例是管理员审计。权限变更、API key 创建、数据导出都应该能追踪。通知正文不要放敏感数据,而是链接到登录后的审计记录。

数据库 Schema

下面的 PostgreSQL schema 把应用内通知、配送任务、用户偏好和限流分开。

CREATE EXTENSION IF NOT EXISTS pgcrypto;

CREATE TABLE notification_preferences (
  user_id text PRIMARY KEY,
  in_app_enabled boolean NOT NULL DEFAULT true,
  email_enabled boolean NOT NULL DEFAULT false,
  push_enabled boolean NOT NULL DEFAULT false,
  digest_minutes integer NOT NULL DEFAULT 5,
  created_at timestamptz NOT NULL DEFAULT now(),
  updated_at timestamptz NOT NULL DEFAULT now()
);

CREATE TABLE notifications (
  id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id text NOT NULL,
  event_name text NOT NULL,
  title text NOT NULL,
  body text NOT NULL,
  target_url text,
  severity text NOT NULL DEFAULT 'info',
  data jsonb NOT NULL DEFAULT '{}'::jsonb,
  idempotency_key text,
  batch_key text,
  read_at timestamptz,
  created_at timestamptz NOT NULL DEFAULT now(),
  updated_at timestamptz NOT NULL DEFAULT now()
);

CREATE UNIQUE INDEX notifications_user_idempotency_unique
  ON notifications (user_id, idempotency_key)
  WHERE idempotency_key IS NOT NULL;

CREATE INDEX notifications_user_created_idx
  ON notifications (user_id, created_at DESC);

CREATE INDEX notifications_user_unread_idx
  ON notifications (user_id, created_at DESC)
  WHERE read_at IS NULL;

CREATE TABLE notification_delivery_queue (
  id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  notification_id uuid NOT NULL REFERENCES notifications(id) ON DELETE CASCADE,
  channel text NOT NULL CHECK (channel IN ('email', 'push')),
  status text NOT NULL DEFAULT 'pending'
    CHECK (status IN ('pending', 'sending', 'sent', 'failed')),
  attempts integer NOT NULL DEFAULT 0,
  max_attempts integer NOT NULL DEFAULT 5,
  available_at timestamptz NOT NULL DEFAULT now(),
  locked_at timestamptz,
  last_error text,
  created_at timestamptz NOT NULL DEFAULT now(),
  updated_at timestamptz NOT NULL DEFAULT now()
);

CREATE INDEX notification_delivery_queue_pick_idx
  ON notification_delivery_queue (status, available_at, created_at);

CREATE TABLE notification_rate_limits (
  user_id text NOT NULL,
  channel text NOT NULL,
  window_start timestamptz NOT NULL,
  count integer NOT NULL DEFAULT 0,
  PRIMARY KEY (user_id, channel, window_start)
);

未读状态只用 read_at 表示,不要再加一个 is_read。两个字段迟早会不一致。如果要支持删除,更推荐增加 archived_at,这样队列和审计链路不会被物理删除破坏。

通知服务

先安装依赖。

npm install pg
npm install -D @types/pg vitest

创建 lib/notification-service.ts。这个服务先保存应用内通知,再根据偏好和重要级别决定是否排入 email/push 队列。

import { Pool, type PoolClient } from "pg";

const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
});

export type NotificationSeverity = "info" | "success" | "warning" | "critical";
export type ExternalChannel = "email" | "push";

export type NotificationRow = {
  id: string;
  user_id: string;
  event_name: string;
  title: string;
  body: string;
  target_url: string | null;
  severity: NotificationSeverity;
  data: Record<string, unknown>;
  idempotency_key: string | null;
  batch_key: string | null;
  read_at: string | null;
  created_at: string;
};

type Preferences = {
  in_app_enabled: boolean;
  email_enabled: boolean;
  push_enabled: boolean;
  digest_minutes: number;
};

export type CreateNotificationInput = {
  userId: string;
  eventName: string;
  title: string;
  body: string;
  targetUrl?: string;
  severity?: NotificationSeverity;
  data?: Record<string, unknown>;
  resourceId?: string;
  idempotencyKey?: string;
  batchKey?: string;
};

export function makeIdempotencyKey(input: {
  userId: string;
  eventName: string;
  resourceId?: string;
}) {
  if (!input.resourceId) return null;
  return `${input.userId}:${input.eventName}:${input.resourceId}`;
}

export function shouldQueueExternal(input: {
  channel: ExternalChannel;
  severity: NotificationSeverity;
  preferences: Pick<Preferences, "email_enabled" | "push_enabled">;
}) {
  if (input.channel === "email" && !input.preferences.email_enabled) return false;
  if (input.channel === "push" && !input.preferences.push_enabled) return false;
  return input.severity === "warning" || input.severity === "critical";
}

async function ensurePreferences(client: PoolClient, userId: string) {
  const result = await client.query<Preferences>(
    `
    INSERT INTO notification_preferences (user_id)
    VALUES ($1)
    ON CONFLICT (user_id) DO UPDATE
      SET updated_at = notification_preferences.updated_at
    RETURNING in_app_enabled, email_enabled, push_enabled, digest_minutes
    `,
    [userId],
  );
  return result.rows[0];
}

async function consumeRateLimit(
  client: PoolClient,
  userId: string,
  channel: ExternalChannel,
  limitPerMinute: number,
) {
  const result = await client.query<{ count: number }>(
    `
    INSERT INTO notification_rate_limits (user_id, channel, window_start, count)
    VALUES ($1, $2, date_trunc('minute', now()), 1)
    ON CONFLICT (user_id, channel, window_start)
    DO UPDATE SET count = notification_rate_limits.count + 1
    RETURNING count
    `,
    [userId, channel],
  );
  return result.rows[0].count <= limitPerMinute;
}

async function queueDelivery(
  client: PoolClient,
  notificationId: string,
  channel: ExternalChannel,
  availableAt: Date,
) {
  await client.query(
    `
    INSERT INTO notification_delivery_queue (notification_id, channel, available_at)
    VALUES ($1, $2, $3)
    `,
    [notificationId, channel, availableAt],
  );
}

export async function createNotification(input: CreateNotificationInput) {
  const client = await pool.connect();
  const severity = input.severity ?? "info";
  const idempotencyKey =
    input.idempotencyKey ??
    makeIdempotencyKey({
      userId: input.userId,
      eventName: input.eventName,
      resourceId: input.resourceId,
    });

  try {
    await client.query("BEGIN");
    const preferences = await ensurePreferences(client, input.userId);
    if (!preferences.in_app_enabled) {
      await client.query("COMMIT");
      return null;
    }

    const result = await client.query<NotificationRow & { inserted: boolean }>(
      `
      WITH inserted AS (
        INSERT INTO notifications (
          user_id, event_name, title, body, target_url, severity,
          data, idempotency_key, batch_key
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9)
        ON CONFLICT (user_id, idempotency_key)
          WHERE idempotency_key IS NOT NULL
        DO NOTHING
        RETURNING *, true AS inserted
      )
      SELECT * FROM inserted
      UNION ALL
      SELECT *, false AS inserted
      FROM notifications
      WHERE user_id = $1
        AND idempotency_key = $8
        AND $8 IS NOT NULL
        AND NOT EXISTS (SELECT 1 FROM inserted)
      LIMIT 1
      `,
      [
        input.userId,
        input.eventName,
        input.title,
        input.body,
        input.targetUrl ?? null,
        severity,
        JSON.stringify(input.data ?? {}),
        idempotencyKey,
        input.batchKey ?? null,
      ],
    );

    const notification = result.rows[0];
    if (!notification) {
      throw new Error("Notification insert failed");
    }

    if (notification.inserted) {
      for (const channel of ["email", "push"] as const) {
        const allowedByPreference = shouldQueueExternal({
          channel,
          severity,
          preferences,
        });
        if (!allowedByPreference) continue;

        const allowedByRateLimit = await consumeRateLimit(
          client,
          input.userId,
          channel,
          channel === "email" ? 5 : 20,
        );
        if (!allowedByRateLimit) continue;

        const delayMs = input.batchKey ? preferences.digest_minutes * 60_000 : 0;
        await queueDelivery(
          client,
          notification.id,
          channel,
          new Date(Date.now() + delayMs),
        );
      }
    }

    await client.query("COMMIT");
    return notification;
  } catch (error) {
    await client.query("ROLLBACK");
    throw error;
  } finally {
    client.release();
  }
}

export async function listNotifications(userId: string, unreadOnly = false, limit = 30) {
  const result = await pool.query<NotificationRow>(
    `
    SELECT *
    FROM notifications
    WHERE user_id = $1
      AND ($2::boolean = false OR read_at IS NULL)
    ORDER BY created_at DESC
    LIMIT $3
    `,
    [userId, unreadOnly, Math.min(limit, 100)],
  );
  return result.rows;
}

export async function markNotificationRead(userId: string, notificationId: string) {
  const result = await pool.query<NotificationRow>(
    `
    UPDATE notifications
    SET read_at = COALESCE(read_at, now()), updated_at = now()
    WHERE user_id = $1 AND id = $2
    RETURNING *
    `,
    [userId, notificationId],
  );
  return result.rows[0] ?? null;
}

export async function markAllNotificationsRead(userId: string) {
  const result = await pool.query(
    `
    UPDATE notifications
    SET read_at = COALESCE(read_at, now()), updated_at = now()
    WHERE user_id = $1 AND read_at IS NULL
    `,
    [userId],
  );
  return result.rowCount ?? 0;
}

export type DeliveryJob = {
  id: string;
  notification_id: string;
  channel: ExternalChannel;
  attempts: number;
  max_attempts: number;
};

export async function claimNextDeliveryJob() {
  const result = await pool.query<DeliveryJob>(
    `
    UPDATE notification_delivery_queue
    SET status = 'sending', attempts = attempts + 1, locked_at = now(), updated_at = now()
    WHERE id = (
      SELECT id
      FROM notification_delivery_queue
      WHERE status = 'pending' AND available_at <= now()
      ORDER BY available_at ASC, created_at ASC
      FOR UPDATE SKIP LOCKED
      LIMIT 1
    )
    RETURNING id, notification_id, channel, attempts, max_attempts
    `,
  );
  return result.rows[0] ?? null;
}

export async function finishDeliveryJob(job: DeliveryJob, error?: unknown) {
  if (!error) {
    await pool.query(
      "UPDATE notification_delivery_queue SET status = 'sent', updated_at = now() WHERE id = $1",
      [job.id],
    );
    return;
  }

  const retryDelayMs = Math.min(30 * 60_000, 60_000 * 2 ** job.attempts);
  const failedPermanently = job.attempts >= job.max_attempts;

  await pool.query(
    `
    UPDATE notification_delivery_queue
    SET status = $2,
        available_at = $3,
        last_error = $4,
        updated_at = now()
    WHERE id = $1
    `,
    [
      job.id,
      failedPermanently ? "failed" : "pending",
      new Date(Date.now() + retryDelayMs),
      error instanceof Error ? error.message : String(error),
    ],
  );
}

这段代码的重点是: 重复事件会返回已有通知,不会继续创建配送任务。这就是 demo 和生产系统之间的差别。

API Route

创建 app/api/notifications/route.ts。示例为了方便复制,使用 x-demo-user-id。生产环境必须换成真实 session。

import { NextRequest, NextResponse } from "next/server";
import {
  createNotification,
  listNotifications,
  markAllNotificationsRead,
  markNotificationRead,
} from "@/lib/notification-service";

export const runtime = "nodejs";

function requireDemoUserId(request: NextRequest) {
  const userId = request.headers.get("x-demo-user-id");
  if (!userId) {
    throw new Error("Missing x-demo-user-id header. Replace this with real auth.");
  }
  return userId;
}

export async function GET(request: NextRequest) {
  try {
    const userId = requireDemoUserId(request);
    const unreadOnly = request.nextUrl.searchParams.get("unread") === "1";
    const notifications = await listNotifications(userId, unreadOnly);
    return NextResponse.json({ notifications });
  } catch (error) {
    return NextResponse.json(
      { error: error instanceof Error ? error.message : "Unknown error" },
      { status: 401 },
    );
  }
}

export async function POST(request: NextRequest) {
  try {
    const userId = requireDemoUserId(request);
    const body = await request.json();
    const notification = await createNotification({
      userId,
      eventName: String(body.eventName),
      title: String(body.title),
      body: String(body.body),
      targetUrl: body.targetUrl ? String(body.targetUrl) : undefined,
      severity: body.severity,
      resourceId: body.resourceId ? String(body.resourceId) : undefined,
      batchKey: body.batchKey ? String(body.batchKey) : undefined,
      data: typeof body.data === "object" && body.data ? body.data : {},
    });

    return NextResponse.json({ notification }, { status: 201 });
  } catch (error) {
    return NextResponse.json(
      { error: error instanceof Error ? error.message : "Unknown error" },
      { status: 400 },
    );
  }
}

export async function PATCH(request: NextRequest) {
  const userId = requireDemoUserId(request);
  const body = await request.json().catch(() => ({}));

  if (body.notificationId) {
    const notification = await markNotificationRead(userId, String(body.notificationId));
    return NextResponse.json({ notification });
  }

  const updated = await markAllNotificationsRead(userId);
  return NextResponse.json({ updated });
}

不要相信 body 里的 userId。通知 API 包含私有未读状态,用户身份必须来自认证层,每个更新都要带上所有者条件。

React 通知中心

UI 仍然以应用内通知中心为主。桌面浏览器通知只在用户点击按钮后请求权限。

"use client";

import { useEffect, useMemo, useState } from "react";

type AppNotification = {
  id: string;
  title: string;
  body: string;
  target_url: string | null;
  severity: "info" | "success" | "warning" | "critical";
  read_at: string | null;
  created_at: string;
};

const demoHeaders = {
  "x-demo-user-id": "demo-user-1",
};

export function NotificationCenter() {
  const [items, setItems] = useState<AppNotification[]>([]);
  const [open, setOpen] = useState(false);
  const [desktopEnabled, setDesktopEnabled] = useState(false);

  const unreadCount = useMemo(
    () => items.filter((item) => item.read_at === null).length,
    [items],
  );

  async function loadNotifications() {
    const response = await fetch("/api/notifications", {
      headers: demoHeaders,
      cache: "no-store",
    });
    if (!response.ok) return;
    const data = (await response.json()) as { notifications: AppNotification[] };
    setItems(data.notifications);
  }

  async function markRead(notificationId: string) {
    await fetch("/api/notifications", {
      method: "PATCH",
      headers: {
        ...demoHeaders,
        "content-type": "application/json",
      },
      body: JSON.stringify({ notificationId }),
    });
    setItems((current) =>
      current.map((item) =>
        item.id === notificationId
          ? { ...item, read_at: item.read_at ?? new Date().toISOString() }
          : item,
      ),
    );
  }

  async function markAllRead() {
    await fetch("/api/notifications", {
      method: "PATCH",
      headers: {
        ...demoHeaders,
        "content-type": "application/json",
      },
      body: JSON.stringify({ all: true }),
    });
    const now = new Date().toISOString();
    setItems((current) => current.map((item) => ({ ...item, read_at: item.read_at ?? now })));
  }

  async function enableDesktopNotifications() {
    if (!("Notification" in window)) return;
    const permission = await Notification.requestPermission();
    setDesktopEnabled(permission === "granted");
  }

  useEffect(() => {
    loadNotifications();
    const timer = window.setInterval(loadNotifications, 30_000);
    return () => window.clearInterval(timer);
  }, []);

  useEffect(() => {
    if (!desktopEnabled || document.visibilityState === "visible") return;
    const newest = items.find((item) => item.read_at === null);
    if (!newest) return;
    new Notification(newest.title, {
      body: newest.body,
      tag: newest.id,
    });
  }, [desktopEnabled, items]);

  return (
    <div className="notification-center">
      <button type="button" onClick={() => setOpen((value) => !value)}>
        Notifications
        {unreadCount > 0 ? <span aria-label={`${unreadCount} unread`}>{unreadCount}</span> : null}
      </button>

      {open ? (
        <section aria-label="Notification center">
          <div>
            <button type="button" onClick={markAllRead} disabled={unreadCount === 0}>
              Mark all read
            </button>
            <button type="button" onClick={enableDesktopNotifications}>
              Enable desktop alerts
            </button>
          </div>

          {items.length === 0 ? (
            <p>No notifications.</p>
          ) : (
            <ul>
              {items.map((item) => (
                <li key={item.id} data-unread={item.read_at === null}>
                  <strong>{item.title}</strong>
                  <p>{item.body}</p>
                  <small>{new Date(item.created_at).toLocaleString()}</small>
                  <div>
                    {item.target_url ? <a href={item.target_url}>Open</a> : null}
                    {item.read_at === null ? (
                      <button type="button" onClick={() => markRead(item.id)}>
                        Mark read
                      </button>
                    ) : null}
                  </div>
                </li>
              ))}
            </ul>
          )}
        </section>
      ) : null}
    </div>
  );
}

第一版用 30 秒轮询就够了。WebSocket 或 SSE 可以以后再加。稳定的未读状态比“看起来实时”更重要。

测试策略

创建 notification-service.test.ts。测试应先覆盖会造成事故的规则。

import { describe, expect, it } from "vitest";
import { makeIdempotencyKey, shouldQueueExternal } from "./notification-service";

describe("notification policy", () => {
  it("builds a stable idempotency key from user, event, and resource", () => {
    expect(
      makeIdempotencyKey({
        userId: "user_1",
        eventName: "invoice.payment_failed",
        resourceId: "invoice_9",
      }),
    ).toBe("user_1:invoice.payment_failed:invoice_9");
  });

  it("does not build an idempotency key without a resource id", () => {
    expect(
      makeIdempotencyKey({
        userId: "user_1",
        eventName: "system.notice",
      }),
    ).toBeNull();
  });

  it("queues email only when preferences and severity allow it", () => {
    expect(
      shouldQueueExternal({
        channel: "email",
        severity: "warning",
        preferences: { email_enabled: true, push_enabled: false },
      }),
    ).toBe(true);

    expect(
      shouldQueueExternal({
        channel: "email",
        severity: "info",
        preferences: { email_enabled: true, push_enabled: false },
      }),
    ).toBe(false);

    expect(
      shouldQueueExternal({
        channel: "push",
        severity: "critical",
        preferences: { email_enabled: true, push_enabled: false },
      }),
    ).toBe(false);
  });
});

再让 Claude Code 增加重复 webhook、偏好关闭、限流超出、队列重试失败、尝试把他人通知标记已读等测试。

常见坑

第一个坑是通知正文放太多隐私信息。email 和 push 可能出现在锁屏、转发邮件和供应商日志里。正文只写摘要,详细信息放在登录后的页面。

第二个坑是只在浏览器计算未读数。多标签页、多设备会很快不一致。服务端以 read_at IS NULL 为准,UI 可以乐观更新,但要定期刷新。

第三个坑是把限流放到通知创建之前。这样可能丢掉审计通知。更好的做法是保留应用内记录,只限制外部配送。

第四个坑是 sending 队列任务永久卡住。worker 崩溃后,需要把过旧的 locked job 放回 pending,并在管理界面显示永久失败。

第五个坑是给 Claude Code 的要求太模糊。“做通知功能”通常只会得到 toast。必须把数据模型、幂等、偏好、限流、隐私、重试队列和测试写进第一条指令。

商业化 CTA

通知可以支持免费 PDF 交付、产品购买后的 onboarding、CTA 链接异常提醒和咨询 follow-up。它的目标不是打扰用户,而是在用户回到产品时给出下一步。需要更完整的模板和运营材料,可以查看 ClaudeCodeLab 产品

实测结果

我把 schema 和服务代码贴进本地 Next.js 验证项目,用同一个 resourceId 连续发送两次 invoice.payment_failed。结果 notifications 只有一行,配送队列也只产生了第一次任务。关闭 email 偏好后,应用内通知仍然保留,但不再进入 email 队列。最关键的决定,是在写 UI 之前先确定 idempotency_keyread_at

总结

生产通知系统首先是数据一致性问题,然后才是 UI 问题。把应用内通知作为事实来源,把 email/push 放进队列,并从第一版开始要求未读、幂等、批处理、偏好、限流、隐私、重试和测试。这样使用 Claude Code 才不会只生成一个漂亮但脆弱的 toast。

#Claude Code #通知系统 #Next.js #PostgreSQL #React
免费

免费 PDF: Claude Code 速查表

输入邮箱即可获取一页 PDF,整理常用命令、审查习惯和安全工作流。

我们会妥善保护你的信息,不发送垃圾邮件。

把 Claude Code 变成真正能带来结果的工作流

先领取中文说明的免费 PDF,再进入英文商品页选择合适的教材。如果你需要团队落地、流程设计或内容变现支持,也可以直接咨询。

Masa

关于作者

Masa

专注 Claude Code 实务流程、团队导入和内容转化的工程师。