diff --git a/prisma/migrations/20260602000000_rls_phase1_heartbeatcontext/migration.sql b/prisma/migrations/20260602000000_rls_phase1_heartbeatcontext/migration.sql new file mode 100644 index 00000000..7343b77f --- /dev/null +++ b/prisma/migrations/20260602000000_rls_phase1_heartbeatcontext/migration.sql @@ -0,0 +1,43 @@ +-- RLS Phase 1 #8 — HeartbeatContext (TENANT_DIRECT) +-- High insert rate: BullMQ worker upserts per heartbeat cycle. +-- Worker uses explicit organizationId from job.data (ALS empty in workers). + +-- 1. Composite index for RLS performance (CRITICAL — high-frequency table) +CREATE INDEX IF NOT EXISTS "HeartbeatContext_organizationId_id_idx" + ON "HeartbeatContext" ("organizationId", "id"); + +-- 2. Enable RLS with FORCE (so table owner also obeys policies) +ALTER TABLE "HeartbeatContext" ENABLE ROW LEVEL SECURITY; +ALTER TABLE "HeartbeatContext" FORCE ROW LEVEL SECURITY; + +-- 3. Grants +GRANT SELECT, INSERT, UPDATE, DELETE ON "HeartbeatContext" TO app_user; +GRANT SELECT, INSERT, UPDATE, DELETE ON "HeartbeatContext" TO admin_user; + +-- 4. Policies (admin_user has BYPASSRLS — no policies needed for it) +CREATE POLICY heartbeatcontext_select ON "HeartbeatContext" + FOR SELECT TO app_user + USING ("organizationId" = current_setting('app.current_org_id', true)); + +CREATE POLICY heartbeatcontext_insert ON "HeartbeatContext" + FOR INSERT TO app_user + WITH CHECK ("organizationId" = current_setting('app.current_org_id', true)); + +CREATE POLICY heartbeatcontext_update ON "HeartbeatContext" + FOR UPDATE TO app_user + USING ("organizationId" = current_setting('app.current_org_id', true)) + WITH CHECK ("organizationId" = current_setting('app.current_org_id', true)); + +CREATE POLICY heartbeatcontext_delete ON "HeartbeatContext" + FOR DELETE TO app_user + USING ("organizationId" = current_setting('app.current_org_id', true)); + +-- ========================================================================= +-- Rollback (uncomment to revert) +-- ========================================================================= +-- DROP POLICY IF EXISTS heartbeatcontext_select ON "HeartbeatContext"; +-- DROP POLICY IF EXISTS heartbeatcontext_insert ON "HeartbeatContext"; +-- DROP POLICY IF EXISTS heartbeatcontext_update ON "HeartbeatContext"; +-- DROP POLICY IF EXISTS heartbeatcontext_delete ON "HeartbeatContext"; +-- ALTER TABLE "HeartbeatContext" DISABLE ROW LEVEL SECURITY; +-- DROP INDEX IF EXISTS "HeartbeatContext_organizationId_id_idx"; diff --git a/src/app/api/agents/[agentId]/heartbeat/context/route.ts b/src/app/api/agents/[agentId]/heartbeat/context/route.ts index 3a0c51eb..ec5d0822 100644 --- a/src/app/api/agents/[agentId]/heartbeat/context/route.ts +++ b/src/app/api/agents/[agentId]/heartbeat/context/route.ts @@ -3,6 +3,7 @@ import { z } from "zod"; import { requireAgentOwner, isAuthError } from "@/lib/api/auth-guard"; import { parseBodyWithLimit } from "@/lib/api/body-limit"; import { prisma } from "@/lib/prisma"; +import { withOrgContext } from "@/lib/db/rls-middleware"; import { logger } from "@/lib/logger"; import { getContext, setContext } from "@/lib/heartbeat/context-manager"; @@ -27,7 +28,10 @@ export async function GET( if (isAuthError(authResult)) return authResult; try { - const context = await getContext(agentId); + const agent = await prisma.agent.findUnique({ where: { id: agentId }, select: { organizationId: true } }); + const orgId = agent?.organizationId ?? null; + + const context = await getContext(agentId, orgId); return NextResponse.json({ success: true, data: context }); } catch (error) { logger.error("GET /api/agents/[agentId]/heartbeat/context error", { agentId, error }); @@ -77,7 +81,12 @@ export async function DELETE( if (isAuthError(authResult)) return authResult; try { - const result = await prisma.heartbeatContext.deleteMany({ where: { agentId } }); + const agent = await prisma.agent.findUnique({ where: { id: agentId }, select: { organizationId: true } }); + const orgId = agent?.organizationId ?? null; + + const result = await withOrgContext(prisma, orgId, (tx) => + tx.heartbeatContext.deleteMany({ where: { agentId } }) + ); return NextResponse.json({ success: true, data: { deletedCount: result.count } }); } catch (error) { logger.error("DELETE /api/agents/[agentId]/heartbeat/context error", { agentId, error }); diff --git a/src/lib/heartbeat/context-manager.ts b/src/lib/heartbeat/context-manager.ts index 00074297..6d5f3acc 100644 --- a/src/lib/heartbeat/context-manager.ts +++ b/src/lib/heartbeat/context-manager.ts @@ -1,14 +1,20 @@ import { prisma } from "@/lib/prisma"; +import { withOrgContext } from "@/lib/db/rls-middleware"; import { Prisma } from "@/generated/prisma"; -export async function getContext(agentId: string): Promise> { - const items = await prisma.heartbeatContext.findMany({ - where: { - agentId, - OR: [{ expiresAt: null }, { expiresAt: { gt: new Date() } }], - }, - select: { key: true, value: true }, - }); +export async function getContext( + agentId: string, + organizationId?: string | null, +): Promise> { + const items = await withOrgContext(prisma, organizationId ?? null, (tx) => + tx.heartbeatContext.findMany({ + where: { + agentId, + OR: [{ expiresAt: null }, { expiresAt: { gt: new Date() } }], + }, + select: { key: true, value: true }, + }) + ); return Object.fromEntries(items.map((item) => [item.key, item.value])); } @@ -23,26 +29,42 @@ export async function setContext( const expiresAt = ttlSeconds ? new Date(Date.now() + ttlSeconds * 1000) : null; const jsonValue = value as Prisma.InputJsonValue; - await prisma.heartbeatContext.upsert({ - where: { agentId_key: { agentId, key } }, - update: { value: jsonValue, expiresAt, ttlSeconds: ttlSeconds ?? null }, - create: { agentId, organizationId, key, value: jsonValue, ttlSeconds: ttlSeconds ?? null, expiresAt }, - }); + await withOrgContext(prisma, organizationId, (tx) => + tx.heartbeatContext.upsert({ + where: { agentId_key: { agentId, key } }, + update: { value: jsonValue, expiresAt, ttlSeconds: ttlSeconds ?? null }, + create: { agentId, organizationId, key, value: jsonValue, ttlSeconds: ttlSeconds ?? null, expiresAt }, + }) + ); } -export async function deleteContext(agentId: string, key: string): Promise { - await prisma.heartbeatContext.deleteMany({ where: { agentId, key } }); +export async function deleteContext( + agentId: string, + key: string, + organizationId?: string | null, +): Promise { + await withOrgContext(prisma, organizationId ?? null, (tx) => + tx.heartbeatContext.deleteMany({ where: { agentId, key } }) + ); } -export async function pruneContext(agentId: string): Promise { - const result = await prisma.heartbeatContext.deleteMany({ - where: { agentId, expiresAt: { lt: new Date() } }, - }); +export async function pruneContext( + agentId: string, + organizationId?: string | null, +): Promise { + const result = await withOrgContext(prisma, organizationId ?? null, (tx) => + tx.heartbeatContext.deleteMany({ + where: { agentId, expiresAt: { lt: new Date() } }, + }) + ); return result.count; } -export async function buildContextPrompt(agentId: string): Promise { - const context = await getContext(agentId); +export async function buildContextPrompt( + agentId: string, + organizationId?: string | null, +): Promise { + const context = await getContext(agentId, organizationId); const keys = Object.keys(context); if (keys.length === 0) return ""; @@ -51,3 +73,37 @@ export async function buildContextPrompt(agentId: string): Promise { return ["--- Agent Memory (from previous heartbeat runs) ---", ...lines, "---"].join("\n"); } + +/** + * Batch helper for the BullMQ heartbeat worker hot path. + * Prune expired items, read active items, and build the context prompt in a + * single withOrgContext transaction — one SET call instead of three. + */ +export async function processContextForRun( + agentId: string, + organizationId: string | null, +): Promise<{ snapshot: Record; prompt: string }> { + return withOrgContext(prisma, organizationId, async (tx) => { + await tx.heartbeatContext.deleteMany({ + where: { agentId, expiresAt: { lt: new Date() } }, + }); + + const items = await tx.heartbeatContext.findMany({ + where: { agentId, OR: [{ expiresAt: null }, { expiresAt: { gt: new Date() } }] }, + select: { key: true, value: true }, + }); + + const snapshot = Object.fromEntries(items.map((i) => [i.key, i.value])); + const keys = Object.keys(snapshot); + const prompt = + keys.length === 0 + ? "" + : [ + "--- Agent Memory (from previous heartbeat runs) ---", + ...keys.map((k) => `${k}: ${JSON.stringify(snapshot[k])}`), + "---", + ].join("\n"); + + return { snapshot, prompt }; + }); +} diff --git a/src/lib/heartbeat/heartbeat-worker.ts b/src/lib/heartbeat/heartbeat-worker.ts index bd66ec9b..01dcc3dc 100644 --- a/src/lib/heartbeat/heartbeat-worker.ts +++ b/src/lib/heartbeat/heartbeat-worker.ts @@ -17,7 +17,7 @@ export async function processHeartbeatRunJob(job: Job): Promis const { prisma } = await import("@/lib/prisma"); const { loadContext, saveContext, saveMessages } = await import("@/lib/runtime/context"); const { executeFlow } = await import("@/lib/runtime/engine"); - const { pruneContext, getContext, buildContextPrompt } = await import("@/lib/heartbeat/context-manager"); + const { processContextForRun } = await import("@/lib/heartbeat/context-manager"); const run = await prisma.heartbeatRun.create({ data: { agentId, configId, organizationId, status: "RUNNING", startedAt: new Date() }, @@ -47,10 +47,9 @@ export async function processHeartbeatRunJob(job: Job): Promis try { await registerSession(agentId, run.id, "heartbeat-worker", "internal"); - await pruneContext(agentId); - - const contextSnapshot = await getContext(agentId); - const contextPrompt = await buildContextPrompt(agentId); + // Single transaction: prune expired + read active + build prompt. + // Avoids 3 separate withOrgContext calls on this high-frequency path. + const { snapshot: contextSnapshot, prompt: contextPrompt } = await processContextForRun(agentId, organizationId); await job.updateProgress(20);