Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
-- RLS Phase 1 #7 — HeartbeatConfig (TENANT_DIRECT)
-- BullMQ worker reads this via organizationId from job.data; ALS is empty in workers.

-- 1. Composite index for RLS performance (CRITICAL)
CREATE INDEX IF NOT EXISTS "HeartbeatConfig_organizationId_id_idx"
ON "HeartbeatConfig" ("organizationId", "id");

-- 2. Enable RLS with FORCE (so table owner also obeys policies)
ALTER TABLE "HeartbeatConfig" ENABLE ROW LEVEL SECURITY;
ALTER TABLE "HeartbeatConfig" FORCE ROW LEVEL SECURITY;

-- 3. Grants
GRANT SELECT, INSERT, UPDATE, DELETE ON "HeartbeatConfig" TO app_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON "HeartbeatConfig" TO admin_user;

-- 4. Policies (admin_user has BYPASSRLS — no policies needed for it)
CREATE POLICY heartbeatconfig_select ON "HeartbeatConfig"
FOR SELECT TO app_user
USING ("organizationId" = current_setting('app.current_org_id', true));

CREATE POLICY heartbeatconfig_insert ON "HeartbeatConfig"
FOR INSERT TO app_user
WITH CHECK ("organizationId" = current_setting('app.current_org_id', true));

CREATE POLICY heartbeatconfig_update ON "HeartbeatConfig"
FOR UPDATE TO app_user
USING ("organizationId" = current_setting('app.current_org_id', true))
WITH CHECK ("organizationId" = current_setting('app.current_org_id', true));

CREATE POLICY heartbeatconfig_delete ON "HeartbeatConfig"
FOR DELETE TO app_user
USING ("organizationId" = current_setting('app.current_org_id', true));

