From f87317c645058b72e267c5745feb15d816788a5c Mon Sep 17 00:00:00 2001 From: buky Date: Sun, 24 May 2026 10:01:04 +0200 Subject: [PATCH] =?UTF-8?q?feat(rls):=20Phase=201=20#9=20=E2=80=94=20Heart?= =?UTF-8?q?beatRun=20RLS=20+=20callsite=20fixes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migration 20260603000000: TENANT_DIRECT RLS on HeartbeatRun. Worker wraps all 4 heartbeatRun ops individually (create + 3 updates) with withOrgContext using job.data.organizationId — ops span the full job lifetime so no batch transaction is feasible. Route wraps findMany via agent org lookup. --- .../migration.sql | 43 +++++++++++++++ .../agents/[agentId]/heartbeat/runs/route.ts | 34 +++++++----- src/lib/heartbeat/heartbeat-worker.ts | 52 +++++++++++-------- 3 files changed, 93 insertions(+), 36 deletions(-) create mode 100644 prisma/migrations/20260603000000_rls_phase1_heartbeatrun/migration.sql diff --git a/prisma/migrations/20260603000000_rls_phase1_heartbeatrun/migration.sql b/prisma/migrations/20260603000000_rls_phase1_heartbeatrun/migration.sql new file mode 100644 index 00000000..20d5a6d5 --- /dev/null +++ b/prisma/migrations/20260603000000_rls_phase1_heartbeatrun/migration.sql @@ -0,0 +1,43 @@ +-- RLS Phase 1 #9 — HeartbeatRun (TENANT_DIRECT) +-- One row per heartbeat job: created RUNNING, updated to COMPLETED/FAILED. +-- Worker uses explicit organizationId from job.data (ALS empty in workers). + +-- 1. Composite index for RLS performance (CRITICAL) +CREATE INDEX IF NOT EXISTS "HeartbeatRun_organizationId_id_idx" + ON "HeartbeatRun" ("organizationId", "id"); + +-- 2. Enable RLS with FORCE (so table owner also obeys policies) +ALTER TABLE "HeartbeatRun" ENABLE ROW LEVEL SECURITY; +ALTER TABLE "HeartbeatRun" FORCE ROW LEVEL SECURITY; + +-- 3. Grants +GRANT SELECT, INSERT, UPDATE, DELETE ON "HeartbeatRun" TO app_user; +GRANT SELECT, INSERT, UPDATE, DELETE ON "HeartbeatRun" TO admin_user; + +-- 4. Policies (admin_user has BYPASSRLS — no policies needed for it) +CREATE POLICY heartbeatrun_select ON "HeartbeatRun" + FOR SELECT TO app_user + USING ("organizationId" = current_setting('app.current_org_id', true)); + +CREATE POLICY heartbeatrun_insert ON "HeartbeatRun" + FOR INSERT TO app_user + WITH CHECK ("organizationId" = current_setting('app.current_org_id', true)); + +CREATE POLICY heartbeatrun_update ON "HeartbeatRun" + FOR UPDATE TO app_user + USING ("organizationId" = current_setting('app.current_org_id', true)) + WITH CHECK ("organizationId" = current_setting('app.current_org_id', true)); + +CREATE POLICY heartbeatrun_delete ON "HeartbeatRun" + FOR DELETE TO app_user + USING ("organizationId" = current_setting('app.current_org_id', true)); + +-- ========================================================================= +-- Rollback (uncomment to revert) +-- ========================================================================= +-- DROP POLICY IF EXISTS heartbeatrun_select ON "HeartbeatRun"; +-- DROP POLICY IF EXISTS heartbeatrun_insert ON "HeartbeatRun"; +-- DROP POLICY IF EXISTS heartbeatrun_update ON "HeartbeatRun"; +-- DROP POLICY IF EXISTS heartbeatrun_delete ON "HeartbeatRun"; +-- ALTER TABLE "HeartbeatRun" DISABLE ROW LEVEL SECURITY; +-- DROP INDEX IF EXISTS "HeartbeatRun_organizationId_id_idx"; diff --git a/src/app/api/agents/[agentId]/heartbeat/runs/route.ts b/src/app/api/agents/[agentId]/heartbeat/runs/route.ts index 032d55b9..717e55b1 100644 --- a/src/app/api/agents/[agentId]/heartbeat/runs/route.ts +++ b/src/app/api/agents/[agentId]/heartbeat/runs/route.ts @@ -1,6 +1,7 @@ import { NextRequest, NextResponse } from "next/server"; import { requireAgentOwner, isAuthError } from "@/lib/api/auth-guard"; import { prisma } from "@/lib/prisma"; +import { withOrgContext } from "@/lib/db/rls-middleware"; import { logger } from "@/lib/logger"; interface RouteParams { @@ -21,20 +22,25 @@ export async function GET( const cursor = request.nextUrl.searchParams.get("cursor") ?? undefined; try { - const runs = await prisma.heartbeatRun.findMany({ - where: { agentId }, - orderBy: { startedAt: "desc" }, - take: PAGE_SIZE + 1, - ...(cursor && { cursor: { id: cursor }, skip: 1 }), - select: { - id: true, - status: true, - startedAt: true, - completedAt: true, - durationMs: true, - error: true, - }, - }); + const agent = await prisma.agent.findUnique({ where: { id: agentId }, select: { organizationId: true } }); + const orgId = agent?.organizationId ?? null; + + const runs = await withOrgContext(prisma, orgId, (tx) => + tx.heartbeatRun.findMany({ + where: { agentId }, + orderBy: { startedAt: "desc" }, + take: PAGE_SIZE + 1, + ...(cursor && { cursor: { id: cursor }, skip: 1 }), + select: { + id: true, + status: true, + startedAt: true, + completedAt: true, + durationMs: true, + error: true, + }, + }) + ); const hasMore = runs.length > PAGE_SIZE; const page = hasMore ? runs.slice(0, PAGE_SIZE) : runs; diff --git a/src/lib/heartbeat/heartbeat-worker.ts b/src/lib/heartbeat/heartbeat-worker.ts index 01dcc3dc..d7654fc6 100644 --- a/src/lib/heartbeat/heartbeat-worker.ts +++ b/src/lib/heartbeat/heartbeat-worker.ts @@ -19,10 +19,12 @@ export async function processHeartbeatRunJob(job: Job): Promis const { executeFlow } = await import("@/lib/runtime/engine"); const { processContextForRun } = await import("@/lib/heartbeat/context-manager"); - const run = await prisma.heartbeatRun.create({ - data: { agentId, configId, organizationId, status: "RUNNING", startedAt: new Date() }, - select: { id: true }, - }); + const run = await withOrgContext(prisma, organizationId, (tx) => + tx.heartbeatRun.create({ + data: { agentId, configId, organizationId, status: "RUNNING", startedAt: new Date() }, + select: { id: true }, + }) + ); await job.updateProgress(10); @@ -35,10 +37,12 @@ export async function processHeartbeatRunJob(job: Job): Promis ); if (!config) { - await prisma.heartbeatRun.update({ - where: { id: run.id }, - data: { status: "FAILED", completedAt: new Date(), error: "HeartbeatConfig not found" }, - }); + await withOrgContext(prisma, organizationId, (tx) => + tx.heartbeatRun.update({ + where: { id: run.id }, + data: { status: "FAILED", completedAt: new Date(), error: "HeartbeatConfig not found" }, + }) + ); throw new Error(`HeartbeatConfig ${configId} not found`); } @@ -79,16 +83,18 @@ export async function processHeartbeatRunJob(job: Job): Promis saveContext(context), ]); - await prisma.heartbeatRun.update({ - where: { id: run.id }, - data: { - status: "COMPLETED", - completedAt: new Date(), - durationMs, - contextSnapshot: contextSnapshot as Prisma.InputJsonValue, - output: { messages: outputMessages } as Prisma.InputJsonValue, - }, - }); + await withOrgContext(prisma, organizationId, (tx) => + tx.heartbeatRun.update({ + where: { id: run.id }, + data: { + status: "COMPLETED", + completedAt: new Date(), + durationMs, + contextSnapshot: contextSnapshot as Prisma.InputJsonValue, + output: { messages: outputMessages } as Prisma.InputJsonValue, + }, + }) + ); await job.updateProgress(100); @@ -99,10 +105,12 @@ export async function processHeartbeatRunJob(job: Job): Promis const error = err instanceof Error ? err.message : String(err); const durationMs = Date.now() - startTime; - await prisma.heartbeatRun.update({ - where: { id: run.id }, - data: { status: "FAILED", completedAt: new Date(), durationMs, error }, - }).catch(() => {/* non-critical */}); + await withOrgContext(prisma, organizationId, (tx) => + tx.heartbeatRun.update({ + where: { id: run.id }, + data: { status: "FAILED", completedAt: new Date(), durationMs, error }, + }) + ).catch(() => {/* non-critical */}); logger.error("Heartbeat run failed", { jobId: job.id, agentId, configId, error: err }); throw err;