Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-- RLS Phase 1 #8 — HeartbeatContext (TENANT_DIRECT)
-- High insert rate: BullMQ worker upserts per heartbeat cycle.
-- Worker uses explicit organizationId from job.data (ALS empty in workers).

-- 1. Composite index for RLS performance (CRITICAL — high-frequency table)
CREATE INDEX IF NOT EXISTS "HeartbeatContext_organizationId_id_idx"
ON "HeartbeatContext" ("organizationId", "id");

-- 2. Enable RLS with FORCE (so table owner also obeys policies)
ALTER TABLE "HeartbeatContext" ENABLE ROW LEVEL SECURITY;
ALTER TABLE "HeartbeatContext" FORCE ROW LEVEL SECURITY;

-- 3. Grants
GRANT SELECT, INSERT, UPDATE, DELETE ON "HeartbeatContext" TO app_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON "HeartbeatContext" TO admin_user;

-- 4. Policies (admin_user has BYPASSRLS — no policies needed for it)
CREATE POLICY heartbeatcontext_select ON "HeartbeatContext"
FOR SELECT TO app_user
USING ("organizationId" = current_setting('app.current_org_id', true));

CREATE POLICY heartbeatcontext_insert ON "HeartbeatContext"
FOR INSERT TO app_user
WITH CHECK ("organizationId" = current_setting('app.current_org_id', true));

CREATE POLICY heartbeatcontext_update ON "HeartbeatContext"
FOR UPDATE TO app_user
USING ("organizationId" = current_setting('app.current_org_id', true))
WITH CHECK ("organizationId" = current_setting('app.current_org_id', true));

CREATE POLICY heartbeatcontext_delete ON "HeartbeatContext"
FOR DELETE TO app_user
USING ("organizationId" = current_setting('app.current_org_id', true));

-- =========================================================================
-- Rollback (uncomment to revert)
-- =========================================================================
-- DROP POLICY IF EXISTS heartbeatcontext_select ON "HeartbeatContext";
-- DROP POLICY IF EXISTS heartbeatcontext_insert ON "HeartbeatContext";
-- DROP POLICY IF EXISTS heartbeatcontext_update ON "HeartbeatContext";
-- DROP POLICY IF EXISTS heartbeatcontext_delete ON "HeartbeatContext";
-- ALTER TABLE "HeartbeatContext" DISABLE ROW LEVEL SECURITY;
-- DROP INDEX IF EXISTS "HeartbeatContext_organizationId_id_idx";
13 changes: 11 additions & 2 deletions src/app/api/agents/[agentId]/heartbeat/context/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { z } from "zod";
import { requireAgentOwner, isAuthError } from "@/lib/api/auth-guard";
import { parseBodyWithLimit } from "@/lib/api/body-limit";
import { prisma } from "@/lib/prisma";
import { withOrgContext } from "@/lib/db/rls-middleware";
import { logger } from "@/lib/logger";
import { getContext, setContext } from "@/lib/heartbeat/context-manager";

