Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-- RLS Phase 1 #9 — HeartbeatRun (TENANT_DIRECT)
-- One row per heartbeat job: created RUNNING, updated to COMPLETED/FAILED.
-- Worker uses explicit organizationId from job.data (ALS empty in workers).

-- 1. Composite index for RLS performance (CRITICAL)
CREATE INDEX IF NOT EXISTS "HeartbeatRun_organizationId_id_idx"
ON "HeartbeatRun" ("organizationId", "id");

-- 2. Enable RLS with FORCE (so table owner also obeys policies)
ALTER TABLE "HeartbeatRun" ENABLE ROW LEVEL SECURITY;
ALTER TABLE "HeartbeatRun" FORCE ROW LEVEL SECURITY;

-- 3. Grants
GRANT SELECT, INSERT, UPDATE, DELETE ON "HeartbeatRun" TO app_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON "HeartbeatRun" TO admin_user;

-- 4. Policies (admin_user has BYPASSRLS — no policies needed for it)
CREATE POLICY heartbeatrun_select ON "HeartbeatRun"
FOR SELECT TO app_user
USING ("organizationId" = current_setting('app.current_org_id', true));

CREATE POLICY heartbeatrun_insert ON "HeartbeatRun"
FOR INSERT TO app_user
WITH CHECK ("organizationId" = current_setting('app.current_org_id', true));

CREATE POLICY heartbeatrun_update ON "HeartbeatRun"
FOR UPDATE TO app_user
USING ("organizationId" = current_setting('app.current_org_id', true))
WITH CHECK ("organizationId" = current_setting('app.current_org_id', true));

CREATE POLICY heartbeatrun_delete ON "HeartbeatRun"
FOR DELETE TO app_user
USING ("organizationId" = current_setting('app.current_org_id', true));

-- =========================================================================
-- Rollback (uncomment to revert)
-- =========================================================================
-- DROP POLICY IF EXISTS heartbeatrun_select ON "HeartbeatRun";
-- DROP POLICY IF EXISTS heartbeatrun_insert ON "HeartbeatRun";
-- DROP POLICY IF EXISTS heartbeatrun_update ON "HeartbeatRun";
-- DROP POLICY IF EXISTS heartbeatrun_delete ON "HeartbeatRun";
-- ALTER TABLE "HeartbeatRun" DISABLE ROW LEVEL SECURITY;
-- DROP INDEX IF EXISTS "HeartbeatRun_organizationId_id_idx";
34 changes: 20 additions & 14 deletions src/app/api/agents/[agentId]/heartbeat/runs/route.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { NextRequest, NextResponse } from "next/server";
import { requireAgentOwner, isAuthError } from "@/lib/api/auth-guard";
import { prisma } from "@/lib/prisma";
import { withOrgContext } from "@/lib/db/rls-middleware";
import { logger } from "@/lib/logger";

interface RouteParams {
Expand All @@ -21,20 +22,25 @@ export async function GET(
const cursor = request.nextUrl.searchParams.get("cursor") ?? undefined;

try {
const runs = await prisma.heartbeatRun.findMany({
where: { agentId },
orderBy: { startedAt: "desc" },
take: PAGE_SIZE + 1,
...(cursor && { cursor: { id: cursor }, skip: 1 }),
select: {
id: true,
status: true,
startedAt: true,
completedAt: true,
durationMs: true,
error: true,
},
});
const agent = await prisma.agent.findUnique({ where: { id: agentId }, select: { organizationId: true } });
const orgId = agent?.organizationId ?? null;

const runs = await withOrgContext(prisma, orgId, (tx) =>
tx.heartbeatRun.findMany({
where: { agentId },
orderBy: { startedAt: "desc" },
take: PAGE_SIZE + 1,
...(cursor && { cursor: { id: cursor }, skip: 1 }),
select: {
id: true,
status: true,
startedAt: true,
completedAt: true,
durationMs: true,
error: true,
},
})
);

const hasMore = runs.length > PAGE_SIZE;
const page = hasMore ? runs.slice(0, PAGE_SIZE) : runs;
Expand Down
52 changes: 30 additions & 22 deletions src/lib/heartbeat/heartbeat-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ export async function processHeartbeatRunJob(job: Job<HeartbeatJobData>): Promis
const { executeFlow } = await import("@/lib/runtime/engine");
const { processContextForRun } = await import("@/lib/heartbeat/context-manager");

const run = await prisma.heartbeatRun.create({
data: { agentId, configId, organizationId, status: "RUNNING", startedAt: new Date() },
select: { id: true },
});
const run = await withOrgContext(prisma, organizationId, (tx) =>
tx.heartbeatRun.create({
data: { agentId, configId, organizationId, status: "RUNNING", startedAt: new Date() },
select: { id: true },
})
);

await job.updateProgress(10);

Expand All @@ -35,10 +37,12 @@ export async function processHeartbeatRunJob(job: Job<HeartbeatJobData>): Promis
);

if (!config) {
await prisma.heartbeatRun.update({
where: { id: run.id },
data: { status: "FAILED", completedAt: new Date(), error: "HeartbeatConfig not found" },
});
await withOrgContext(prisma, organizationId, (tx) =>
tx.heartbeatRun.update({
where: { id: run.id },
data: { status: "FAILED", completedAt: new Date(), error: "HeartbeatConfig not found" },
})
);
throw new Error(`HeartbeatConfig ${configId} not found`);
}

Expand Down Expand Up @@ -79,16 +83,18 @@ export async function processHeartbeatRunJob(job: Job<HeartbeatJobData>): Promis
saveContext(context),
]);

await prisma.heartbeatRun.update({
where: { id: run.id },
data: {
status: "COMPLETED",
completedAt: new Date(),
durationMs,
contextSnapshot: contextSnapshot as Prisma.InputJsonValue,
output: { messages: outputMessages } as Prisma.InputJsonValue,
},
});
await withOrgContext(prisma, organizationId, (tx) =>
tx.heartbeatRun.update({
where: { id: run.id },
data: {
status: "COMPLETED",
completedAt: new Date(),
durationMs,
contextSnapshot: contextSnapshot as Prisma.InputJsonValue,
output: { messages: outputMessages } as Prisma.InputJsonValue,
},
})
);

await job.updateProgress(100);

Expand All @@ -99,10 +105,12 @@ export async function processHeartbeatRunJob(job: Job<HeartbeatJobData>): Promis
const error = err instanceof Error ? err.message : String(err);
const durationMs = Date.now() - startTime;

await prisma.heartbeatRun.update({
where: { id: run.id },
data: { status: "FAILED", completedAt: new Date(), durationMs, error },
}).catch(() => {/* non-critical */});
await withOrgContext(prisma, organizationId, (tx) =>
tx.heartbeatRun.update({
where: { id: run.id },
data: { status: "FAILED", completedAt: new Date(), durationMs, error },
})
).catch(() => {/* non-critical */});

logger.error("Heartbeat run failed", { jobId: job.id, agentId, configId, error: err });
throw err;
Expand Down
Loading