-- =========================================================================
-- Rollback (uncomment to revert)
-- =========================================================================
-- DROP POLICY IF EXISTS heartbeatconfig_select ON "HeartbeatConfig";
-- DROP POLICY IF EXISTS heartbeatconfig_insert ON "HeartbeatConfig";
-- DROP POLICY IF EXISTS heartbeatconfig_update ON "HeartbeatConfig";
-- DROP POLICY IF EXISTS heartbeatconfig_delete ON "HeartbeatConfig";
-- ALTER TABLE "HeartbeatConfig" DISABLE ROW LEVEL SECURITY;
-- DROP INDEX IF EXISTS "HeartbeatConfig_organizationId_id_idx";
51 changes: 32 additions & 19 deletions src/app/api/agents/[agentId]/heartbeat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { z } from "zod";
import { requireAgentOwner, isAuthError } from "@/lib/api/auth-guard";
import { parseBodyWithLimit } from "@/lib/api/body-limit";
import { prisma } from "@/lib/prisma";
import { withOrgContext } from "@/lib/db/rls-middleware";
import { logger } from "@/lib/logger";
import { validateCronExpression, validateTimezone } from "@/lib/scheduler/cron-validator";
import { scheduleHeartbeat, unscheduleHeartbeat } from "@/lib/heartbeat/heartbeat-scheduler";
Expand Down Expand Up @@ -30,10 +31,15 @@ export async function GET(
if (isAuthError(authResult)) return authResult;

try {
const config = await prisma.heartbeatConfig.findUnique({
where: { agentId },
include: { _count: { select: { runs: true } } },
});
const agent = await prisma.agent.findUnique({ where: { id: agentId }, select: { organizationId: true } });
const orgId = agent?.organizationId ?? null;

const config = await withOrgContext(prisma, orgId, (tx) =>
tx.heartbeatConfig.findUnique({
where: { agentId },
include: { _count: { select: { runs: true } } },
})
);

return NextResponse.json({ success: true, data: config });
} catch (error) {
Expand Down Expand Up @@ -76,19 +82,21 @@ export async function POST(
}

try {
const config = await prisma.heartbeatConfig.upsert({
where: { agentId },
update: {
cronExpression,
timezone,
...(systemPrompt !== undefined && { systemPrompt }),
...(maxContextItems !== undefined && { maxContextItems }),
enabled,
},
create: { agentId, organizationId, cronExpression, timezone, systemPrompt, maxContextItems: maxContextItems ?? 50, enabled },
});

await scheduleHeartbeat(config.id);
const config = await withOrgContext(prisma, organizationId, (tx) =>
tx.heartbeatConfig.upsert({
where: { agentId },
update: {
cronExpression,
timezone,
...(systemPrompt !== undefined && { systemPrompt }),
...(maxContextItems !== undefined && { maxContextItems }),
enabled,
},
create: { agentId, organizationId, cronExpression, timezone, systemPrompt, maxContextItems: maxContextItems ?? 50, enabled },
})
);

await scheduleHeartbeat(config.id, organizationId);

return NextResponse.json({ success: true, data: config }, { status: 201 });
} catch (error) {
Expand All @@ -107,13 +115,18 @@ export async function DELETE(
if (isAuthError(authResult)) return authResult;

try {
const config = await prisma.heartbeatConfig.findUnique({ where: { agentId }, select: { id: true } });
const agent = await prisma.agent.findUnique({ where: { id: agentId }, select: { organizationId: true } });
const orgId = agent?.organizationId ?? null;

const config = await withOrgContext(prisma, orgId, (tx) =>
tx.heartbeatConfig.findUnique({ where: { agentId }, select: { id: true } })
);

if (!config) {
return NextResponse.json({ success: false, error: "Heartbeat config not found" }, { status: 404 });
}

await unscheduleHeartbeat(config.id);
await unscheduleHeartbeat(config.id, orgId);

return NextResponse.json({ success: true, data: null });
} catch (error) {
Expand Down
45 changes: 27 additions & 18 deletions src/lib/heartbeat/heartbeat-scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import { prisma } from "@/lib/prisma";
import { withOrgContext } from "@/lib/db/rls-middleware";
import { logger } from "@/lib/logger";
import { computeNextRunAt } from "@/lib/scheduler/cron-validator";
import type { ScheduleType } from "@/generated/prisma";

export async function scheduleHeartbeat(configId: string): Promise<void> {
const config = await prisma.heartbeatConfig.findUnique({
where: { id: configId },
select: { id: true, agentId: true, cronExpression: true, timezone: true, enabled: true, flowScheduleId: true },
});
export async function scheduleHeartbeat(configId: string, organizationId: string | null): Promise<void> {
const config = await withOrgContext(prisma, organizationId, (tx) =>
tx.heartbeatConfig.findUnique({
where: { id: configId },
select: { id: true, agentId: true, cronExpression: true, timezone: true, enabled: true, flowScheduleId: true },
})
);

if (!config) throw new Error(`HeartbeatConfig ${configId} not found`);

Expand Down Expand Up @@ -35,20 +38,24 @@ export async function scheduleHeartbeat(configId: string): Promise<void> {
},
});

await prisma.heartbeatConfig.update({
where: { id: configId },
data: { flowScheduleId: schedule.id },
});
await withOrgContext(prisma, organizationId, (tx) =>
tx.heartbeatConfig.update({
where: { id: configId },
data: { flowScheduleId: schedule.id },
})
);

logger.info("Heartbeat FlowSchedule created", { configId, flowScheduleId: schedule.id });
}
}

export async function unscheduleHeartbeat(configId: string): Promise<void> {
const config = await prisma.heartbeatConfig.findUnique({
where: { id: configId },
select: { flowScheduleId: true },
});
export async function unscheduleHeartbeat(configId: string, organizationId: string | null): Promise<void> {
const config = await withOrgContext(prisma, organizationId, (tx) =>
tx.heartbeatConfig.findUnique({
where: { id: configId },
select: { flowScheduleId: true },
})
);

if (!config) return;

Expand All @@ -59,10 +66,12 @@ export async function unscheduleHeartbeat(configId: string): Promise<void> {
});
}

await prisma.heartbeatConfig.update({
where: { id: configId },
data: { enabled: false },
});
await withOrgContext(prisma, organizationId, (tx) =>
tx.heartbeatConfig.update({
where: { id: configId },
data: { enabled: false },
})
);

logger.info("Heartbeat unscheduled", { configId });
}
12 changes: 8 additions & 4 deletions src/lib/heartbeat/heartbeat-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Job } from "bullmq";
import { logger } from "@/lib/logger";
import type { Prisma } from "@/generated/prisma";
import { registerSession, removeSession } from "@/lib/session/session-tracker";
import { withOrgContext } from "@/lib/db/rls-middleware";

export interface HeartbeatJobData {
type: "heartbeat.run";
Expand All @@ -25,10 +26,13 @@ export async function processHeartbeatRunJob(job: Job<HeartbeatJobData>): Promis

await job.updateProgress(10);

const config = await prisma.heartbeatConfig.findUnique({
where: { id: configId },
select: { systemPrompt: true },
});
// ALS is empty in BullMQ workers; organizationId must come from job.data explicitly.
const config = await withOrgContext(prisma, organizationId, (tx) =>
tx.heartbeatConfig.findUnique({
where: { id: configId },
select: { systemPrompt: true },
})
);

if (!config) {
await prisma.heartbeatRun.update({
Expand Down
46 changes: 25 additions & 21 deletions src/lib/templates/template-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,20 @@ export async function exportTemplate(
agentId: string,
organizationId: string,
): Promise<{ payload: TemplatePayload; checksum: string }> {
const agent = await prisma.agent.findUnique({
where: { id: agentId },
include: {
flow: true,
mcpServers: { include: { mcpServer: true } },
heartbeatConfig: true,
goalLinks: {
where: { goal: { status: "ACTIVE" } },
include: { goal: true },
const agent = await withOrgContext(prisma, organizationId, (tx) =>
tx.agent.findUnique({
where: { id: agentId },
include: {
flow: true,
mcpServers: { include: { mcpServer: true } },
heartbeatConfig: true,
goalLinks: {
where: { goal: { status: "ACTIVE" } },
include: { goal: true },
},
},
},
});
})
);

if (!agent) throw new Error(`Agent ${agentId} not found`);

Expand Down Expand Up @@ -265,16 +267,18 @@ export async function importTemplate(
}

if (payload.heartbeatConfig) {
await prisma.heartbeatConfig.create({
data: {
agentId: agent.id,
organizationId,
cronExpression: payload.heartbeatConfig.cronExpression,
timezone: payload.heartbeatConfig.timezone,
systemPrompt: payload.heartbeatConfig.systemPrompt,
maxContextItems: payload.heartbeatConfig.maxContextItems,
},
});
await withOrgContext(prisma, organizationId, (tx) =>
tx.heartbeatConfig.create({
data: {
agentId: agent.id,
organizationId,
cronExpression: payload.heartbeatConfig!.cronExpression,
timezone: payload.heartbeatConfig!.timezone,
systemPrompt: payload.heartbeatConfig!.systemPrompt,
maxContextItems: payload.heartbeatConfig!.maxContextItems,
},
})
);
}

for (const goal of payload.goals) {
Expand Down
Loading