Expand All @@ -27,7 +28,10 @@ export async function GET(
if (isAuthError(authResult)) return authResult;

try {
const context = await getContext(agentId);
const agent = await prisma.agent.findUnique({ where: { id: agentId }, select: { organizationId: true } });
const orgId = agent?.organizationId ?? null;

const context = await getContext(agentId, orgId);
return NextResponse.json({ success: true, data: context });
} catch (error) {
logger.error("GET /api/agents/[agentId]/heartbeat/context error", { agentId, error });
Expand Down Expand Up @@ -77,7 +81,12 @@ export async function DELETE(
if (isAuthError(authResult)) return authResult;

try {
const result = await prisma.heartbeatContext.deleteMany({ where: { agentId } });
const agent = await prisma.agent.findUnique({ where: { id: agentId }, select: { organizationId: true } });
const orgId = agent?.organizationId ?? null;

const result = await withOrgContext(prisma, orgId, (tx) =>
tx.heartbeatContext.deleteMany({ where: { agentId } })
);
return NextResponse.json({ success: true, data: { deletedCount: result.count } });
} catch (error) {
logger.error("DELETE /api/agents/[agentId]/heartbeat/context error", { agentId, error });
Expand Down
98 changes: 77 additions & 21 deletions src/lib/heartbeat/context-manager.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
import { prisma } from "@/lib/prisma";
import { withOrgContext } from "@/lib/db/rls-middleware";
import { Prisma } from "@/generated/prisma";

export async function getContext(agentId: string): Promise<Record<string, unknown>> {
const items = await prisma.heartbeatContext.findMany({
where: {
agentId,
OR: [{ expiresAt: null }, { expiresAt: { gt: new Date() } }],
},
select: { key: true, value: true },
});
export async function getContext(
agentId: string,
organizationId?: string | null,
): Promise<Record<string, unknown>> {
const items = await withOrgContext(prisma, organizationId ?? null, (tx) =>
tx.heartbeatContext.findMany({
where: {
agentId,
OR: [{ expiresAt: null }, { expiresAt: { gt: new Date() } }],
},
select: { key: true, value: true },
})
);

return Object.fromEntries(items.map((item) => [item.key, item.value]));
}
Expand All @@ -23,26 +29,42 @@ export async function setContext(
const expiresAt = ttlSeconds ? new Date(Date.now() + ttlSeconds * 1000) : null;
const jsonValue = value as Prisma.InputJsonValue;

await prisma.heartbeatContext.upsert({
where: { agentId_key: { agentId, key } },
update: { value: jsonValue, expiresAt, ttlSeconds: ttlSeconds ?? null },
create: { agentId, organizationId, key, value: jsonValue, ttlSeconds: ttlSeconds ?? null, expiresAt },
});
await withOrgContext(prisma, organizationId, (tx) =>
tx.heartbeatContext.upsert({
where: { agentId_key: { agentId, key } },
update: { value: jsonValue, expiresAt, ttlSeconds: ttlSeconds ?? null },
create: { agentId, organizationId, key, value: jsonValue, ttlSeconds: ttlSeconds ?? null, expiresAt },
})
);
}

export async function deleteContext(agentId: string, key: string): Promise<void> {
await prisma.heartbeatContext.deleteMany({ where: { agentId, key } });
export async function deleteContext(
agentId: string,
key: string,
organizationId?: string | null,
): Promise<void> {
await withOrgContext(prisma, organizationId ?? null, (tx) =>
tx.heartbeatContext.deleteMany({ where: { agentId, key } })
);
}

export async function pruneContext(agentId: string): Promise<number> {
const result = await prisma.heartbeatContext.deleteMany({
where: { agentId, expiresAt: { lt: new Date() } },
});
export async function pruneContext(
agentId: string,
organizationId?: string | null,
): Promise<number> {
const result = await withOrgContext(prisma, organizationId ?? null, (tx) =>
tx.heartbeatContext.deleteMany({
where: { agentId, expiresAt: { lt: new Date() } },
})
);
return result.count;
}

export async function buildContextPrompt(agentId: string): Promise<string> {
const context = await getContext(agentId);
export async function buildContextPrompt(
agentId: string,
organizationId?: string | null,
): Promise<string> {
const context = await getContext(agentId, organizationId);
const keys = Object.keys(context);

if (keys.length === 0) return "";
Expand All @@ -51,3 +73,37 @@ export async function buildContextPrompt(agentId: string): Promise<string> {

return ["--- Agent Memory (from previous heartbeat runs) ---", ...lines, "---"].join("\n");
}

/**
* Batch helper for the BullMQ heartbeat worker hot path.
* Prune expired items, read active items, and build the context prompt in a
* single withOrgContext transaction — one SET call instead of three.
*/
export async function processContextForRun(
agentId: string,
organizationId: string | null,
): Promise<{ snapshot: Record<string, unknown>; prompt: string }> {
return withOrgContext(prisma, organizationId, async (tx) => {
await tx.heartbeatContext.deleteMany({
where: { agentId, expiresAt: { lt: new Date() } },
});

const items = await tx.heartbeatContext.findMany({
where: { agentId, OR: [{ expiresAt: null }, { expiresAt: { gt: new Date() } }] },
select: { key: true, value: true },
});

const snapshot = Object.fromEntries(items.map((i) => [i.key, i.value]));
const keys = Object.keys(snapshot);
const prompt =
keys.length === 0
? ""
: [
"--- Agent Memory (from previous heartbeat runs) ---",
...keys.map((k) => `${k}: ${JSON.stringify(snapshot[k])}`),
"---",
].join("\n");

return { snapshot, prompt };
});
}
9 changes: 4 additions & 5 deletions src/lib/heartbeat/heartbeat-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export async function processHeartbeatRunJob(job: Job<HeartbeatJobData>): Promis
const { prisma } = await import("@/lib/prisma");
const { loadContext, saveContext, saveMessages } = await import("@/lib/runtime/context");
const { executeFlow } = await import("@/lib/runtime/engine");
const { pruneContext, getContext, buildContextPrompt } = await import("@/lib/heartbeat/context-manager");
const { processContextForRun } = await import("@/lib/heartbeat/context-manager");

const run = await prisma.heartbeatRun.create({
data: { agentId, configId, organizationId, status: "RUNNING", startedAt: new Date() },
Expand Down Expand Up @@ -47,10 +47,9 @@ export async function processHeartbeatRunJob(job: Job<HeartbeatJobData>): Promis
try {
await registerSession(agentId, run.id, "heartbeat-worker", "internal");

await pruneContext(agentId);

const contextSnapshot = await getContext(agentId);
const contextPrompt = await buildContextPrompt(agentId);
// Single transaction: prune expired + read active + build prompt.
// Avoids 3 separate withOrgContext calls on this high-frequency path.
const { snapshot: contextSnapshot, prompt: contextPrompt } = await processContextForRun(agentId, organizationId);

await job.updateProgress(20);

Expand Down
Loading