From eef6a0d09d5019e8ca96886fff6a8a45b11273c8 Mon Sep 17 00:00:00 2001 From: buky Date: Sun, 24 May 2026 09:17:36 +0200 Subject: [PATCH] =?UTF-8?q?feat(rls):=20Phase=201=20#8=20=E2=80=94=20Heart?= =?UTF-8?q?beatContext=20RLS=20+=20callsite=20fixes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migration 20260602000000: TENANT_DIRECT RLS on HeartbeatContext. Hot path: processContextForRun() batches prune+get+buildPrompt into one withOrgContext transaction (1 SET instead of 3) for the BullMQ worker. Worker uses explicit organizationId from job.data (ALS empty in workers). Individual functions updated with organizationId param for route callsites. --- .../migration.sql | 43 ++++++++ .../[agentId]/heartbeat/context/route.ts | 13 ++- src/lib/heartbeat/context-manager.ts | 98 +++++++++++++++---- src/lib/heartbeat/heartbeat-worker.ts | 9 +- 4 files changed, 135 insertions(+), 28 deletions(-) create mode 100644 prisma/migrations/20260602000000_rls_phase1_heartbeatcontext/migration.sql diff --git a/prisma/migrations/20260602000000_rls_phase1_heartbeatcontext/migration.sql b/prisma/migrations/20260602000000_rls_phase1_heartbeatcontext/migration.sql new file mode 100644 index 00000000..7343b77f --- /dev/null +++ b/prisma/migrations/20260602000000_rls_phase1_heartbeatcontext/migration.sql @@ -0,0 +1,43 @@ +-- RLS Phase 1 #8 — HeartbeatContext (TENANT_DIRECT) +-- High insert rate: BullMQ worker upserts per heartbeat cycle. +-- Worker uses explicit organizationId from job.data (ALS empty in workers). + +-- 1. Composite index for RLS performance (CRITICAL — high-frequency table) +CREATE INDEX IF NOT EXISTS "HeartbeatContext_organizationId_id_idx" + ON "HeartbeatContext" ("organizationId", "id"); + +-- 2. Enable RLS with FORCE (so table owner also obeys policies) +ALTER TABLE "HeartbeatContext" ENABLE ROW LEVEL SECURITY; +ALTER TABLE "HeartbeatContext" FORCE ROW LEVEL SECURITY; + +-- 3. Grants +GRANT SELECT, INSERT, UPDATE, DELETE ON "HeartbeatContext" TO app_user; +GRANT SELECT, INSERT, UPDATE, DELETE ON "HeartbeatContext" TO admin_user; + +-- 4. Policies (admin_user has BYPASSRLS — no policies needed for it) +CREATE POLICY heartbeatcontext_select ON "HeartbeatContext" + FOR SELECT TO app_user + USING ("organizationId" = current_setting('app.current_org_id', true)); + +CREATE POLICY heartbeatcontext_insert ON "HeartbeatContext" + FOR INSERT TO app_user + WITH CHECK ("organizationId" = current_setting('app.current_org_id', true)); + +CREATE POLICY heartbeatcontext_update ON "HeartbeatContext" + FOR UPDATE TO app_user + USING ("organizationId" = current_setting('app.current_org_id', true)) + WITH CHECK ("organizationId" = current_setting('app.current_org_id', true)); + +CREATE POLICY heartbeatcontext_delete ON "HeartbeatContext" + FOR DELETE TO app_user + USING ("organizationId" = current_setting('app.current_org_id', true)); + +-- ========================================================================= +-- Rollback (uncomment to revert) +-- ========================================================================= +-- DROP POLICY IF EXISTS heartbeatcontext_select ON "HeartbeatContext"; +-- DROP POLICY IF EXISTS heartbeatcontext_insert ON "HeartbeatContext"; +-- DROP POLICY IF EXISTS heartbeatcontext_update ON "HeartbeatContext"; +-- DROP POLICY IF EXISTS heartbeatcontext_delete ON "HeartbeatContext"; +-- ALTER TABLE "HeartbeatContext" DISABLE ROW LEVEL SECURITY; +-- DROP INDEX IF EXISTS "HeartbeatContext_organizationId_id_idx"; diff --git a/src/app/api/agents/[agentId]/heartbeat/context/route.ts b/src/app/api/agents/[agentId]/heartbeat/context/route.ts index 3a0c51eb..ec5d0822 100644 --- a/src/app/api/agents/[agentId]/heartbeat/context/route.ts +++ b/src/app/api/agents/[agentId]/heartbeat/context/route.ts @@ -3,6 +3,7 @@ import { z } from "zod"; import { requireAgentOwner, isAuthError } from "@/lib/api/auth-guard"; import { parseBodyWithLimit } from "@/lib/api/body-limit"; import { prisma } from "@/lib/prisma"; +import { withOrgContext } from "@/lib/db/rls-middleware"; import { logger } from "@/lib/logger"; import { getContext, setContext } from "@/lib/heartbeat/context-manager"; @@ -27,7 +28,10 @@ export async function GET( if (isAuthError(authResult)) return authResult; try { - const context = await getContext(agentId); + const agent = await prisma.agent.findUnique({ where: { id: agentId }, select: { organizationId: true } }); + const orgId = agent?.organizationId ?? null; + + const context = await getContext(agentId, orgId); return NextResponse.json({ success: true, data: context }); } catch (error) { logger.error("GET /api/agents/[agentId]/heartbeat/context error", { agentId, error }); @@ -77,7 +81,12 @@ export async function DELETE( if (isAuthError(authResult)) return authResult; try { - const result = await prisma.heartbeatContext.deleteMany({ where: { agentId } }); + const agent = await prisma.agent.findUnique({ where: { id: agentId }, select: { organizationId: true } }); + const orgId = agent?.organizationId ?? null; + + const result = await withOrgContext(prisma, orgId, (tx) => + tx.heartbeatContext.deleteMany({ where: { agentId } }) + ); return NextResponse.json({ success: true, data: { deletedCount: result.count } }); } catch (error) { logger.error("DELETE /api/agents/[agentId]/heartbeat/context error", { agentId, error }); diff --git a/src/lib/heartbeat/context-manager.ts b/src/lib/heartbeat/context-manager.ts index 00074297..6d5f3acc 100644 --- a/src/lib/heartbeat/context-manager.ts +++ b/src/lib/heartbeat/context-manager.ts @@ -1,14 +1,20 @@ import { prisma } from "@/lib/prisma"; +import { withOrgContext } from "@/lib/db/rls-middleware"; import { Prisma } from "@/generated/prisma"; -export async function getContext(agentId: string): Promise> { - const items = await prisma.heartbeatContext.findMany({ - where: { - agentId, - OR: [{ expiresAt: null }, { expiresAt: { gt: new Date() } }], - }, - select: { key: true, value: true }, - }); +export async function getContext( + agentId: string, + organizationId?: string | null, +): Promise> { + const items = await withOrgContext(prisma, organizationId ?? null, (tx) => + tx.heartbeatContext.findMany({ + where: { + agentId, + OR: [{ expiresAt: null }, { expiresAt: { gt: new Date() } }], + }, + select: { key: true, value: true }, + }) + ); return Object.fromEntries(items.map((item) => [item.key, item.value])); } @@ -23,26 +29,42 @@ export async function setContext( const expiresAt = ttlSeconds ? new Date(Date.now() + ttlSeconds * 1000) : null; const jsonValue = value as Prisma.InputJsonValue; - await prisma.heartbeatContext.upsert({ - where: { agentId_key: { agentId, key } }, - update: { value: jsonValue, expiresAt, ttlSeconds: ttlSeconds ?? null }, - create: { agentId, organizationId, key, value: jsonValue, ttlSeconds: ttlSeconds ?? null, expiresAt }, - }); + await withOrgContext(prisma, organizationId, (tx) => + tx.heartbeatContext.upsert({ + where: { agentId_key: { agentId, key } }, + update: { value: jsonValue, expiresAt, ttlSeconds: ttlSeconds ?? null }, + create: { agentId, organizationId, key, value: jsonValue, ttlSeconds: ttlSeconds ?? null, expiresAt }, + }) + ); } -export async function deleteContext(agentId: string, key: string): Promise { - await prisma.heartbeatContext.deleteMany({ where: { agentId, key } }); +export async function deleteContext( + agentId: string, + key: string, + organizationId?: string | null, +): Promise { + await withOrgContext(prisma, organizationId ?? null, (tx) => + tx.heartbeatContext.deleteMany({ where: { agentId, key } }) + ); } -export async function pruneContext(agentId: string): Promise { - const result = await prisma.heartbeatContext.deleteMany({ - where: { agentId, expiresAt: { lt: new Date() } }, - }); +export async function pruneContext( + agentId: string, + organizationId?: string | null, +): Promise { + const result = await withOrgContext(prisma, organizationId ?? null, (tx) => + tx.heartbeatContext.deleteMany({ + where: { agentId, expiresAt: { lt: new Date() } }, + }) + ); return result.count; } -export async function buildContextPrompt(agentId: string): Promise { - const context = await getContext(agentId); +export async function buildContextPrompt( + agentId: string, + organizationId?: string | null, +): Promise { + const context = await getContext(agentId, organizationId); const keys = Object.keys(context); if (keys.length === 0) return ""; @@ -51,3 +73,37 @@ export async function buildContextPrompt(agentId: string): Promise { return ["--- Agent Memory (from previous heartbeat runs) ---", ...lines, "---"].join("\n"); } + +/** + * Batch helper for the BullMQ heartbeat worker hot path. + * Prune expired items, read active items, and build the context prompt in a + * single withOrgContext transaction — one SET call instead of three. + */ +export async function processContextForRun( + agentId: string, + organizationId: string | null, +): Promise<{ snapshot: Record; prompt: string }> { + return withOrgContext(prisma, organizationId, async (tx) => { + await tx.heartbeatContext.deleteMany({ + where: { agentId, expiresAt: { lt: new Date() } }, + }); + + const items = await tx.heartbeatContext.findMany({ + where: { agentId, OR: [{ expiresAt: null }, { expiresAt: { gt: new Date() } }] }, + select: { key: true, value: true }, + }); + + const snapshot = Object.fromEntries(items.map((i) => [i.key, i.value])); + const keys = Object.keys(snapshot); + const prompt = + keys.length === 0 + ? "" + : [ + "--- Agent Memory (from previous heartbeat runs) ---", + ...keys.map((k) => `${k}: ${JSON.stringify(snapshot[k])}`), + "---", + ].join("\n"); + + return { snapshot, prompt }; + }); +} diff --git a/src/lib/heartbeat/heartbeat-worker.ts b/src/lib/heartbeat/heartbeat-worker.ts index bd66ec9b..01dcc3dc 100644 --- a/src/lib/heartbeat/heartbeat-worker.ts +++ b/src/lib/heartbeat/heartbeat-worker.ts @@ -17,7 +17,7 @@ export async function processHeartbeatRunJob(job: Job): Promis const { prisma } = await import("@/lib/prisma"); const { loadContext, saveContext, saveMessages } = await import("@/lib/runtime/context"); const { executeFlow } = await import("@/lib/runtime/engine"); - const { pruneContext, getContext, buildContextPrompt } = await import("@/lib/heartbeat/context-manager"); + const { processContextForRun } = await import("@/lib/heartbeat/context-manager"); const run = await prisma.heartbeatRun.create({ data: { agentId, configId, organizationId, status: "RUNNING", startedAt: new Date() }, @@ -47,10 +47,9 @@ export async function processHeartbeatRunJob(job: Job): Promis try { await registerSession(agentId, run.id, "heartbeat-worker", "internal"); - await pruneContext(agentId); - - const contextSnapshot = await getContext(agentId); - const contextPrompt = await buildContextPrompt(agentId); + // Single transaction: prune expired + read active + build prompt. + // Avoids 3 separate withOrgContext calls on this high-frequency path. + const { snapshot: contextSnapshot, prompt: contextPrompt } = await processContextForRun(agentId, organizationId); await job.updateProgress(20);