From aee5c3cbf4467d94903322c2eb897ad67cfb8fd1 Mon Sep 17 00:00:00 2001 From: buky Date: Sun, 24 May 2026 08:56:31 +0200 Subject: [PATCH] =?UTF-8?q?feat(rls):=20Phase=201=20#7=20=E2=80=94=20Heart?= =?UTF-8?q?beatConfig=20RLS=20+=20callsite=20fixes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migration 20260601000000: TENANT_DIRECT RLS on HeartbeatConfig. BullMQ worker uses explicit organizationId from job.data (ALS empty in workers). Scheduler functions gain organizationId param; route and template engine wrapped. --- .../migration.sql | 42 +++++++++++++++ .../api/agents/[agentId]/heartbeat/route.ts | 51 ++++++++++++------- src/lib/heartbeat/heartbeat-scheduler.ts | 45 +++++++++------- src/lib/heartbeat/heartbeat-worker.ts | 12 +++-- src/lib/templates/template-engine.ts | 46 +++++++++-------- 5 files changed, 134 insertions(+), 62 deletions(-) create mode 100644 prisma/migrations/20260601000000_rls_phase1_heartbeatconfig/migration.sql diff --git a/prisma/migrations/20260601000000_rls_phase1_heartbeatconfig/migration.sql b/prisma/migrations/20260601000000_rls_phase1_heartbeatconfig/migration.sql new file mode 100644 index 00000000..fda3740c --- /dev/null +++ b/prisma/migrations/20260601000000_rls_phase1_heartbeatconfig/migration.sql @@ -0,0 +1,42 @@ +-- RLS Phase 1 #7 — HeartbeatConfig (TENANT_DIRECT) +-- BullMQ worker reads this via organizationId from job.data; ALS is empty in workers. + +-- 1. Composite index for RLS performance (CRITICAL) +CREATE INDEX IF NOT EXISTS "HeartbeatConfig_organizationId_id_idx" + ON "HeartbeatConfig" ("organizationId", "id"); + +-- 2. Enable RLS with FORCE (so table owner also obeys policies) +ALTER TABLE "HeartbeatConfig" ENABLE ROW LEVEL SECURITY; +ALTER TABLE "HeartbeatConfig" FORCE ROW LEVEL SECURITY; + +-- 3. Grants +GRANT SELECT, INSERT, UPDATE, DELETE ON "HeartbeatConfig" TO app_user; +GRANT SELECT, INSERT, UPDATE, DELETE ON "HeartbeatConfig" TO admin_user; + +-- 4. Policies (admin_user has BYPASSRLS — no policies needed for it) +CREATE POLICY heartbeatconfig_select ON "HeartbeatConfig" + FOR SELECT TO app_user + USING ("organizationId" = current_setting('app.current_org_id', true)); + +CREATE POLICY heartbeatconfig_insert ON "HeartbeatConfig" + FOR INSERT TO app_user + WITH CHECK ("organizationId" = current_setting('app.current_org_id', true)); + +CREATE POLICY heartbeatconfig_update ON "HeartbeatConfig" + FOR UPDATE TO app_user + USING ("organizationId" = current_setting('app.current_org_id', true)) + WITH CHECK ("organizationId" = current_setting('app.current_org_id', true)); + +CREATE POLICY heartbeatconfig_delete ON "HeartbeatConfig" + FOR DELETE TO app_user + USING ("organizationId" = current_setting('app.current_org_id', true)); + +-- ========================================================================= +-- Rollback (uncomment to revert) +-- ========================================================================= +-- DROP POLICY IF EXISTS heartbeatconfig_select ON "HeartbeatConfig"; +-- DROP POLICY IF EXISTS heartbeatconfig_insert ON "HeartbeatConfig"; +-- DROP POLICY IF EXISTS heartbeatconfig_update ON "HeartbeatConfig"; +-- DROP POLICY IF EXISTS heartbeatconfig_delete ON "HeartbeatConfig"; +-- ALTER TABLE "HeartbeatConfig" DISABLE ROW LEVEL SECURITY; +-- DROP INDEX IF EXISTS "HeartbeatConfig_organizationId_id_idx"; diff --git a/src/app/api/agents/[agentId]/heartbeat/route.ts b/src/app/api/agents/[agentId]/heartbeat/route.ts index 0da0cdfb..5e617170 100644 --- a/src/app/api/agents/[agentId]/heartbeat/route.ts +++ b/src/app/api/agents/[agentId]/heartbeat/route.ts @@ -3,6 +3,7 @@ import { z } from "zod"; import { requireAgentOwner, isAuthError } from "@/lib/api/auth-guard"; import { parseBodyWithLimit } from "@/lib/api/body-limit"; import { prisma } from "@/lib/prisma"; +import { withOrgContext } from "@/lib/db/rls-middleware"; import { logger } from "@/lib/logger"; import { validateCronExpression, validateTimezone } from "@/lib/scheduler/cron-validator"; import { scheduleHeartbeat, unscheduleHeartbeat } from "@/lib/heartbeat/heartbeat-scheduler"; @@ -30,10 +31,15 @@ export async function GET( if (isAuthError(authResult)) return authResult; try { - const config = await prisma.heartbeatConfig.findUnique({ - where: { agentId }, - include: { _count: { select: { runs: true } } }, - }); + const agent = await prisma.agent.findUnique({ where: { id: agentId }, select: { organizationId: true } }); + const orgId = agent?.organizationId ?? null; + + const config = await withOrgContext(prisma, orgId, (tx) => + tx.heartbeatConfig.findUnique({ + where: { agentId }, + include: { _count: { select: { runs: true } } }, + }) + ); return NextResponse.json({ success: true, data: config }); } catch (error) { @@ -76,19 +82,21 @@ export async function POST( } try { - const config = await prisma.heartbeatConfig.upsert({ - where: { agentId }, - update: { - cronExpression, - timezone, - ...(systemPrompt !== undefined && { systemPrompt }), - ...(maxContextItems !== undefined && { maxContextItems }), - enabled, - }, - create: { agentId, organizationId, cronExpression, timezone, systemPrompt, maxContextItems: maxContextItems ?? 50, enabled }, - }); - - await scheduleHeartbeat(config.id); + const config = await withOrgContext(prisma, organizationId, (tx) => + tx.heartbeatConfig.upsert({ + where: { agentId }, + update: { + cronExpression, + timezone, + ...(systemPrompt !== undefined && { systemPrompt }), + ...(maxContextItems !== undefined && { maxContextItems }), + enabled, + }, + create: { agentId, organizationId, cronExpression, timezone, systemPrompt, maxContextItems: maxContextItems ?? 50, enabled }, + }) + ); + + await scheduleHeartbeat(config.id, organizationId); return NextResponse.json({ success: true, data: config }, { status: 201 }); } catch (error) { @@ -107,13 +115,18 @@ export async function DELETE( if (isAuthError(authResult)) return authResult; try { - const config = await prisma.heartbeatConfig.findUnique({ where: { agentId }, select: { id: true } }); + const agent = await prisma.agent.findUnique({ where: { id: agentId }, select: { organizationId: true } }); + const orgId = agent?.organizationId ?? null; + + const config = await withOrgContext(prisma, orgId, (tx) => + tx.heartbeatConfig.findUnique({ where: { agentId }, select: { id: true } }) + ); if (!config) { return NextResponse.json({ success: false, error: "Heartbeat config not found" }, { status: 404 }); } - await unscheduleHeartbeat(config.id); + await unscheduleHeartbeat(config.id, orgId); return NextResponse.json({ success: true, data: null }); } catch (error) { diff --git a/src/lib/heartbeat/heartbeat-scheduler.ts b/src/lib/heartbeat/heartbeat-scheduler.ts index a4e1ccf5..e707299f 100644 --- a/src/lib/heartbeat/heartbeat-scheduler.ts +++ b/src/lib/heartbeat/heartbeat-scheduler.ts @@ -1,13 +1,16 @@ import { prisma } from "@/lib/prisma"; +import { withOrgContext } from "@/lib/db/rls-middleware"; import { logger } from "@/lib/logger"; import { computeNextRunAt } from "@/lib/scheduler/cron-validator"; import type { ScheduleType } from "@/generated/prisma"; -export async function scheduleHeartbeat(configId: string): Promise { - const config = await prisma.heartbeatConfig.findUnique({ - where: { id: configId }, - select: { id: true, agentId: true, cronExpression: true, timezone: true, enabled: true, flowScheduleId: true }, - }); +export async function scheduleHeartbeat(configId: string, organizationId: string | null): Promise { + const config = await withOrgContext(prisma, organizationId, (tx) => + tx.heartbeatConfig.findUnique({ + where: { id: configId }, + select: { id: true, agentId: true, cronExpression: true, timezone: true, enabled: true, flowScheduleId: true }, + }) + ); if (!config) throw new Error(`HeartbeatConfig ${configId} not found`); @@ -35,20 +38,24 @@ export async function scheduleHeartbeat(configId: string): Promise { }, }); - await prisma.heartbeatConfig.update({ - where: { id: configId }, - data: { flowScheduleId: schedule.id }, - }); + await withOrgContext(prisma, organizationId, (tx) => + tx.heartbeatConfig.update({ + where: { id: configId }, + data: { flowScheduleId: schedule.id }, + }) + ); logger.info("Heartbeat FlowSchedule created", { configId, flowScheduleId: schedule.id }); } } -export async function unscheduleHeartbeat(configId: string): Promise { - const config = await prisma.heartbeatConfig.findUnique({ - where: { id: configId }, - select: { flowScheduleId: true }, - }); +export async function unscheduleHeartbeat(configId: string, organizationId: string | null): Promise { + const config = await withOrgContext(prisma, organizationId, (tx) => + tx.heartbeatConfig.findUnique({ + where: { id: configId }, + select: { flowScheduleId: true }, + }) + ); if (!config) return; @@ -59,10 +66,12 @@ export async function unscheduleHeartbeat(configId: string): Promise { }); } - await prisma.heartbeatConfig.update({ - where: { id: configId }, - data: { enabled: false }, - }); + await withOrgContext(prisma, organizationId, (tx) => + tx.heartbeatConfig.update({ + where: { id: configId }, + data: { enabled: false }, + }) + ); logger.info("Heartbeat unscheduled", { configId }); } diff --git a/src/lib/heartbeat/heartbeat-worker.ts b/src/lib/heartbeat/heartbeat-worker.ts index bc29aec7..bd66ec9b 100644 --- a/src/lib/heartbeat/heartbeat-worker.ts +++ b/src/lib/heartbeat/heartbeat-worker.ts @@ -2,6 +2,7 @@ import type { Job } from "bullmq"; import { logger } from "@/lib/logger"; import type { Prisma } from "@/generated/prisma"; import { registerSession, removeSession } from "@/lib/session/session-tracker"; +import { withOrgContext } from "@/lib/db/rls-middleware"; export interface HeartbeatJobData { type: "heartbeat.run"; @@ -25,10 +26,13 @@ export async function processHeartbeatRunJob(job: Job): Promis await job.updateProgress(10); - const config = await prisma.heartbeatConfig.findUnique({ - where: { id: configId }, - select: { systemPrompt: true }, - }); + // ALS is empty in BullMQ workers; organizationId must come from job.data explicitly. + const config = await withOrgContext(prisma, organizationId, (tx) => + tx.heartbeatConfig.findUnique({ + where: { id: configId }, + select: { systemPrompt: true }, + }) + ); if (!config) { await prisma.heartbeatRun.update({ diff --git a/src/lib/templates/template-engine.ts b/src/lib/templates/template-engine.ts index 20790069..9afc599d 100644 --- a/src/lib/templates/template-engine.ts +++ b/src/lib/templates/template-engine.ts @@ -121,18 +121,20 @@ export async function exportTemplate( agentId: string, organizationId: string, ): Promise<{ payload: TemplatePayload; checksum: string }> { - const agent = await prisma.agent.findUnique({ - where: { id: agentId }, - include: { - flow: true, - mcpServers: { include: { mcpServer: true } }, - heartbeatConfig: true, - goalLinks: { - where: { goal: { status: "ACTIVE" } }, - include: { goal: true }, + const agent = await withOrgContext(prisma, organizationId, (tx) => + tx.agent.findUnique({ + where: { id: agentId }, + include: { + flow: true, + mcpServers: { include: { mcpServer: true } }, + heartbeatConfig: true, + goalLinks: { + where: { goal: { status: "ACTIVE" } }, + include: { goal: true }, + }, }, - }, - }); + }) + ); if (!agent) throw new Error(`Agent ${agentId} not found`); @@ -265,16 +267,18 @@ export async function importTemplate( } if (payload.heartbeatConfig) { - await prisma.heartbeatConfig.create({ - data: { - agentId: agent.id, - organizationId, - cronExpression: payload.heartbeatConfig.cronExpression, - timezone: payload.heartbeatConfig.timezone, - systemPrompt: payload.heartbeatConfig.systemPrompt, - maxContextItems: payload.heartbeatConfig.maxContextItems, - }, - }); + await withOrgContext(prisma, organizationId, (tx) => + tx.heartbeatConfig.create({ + data: { + agentId: agent.id, + organizationId, + cronExpression: payload.heartbeatConfig!.cronExpression, + timezone: payload.heartbeatConfig!.timezone, + systemPrompt: payload.heartbeatConfig!.systemPrompt, + maxContextItems: payload.heartbeatConfig!.maxContextItems, + }, + }) + ); } for (const goal of payload.goals) {