用 Claude Code 安全构建多租户 SaaS:RLS、认证、计费与日志
用 Claude Code 实现多租户 SaaS:tenant_id、RLS、认证边界、计费限制、任务和泄漏测试。
多租户 SaaS 是指一个应用同时服务多个客户组织。这里的租户不是技术术语里的抽象概念,而是“客户数据边界”:公司、工作区、客户门户或账套。Claude Code 可以很快修改路由、数据库访问层、后台任务和 UI,因此越早把租户边界写清楚,后面的代码越容易审查。
最需要避免的事故很直接:Tenant A 的用户能看到 Tenant B 的项目、发票、文件、审计事件或 AI 提示词。只在代码评审里搜索where tenantId是不够的。安全做法是让tenant_id贯穿认证、会话、数据库、计费、后台任务、搜索索引和日志,并让数据库在应用层漏写过滤条件时仍然拒绝跨租户访问。
本文把 Claude Code 当作实现助手,而不是绕过安全评审的捷径。RLS 是 Row Level Security,也就是数据库按“行”判断当前会话能否读取或写入。事实依据请优先看 PostgreSQL 官方的Row Security Policies、CREATE POLICY 以及current_setting和set_config。
相关基础可以继续阅读Claude Code 认证实现、RBAC 权限控制和数据库迁移。多租户设计正好把这三件事连在一起。
先选择租户隔离模型
常见模型有三种。不要一上来就让 Claude Code 写 handler,先让它帮你检查隔离模型是否适合业务。
| 模型 | 适合场景 | 优点 | 风险 |
|---|---|---|---|
| 共享数据库、共享 schema | 早期或中期 SaaS,大量小客户 | 运维轻,分析方便 | 少一个tenant_id条件就可能泄漏 |
| 共享数据库、租户独立 schema | 少量大客户、需要更强分离 | 权限和迁移可分开一些 | schema 数量增加后迁移复杂 |
| 每租户独立数据库 | 金融、医疗、强合同隔离 | 备份、密钥、故障范围可分开 | 成本和发布流程更重 |
第一个判断标准是:客户数到 100 或 1000 时还能不能运维。多数中小 SaaS 可以从共享数据库、共享 schema 开始,但必须强化tenant_id、RLS、审计日志和单租户恢复流程。如果企业客户要求物理隔离,可以先设计“部分租户迁出到独立数据库”的路径,而不是一开始就把所有客户都拆成独立数据库。
三个真实用例很典型。第一,B2B CRM:公司、联系人、商机、备注都属于租户,销售只能看到自己公司的数据。第二,代理商客户门户:代理商员工可能加入多个租户,但客户员工只能进入自己的门户。第三,带 AI 功能的 SaaS:提示词调用、token、上传文件和存储容量必须按租户统计,否则计费和套餐限制都会错。
给 Claude Code 的第一条指令应是设计评审。
我想把这个 SaaS 代码库改成多租户。
假设使用共享 PostgreSQL 数据库和共享 schema。
请把表分为:租户拥有的表、全局参考表、必须启用 PostgreSQL RLS 的表。
禁止危险捷径:不能说“应用层 WHERE 足够”,不能完整记录请求 body,后台任务不能之后再推断 tenant_id。
请按修改范围、数据库约束、测试用例、迁移风险输出。
从可信来源传递 tenant_id
不要把浏览器传来的x-tenant-id或 URL 参数当成真实边界。更安全的顺序是:从 host 或路径解析候选租户,在服务器端确认登录用户属于该租户,然后只把验证过的tenantId交给业务服务。
Auth.js 官方的Extending the Session说明了如何给 session 增加服务端字段。但 session 应保持小而清晰:它证明“用户是谁”,租户成员关系仍应在服务器端查询,因为角色、成员状态和暂停租户都可能变化。
// src/lib/tenant-context.ts
import { z } from "zod";
const hostSchema = z.string().min(1).max(255);
export type SessionUser = {
id: string;
email: string;
};
export type TenantContext = {
tenantId: string;
userId: string;
role: "owner" | "admin" | "member" | "viewer";
requestId: string;
};
type TenantRecord = {
id: string;
subdomain: string | null;
custom_domain: string | null;
};
type MembershipRecord = {
tenant_id: string;
user_id: string;
role: TenantContext["role"];
};
export async function resolveTenantContext(input: {
request: Request;
sessionUser: SessionUser | null;
requestId: string;
findTenantByHost: (host: string) => Promise<TenantRecord | null>;
findMembership: (tenantId: string, userId: string) => Promise<MembershipRecord | null>;
}): Promise<TenantContext> {
if (!input.sessionUser) {
throw new Response("Unauthorized", { status: 401 });
}
const host = hostSchema.parse(new URL(input.request.url).host.toLowerCase());
const tenant = await input.findTenantByHost(host);
if (!tenant) {
throw new Response("Tenant not found", { status: 404 });
}
const membership = await input.findMembership(tenant.id, input.sessionUser.id);
if (!membership) {
throw new Response("Forbidden for this tenant", { status: 403 });
}
return {
tenantId: tenant.id,
userId: input.sessionUser.id,
role: membership.role,
requestId: input.requestId,
};
}
关键点是:这段代码不信任客户端给的租户 ID。内部 API 和队列任务也要遵守同一规则。来自边界外的 tenant_id 只是声明,必须由 membership、绑定到租户的 API key 或签名任务 payload 证明。
用 PostgreSQL RLS 做最后防线
应用层过滤仍然有用,但一个漏写的查询不应该变成数据泄漏。在共享 schema 模型中,每张租户表都需要tenant_id、外键、包含租户的索引和 RLS policy。PostgreSQL 中表 owner 和带BYPASSRLS的角色可能绕过 RLS,所以应用连接角色不应拥有受保护的表,关键表建议使用FORCE ROW LEVEL SECURITY。
-- db/migrations/20260602_multi_tenant_rls.sql
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE TABLE tenants (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
slug text NOT NULL UNIQUE,
name text NOT NULL,
plan text NOT NULL DEFAULT 'starter',
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE app_users (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
email text NOT NULL UNIQUE,
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE tenant_memberships (
tenant_id uuid NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
user_id uuid NOT NULL REFERENCES app_users(id) ON DELETE CASCADE,
role text NOT NULL CHECK (role IN ('owner', 'admin', 'member', 'viewer')),
created_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (tenant_id, user_id)
);
CREATE TABLE projects (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id uuid NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
name text NOT NULL,
status text NOT NULL DEFAULT 'active',
created_by uuid NOT NULL REFERENCES app_users(id),
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX projects_tenant_id_id_idx ON projects (tenant_id, id);
CREATE INDEX tenant_memberships_user_id_tenant_id_idx ON tenant_memberships (user_id, tenant_id);
ALTER TABLE tenant_memberships ENABLE ROW LEVEL SECURITY;
ALTER TABLE tenant_memberships FORCE ROW LEVEL SECURITY;
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;
ALTER TABLE projects FORCE ROW LEVEL SECURITY;
CREATE POLICY tenant_memberships_isolation
ON tenant_memberships
FOR ALL
USING (tenant_id = nullif(current_setting('app.tenant_id', true), '')::uuid)
WITH CHECK (tenant_id = nullif(current_setting('app.tenant_id', true), '')::uuid);
CREATE POLICY projects_isolation
ON projects
FOR ALL
USING (tenant_id = nullif(current_setting('app.tenant_id', true), '')::uuid)
WITH CHECK (tenant_id = nullif(current_setting('app.tenant_id', true), '')::uuid);
连接池场景必须小心。不要把租户值永久留在连接 session 中,而是在事务里使用set_config(..., true),让它随事务结束自动清理。
// src/db/tenant-db.ts
import { Pool, PoolClient } from "pg";
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
});
const uuidPattern =
/^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;
export async function withTenant<T>(
tenantId: string,
work: (client: PoolClient) => Promise<T>,
): Promise<T> {
if (!uuidPattern.test(tenantId)) {
throw new Error("Invalid tenant id");
}
const client = await pool.connect();
try {
await client.query("BEGIN");
await client.query("SELECT set_config('app.tenant_id', $1, true)", [tenantId]);
const result = await work(client);
await client.query("COMMIT");
return result;
} catch (error) {
await client.query("ROLLBACK");
throw error;
} finally {
client.release();
}
}
export async function listProjects(tenantId: string) {
return withTenant(tenantId, async (db) => {
const result = await db.query(
"SELECT id, name, status, created_at FROM projects ORDER BY created_at DESC",
);
return result.rows;
});
}
例子里故意没有写WHERE tenant_id = $1。如果 RLS 正常,PostgreSQL 会自动过滤不可见行。生产中为了性能可以同时加应用层过滤,但它不能替代 RLS。
认证、计费和后台任务要同边界
认证问题不是“用户是否登录”,而是“这个用户在这个租户里能做什么”。如果只用user_id查询记录,多组织顾问、代理商和客服账号都会变成风险。服务边界中至少传递tenantId、userId、role和requestId。
计费也必须按租户。Stripe 的usage-based billing围绕使用量上报,但你的应用首先要正确统计属于该租户的使用量。套餐限制必须在服务器写入或 AI 执行前拦截,而不是只在 UI 上显示。
// src/lib/plan-limits.ts
type Plan = "starter" | "growth" | "enterprise";
type Metric = "users" | "projects" | "aiRuns";
const PLAN_LIMITS: Record<Plan, Record<Metric, number>> = {
starter: { users: 5, projects: 20, aiRuns: 500 },
growth: { users: 50, projects: 500, aiRuns: 10_000 },
enterprise: { users: 10_000, projects: 1_000_000, aiRuns: 50_000_000 },
};
export function assertPlanLimit(input: {
plan: Plan;
metric: Metric;
currentUsage: number;
increment?: number;
}) {
const nextUsage = input.currentUsage + (input.increment ?? 1);
const limit = PLAN_LIMITS[input.plan][input.metric];
if (nextUsage > limit) {
throw new Response(
JSON.stringify({
error: "PLAN_LIMIT_EXCEEDED",
metric: input.metric,
limit,
nextUsage,
}),
{ status: 402, headers: { "content-type": "application/json" } },
);
}
}
后台任务是最容易漏的地方。不要只把projectId放进队列,再让 worker 全局搜索。入队时放入已验证的tenantId,worker 执行时再次进入withTenant上下文。
// src/jobs/send-project-digest.ts
import { withTenant } from "../db/tenant-db";
type ProjectDigestJob = {
tenantId: string;
projectId: string;
requestedBy: string;
};
export async function handleProjectDigestJob(job: ProjectDigestJob) {
return withTenant(job.tenantId, async (db) => {
const project = await db.query(
"SELECT id, name FROM projects WHERE id = $1",
[job.projectId],
);
if (project.rowCount !== 1) {
throw new Error("Project not visible for tenant");
}
await db.query(
"INSERT INTO audit_events (tenant_id, actor_user_id, event_name) VALUES ($1, $2, $3)",
[job.tenantId, job.requestedBy, "project_digest_sent"],
);
});
}
日志要足够,但不能什么都记
日志很容易变成权限更弱的第二数据库。OWASP 的Logging Cheat Sheet强调清洗事件数据、避免记录敏感信息。多租户 SaaS 通常需要tenantId、requestId、事件名、操作者 ID 和目标 ID,但不需要 Cookie、Authorization、API key、完整提示词、完整请求 body 或账单地址。
// src/lib/safe-log.ts
type LogLevel = "info" | "warn" | "error";
const SENSITIVE_KEYS = new Set([
"authorization",
"cookie",
"password",
"token",
"apiKey",
"prompt",
"secret",
]);
function redact(value: unknown): unknown {
if (!value || typeof value !== "object") return value;
if (Array.isArray(value)) return value.map(redact);
return Object.fromEntries(
Object.entries(value).map(([key, child]) => [
key,
SENSITIVE_KEYS.has(key) ? "[REDACTED]" : redact(child),
]),
);
}
export function tenantLog(level: LogLevel, message: string, fields: Record<string, unknown>) {
const safeFields = redact(fields);
console[level](JSON.stringify({ message, ...safeFields }));
}
审计日志要和普通运行日志分开。审计日志回答“谁、在哪个租户、对哪个对象、做了什么、何时发生”。除非业务或监管确实要求,不要把完整字段差异都塞进审计日志。
写会主动攻击隔离边界的测试
正常流程测试远远不够。你需要让 Tenant A 去访问 Tenant B 的 ID,让 worker 收到错误租户 payload,让app.tenant_id缺失,让套餐超限,让 logger 收到敏感字段,并确认系统失败得足够安全。
-- db/tests/rls-smoke-test.sql
\set ON_ERROR_STOP on
BEGIN;
INSERT INTO tenants (id, slug, name)
VALUES
('00000000-0000-4000-8000-000000000001', 'alpha', 'Alpha Inc'),
('00000000-0000-4000-8000-000000000002', 'beta', 'Beta Inc');
INSERT INTO app_users (id, email)
VALUES ('10000000-0000-4000-8000-000000000001', 'masa@example.com');
INSERT INTO projects (id, tenant_id, name, created_by)
VALUES
('20000000-0000-4000-8000-000000000001', '00000000-0000-4000-8000-000000000001', 'Alpha project', '10000000-0000-4000-8000-000000000001'),
('20000000-0000-4000-8000-000000000002', '00000000-0000-4000-8000-000000000002', 'Beta project', '10000000-0000-4000-8000-000000000001');
SELECT set_config('app.tenant_id', '00000000-0000-4000-8000-000000000001', true);
DO $$
DECLARE visible_count integer;
BEGIN
SELECT count(*) INTO visible_count FROM projects;
IF visible_count <> 1 THEN
RAISE EXCEPTION 'RLS failed: expected 1 visible project, got %', visible_count;
END IF;
END $$;
ROLLBACK;
#!/usr/bin/env bash
set -euo pipefail
: "${DATABASE_URL:?DATABASE_URL is required}"
psql "$DATABASE_URL" -f db/migrations/20260602_multi_tenant_rls.sql
psql "$DATABASE_URL" -f db/tests/rls-smoke-test.sql
可以这样要求 Claude Code:
请添加多租户泄漏测试,覆盖以下场景:
1. Tenant A 的 session 使用 Tenant B 的 projectId 时返回 403 或 404。
2. 没有 app.tenant_id 的数据库查询返回 0 行或 fail closed。
3. 没有 tenantId 的后台任务 payload 被拒绝。
4. 超过套餐限制时,在写入前返回 402。
5. 日志里不能出现 authorization、cookie、prompt、token、apiKey。
先说明每个测试会抓住哪种错误实现,再给出修复后的命令和结果。
避免迁移陷阱
把已有 SaaS 改成多租户时,最常见的错误是加一个 nullable 的tenant_id然后上线,承诺以后清理。迁移中短暂允许 NULL 可以接受,但发布完成前必须有NOT NULL、外键、租户感知唯一约束、索引、RLS 和测试。
重点看五个坑。第一,区分全局参考数据和租户数据,国家代码可以全局,客户设置不能全局。第二,文件存储 key 也要包含租户上下文,不能只靠数据库保护。第三,搜索索引必须带tenant_id过滤,Algolia、Meilisearch、OpenSearch 都一样。第四,不要为了管理后台关闭 RLS,客服访问应是显式的、可审计的 impersonation。第五,演练单租户恢复;如果只能全库恢复,事故时会非常被动。
请为现有表转换到多租户创建迁移计划。
Phase 1: 添加 tenant_id 列,并为现有记录建立映射。
Phase 2: 编写 backfill SQL 和 NULL 剩余校验 SQL。
Phase 3: 添加 NOT NULL、外键、租户感知唯一约束和索引。
Phase 4: 启用 RLS,并建议哪些表需要 FORCE ROW LEVEL SECURITY。
Phase 5: 测试 API、后台任务、日志和搜索索引是否存在跨租户泄漏。
每个 Phase 都要包含回滚条件和验证 SQL。
给 Claude Code 的安全提示词
提示词本身也要带边界。不要只说“快速改成多租户”或“管理员都能看”。更安全的提示词要写明禁止事项、测试要求、官方依据和允许修改范围。
你负责实现安全的多租户 SaaS 行为。
允许修改范围:src/app、src/lib、db/migrations、tests。
必需:
- tenant_id 必须在服务器端解析,不能信任客户端传入值。
- 每张租户拥有的表都必须使用 PostgreSQL RLS。
- 连接池中必须在事务内调用 set_config('app.tenant_id', value, true)。
- tenantId 必须传递到认证、计费、后台任务、日志和搜索索引。
- 添加会故意尝试跨租户读写的测试。
禁止:
- 关闭 RLS。
- 信任 x-tenant-id。
- 记录 Authorization、Cookie、API key 或完整 prompt。
- 没有测试就宣布完成。
最终报告必须包含修改文件、执行命令、发现的失败案例和残余风险。
Anthropic 的Claude Code Security说明了权限确认、提示词注入和用户审查责任。对敏感 SaaS 仓库,建议把允许命令、MCP 连接和项目规则都放入可审查的配置中。
CTA 与实际交付
多租户不是最显眼的功能,但在 B2B SaaS 销售中非常能建立信任。你能解释租户数据隔离、审计日志、套餐限制、客服访问和单租户恢复边界,安全问卷和企业评审就会顺很多。
团队采用 Claude Code 时,把租户规则写进CLAUDE.md并用测试固定下来。每次评审都问四个问题:tenant_id是否传到底,RLS 是否能挡住错误查询,后台任务是否带租户,日志是否不会泄漏。需要团队导入支持可以查看Claude Code 培训与咨询,个人开发者可以先从免费速查表把清单变成可重复的提示词。
实际试用结果
Masa 在一个小型 CRM 样例中试这个方案时,最先漏的不是 API route,而是日次摘要 worker。UI 已经传了tenantId,但队列 payload 只有projectId,worker 因此可能在错误租户里查找项目。把 worker 也改成通过withTenant执行,并加入“缺少app.tenant_id时查询失败”的 SQL 测试后,这个问题可以稳定复现并修复。给 Claude Code 的最好指令不是“补正常测试”,而是“先写 Tenant A 读取 Tenant B 的失败测试”。
免费 PDF: Claude Code 速查表
输入邮箱即可获取一页 PDF,整理常用命令、审查习惯和安全工作流。
我们会妥善保护你的信息,不发送垃圾邮件。
把 Claude Code 变成真正能带来结果的工作流
先领取中文说明的免费 PDF,再进入英文商品页选择合适的教材。如果你需要团队落地、流程设计或内容变现支持,也可以直接咨询。
关于作者
Masa
专注 Claude Code 实务流程、团队导入和内容转化的工程师。
相关文章
从Obsidian到CLAUDE.md的Claude Code流程:不再反复解释上下文
把 Obsidian 工作笔记整理成 CLAUDE.md 运行说明,让 Claude Code 每次都带着正确上下文开始。
Claude Code 收入 CTA 路由:从文章分流到 PDF、Gumroad 与咨询
用 Claude Code 按读者意图把文章流量分到免费 PDF、Gumroad 教材或咨询入口。
Claude Code 团队交接规则: 把审查证据、权限、回滚和收入路径一起交付
面向团队的 Claude Code 交接格式: 证据、权限、回滚、免费 PDF、Gumroad 与咨询路径都要可审查。