From 62094b8fda3dc4c82ef9becc7ca96cc67b522973 Mon Sep 17 00:00:00 2001 From: electronicBlacksmith Date: Sun, 5 Apr 2026 22:07:19 +0000 Subject: [PATCH 1/6] fix(loop): restore Slack feedback for phantom_loop runs Closes #5. The feedback pipeline in LoopRunner already existed but was gated on loop.channelId, which was always null because the agent never plumbed channel_id/conversation_id into the in-process MCP tool call; that context only lived in the router. - AsyncLocalStorage captures the Slack channel/thread/trigger-message for the current turn so phantom_loop can auto-fill them when the agent omits them. Explicit tool args still win. - Reaction ladder on the operator's original message: hourglass -> cycle -> terminal (check/stop/warning/x). Restart-safe via an iteration === 1 check; no in-memory flag is needed. - Inline unicode progress bar in the edited status message. - New trigger_message_ts column on loops, appended as migration index 11 (the 12th entry). - Extracted LoopNotifier into src/loop/notifications.ts because runner.ts was already at the 300-line cap. 34 new tests, 938 pass / 0 fail. 
--- src/agent/__tests__/slack-context.test.ts | 78 ++++++ src/agent/slack-context.ts | 22 ++ src/db/__tests__/migrate.test.ts | 4 +- src/db/schema.ts | 6 + src/index.ts | 57 ++-- src/loop/__tests__/notifications.test.ts | 310 ++++++++++++++++++++++ src/loop/__tests__/runner.test.ts | 24 ++ src/loop/__tests__/tool.test.ts | 68 +++++ src/loop/notifications.ts | 148 +++++++++++ src/loop/runner.ts | 71 +---- src/loop/store.ts | 6 +- src/loop/tool.ts | 11 +- src/loop/types.ts | 5 + 13 files changed, 716 insertions(+), 94 deletions(-) create mode 100644 src/agent/__tests__/slack-context.test.ts create mode 100644 src/agent/slack-context.ts create mode 100644 src/loop/__tests__/notifications.test.ts create mode 100644 src/loop/notifications.ts diff --git a/src/agent/__tests__/slack-context.test.ts b/src/agent/__tests__/slack-context.test.ts new file mode 100644 index 0000000..550223a --- /dev/null +++ b/src/agent/__tests__/slack-context.test.ts @@ -0,0 +1,78 @@ +import { describe, expect, test } from "bun:test"; +import { type SlackContext, slackContextStore } from "../slack-context.ts"; + +const SAMPLE: SlackContext = { + slackChannelId: "C123", + slackThreadTs: "1700000000.000100", + slackMessageTs: "1700000000.000200", +}; + +describe("slackContextStore", () => { + test("getStore() is undefined outside a run()", () => { + expect(slackContextStore.getStore()).toBeUndefined(); + }); + + test("synchronous read inside run() sees the context", () => { + const seen = slackContextStore.run(SAMPLE, () => slackContextStore.getStore()); + expect(seen).toEqual(SAMPLE); + }); + + test("context propagates across a plain await boundary", async () => { + const seen = await slackContextStore.run(SAMPLE, async () => { + await Promise.resolve(); + return slackContextStore.getStore(); + }); + expect(seen).toEqual(SAMPLE); + }); + + test("context propagates across a setImmediate hop", async () => { + const seen = await slackContextStore.run(SAMPLE, async () => { + await new 
Promise((resolve) => setImmediate(resolve)); + return slackContextStore.getStore(); + }); + expect(seen).toEqual(SAMPLE); + }); + + test("context propagates through an async generator for-await loop", async () => { + async function* producer(): AsyncGenerator { + for (let i = 0; i < 3; i++) { + await Promise.resolve(); + yield i; + } + } + + const observations: (SlackContext | undefined)[] = []; + await slackContextStore.run(SAMPLE, async () => { + for await (const _ of producer()) { + observations.push(slackContextStore.getStore()); + } + }); + + expect(observations.length).toBe(3); + for (const seen of observations) { + expect(seen).toEqual(SAMPLE); + } + }); + + test("concurrent run() calls keep contexts isolated", async () => { + const other: SlackContext = { + slackChannelId: "C999", + slackThreadTs: "2700000000.000100", + slackMessageTs: "2700000000.000200", + }; + + const [a, b] = await Promise.all([ + slackContextStore.run(SAMPLE, async () => { + await Promise.resolve(); + return slackContextStore.getStore(); + }), + slackContextStore.run(other, async () => { + await Promise.resolve(); + return slackContextStore.getStore(); + }), + ]); + + expect(a).toEqual(SAMPLE); + expect(b).toEqual(other); + }); +}); diff --git a/src/agent/slack-context.ts b/src/agent/slack-context.ts new file mode 100644 index 0000000..a967315 --- /dev/null +++ b/src/agent/slack-context.ts @@ -0,0 +1,22 @@ +import { AsyncLocalStorage } from "node:async_hooks"; + +/** + * Request-scoped Slack context for the current agent turn. + * + * Populated by the channel router when a Slack-origin message enters the + * runtime, and read by in-process MCP tool handlers that need to target the + * operator's originating message/thread without relying on the agent to + * forward the IDs through tool arguments. This is the minimum-surface + * plumbing that lets tools (e.g. phantom_loop) auto-fill channel/thread when + * the agent omits them. 
+ * + * Non-Slack turns (telegram, email, webhook, cli, scheduled triggers) leave + * the store unset; consumers must treat `getStore()` as possibly undefined. + */ +export type SlackContext = { + slackChannelId: string; + slackThreadTs: string; + slackMessageTs: string; +}; + +export const slackContextStore = new AsyncLocalStorage(); diff --git a/src/db/__tests__/migrate.test.ts b/src/db/__tests__/migrate.test.ts index 349c152..cc06d02 100644 --- a/src/db/__tests__/migrate.test.ts +++ b/src/db/__tests__/migrate.test.ts @@ -36,7 +36,7 @@ describe("runMigrations", () => { runMigrations(db); const migrationCount = db.query("SELECT COUNT(*) as count FROM _migrations").get() as { count: number }; - expect(migrationCount.count).toBe(11); + expect(migrationCount.count).toBe(12); }); test("tracks applied migration indices", () => { @@ -48,6 +48,6 @@ describe("runMigrations", () => { .all() .map((r) => (r as { index_num: number }).index_num); - expect(indices).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + expect(indices).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); }); }); diff --git a/src/db/schema.ts b/src/db/schema.ts index beaf254..4fb3302 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -120,4 +120,10 @@ export const MIGRATIONS: string[] = [ )`, "CREATE INDEX IF NOT EXISTS idx_loops_status ON loops(status)", + + // Track the operator's originating Slack message so the loop runner can + // drive a reaction ladder on it (hourglass → cycle → terminal emoji). + // Appended, never inserted mid-array: existing deployments have already + // applied migrations 0–10, so the new column must land at index 11. 
+ "ALTER TABLE loops ADD COLUMN trigger_message_ts TEXT", ]; diff --git a/src/index.ts b/src/index.ts index 013c7fb..bd9986e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,6 +3,7 @@ import { join, resolve } from "node:path"; import { createInProcessToolServer } from "./agent/in-process-tools.ts"; import { AgentRuntime } from "./agent/runtime.ts"; import type { RuntimeEvent } from "./agent/runtime.ts"; +import { slackContextStore } from "./agent/slack-context.ts"; import { CliChannel } from "./channels/cli.ts"; import { EmailChannel } from "./channels/email.ts"; import { emitFeedback, setFeedbackHandler } from "./channels/feedback.ts"; @@ -426,31 +427,37 @@ async function main(): Promise { telegramChannel.startTyping(telegramChatId); } - const response = await runtime.handleMessage( - msg.channelId, - msg.conversationId, - promptText, - (event: RuntimeEvent) => { - switch (event.type) { - case "init": - console.log(`\n[phantom] Session: ${event.sessionId}`); - break; - case "thinking": - statusReactions?.setThinking(); - break; - case "tool_use": - statusReactions?.setTool(event.tool); - if (progressStream) { - const summary = formatToolActivity(event.tool, event.input); - progressStream.addToolActivity(event.tool, summary); - } - break; - case "error": - statusReactions?.setError(); - break; - } - }, - ); + const onEvent = (event: RuntimeEvent): void => { + switch (event.type) { + case "init": + console.log(`\n[phantom] Session: ${event.sessionId}`); + break; + case "thinking": + statusReactions?.setThinking(); + break; + case "tool_use": + statusReactions?.setTool(event.tool); + if (progressStream) { + const summary = formatToolActivity(event.tool, event.input); + progressStream.addToolActivity(event.tool, summary); + } + break; + case "error": + statusReactions?.setError(); + break; + } + }; + + const runHandle = (): ReturnType => + runtime.handleMessage(msg.channelId, msg.conversationId, promptText, onEvent); + + // Slack-origin turns run inside an 
AsyncLocalStorage scope so in-process + // MCP tools (phantom_loop, etc.) can auto-target the operator's thread + // and original message without relying on the agent to forward the IDs. + const response = + isSlack && slackChannelId && slackThreadTs && slackMessageTs + ? await slackContextStore.run({ slackChannelId, slackThreadTs, slackMessageTs }, runHandle) + : await runHandle(); // Track assistant messages if (response.text) { diff --git a/src/loop/__tests__/notifications.test.ts b/src/loop/__tests__/notifications.test.ts new file mode 100644 index 0000000..4ba68e8 --- /dev/null +++ b/src/loop/__tests__/notifications.test.ts @@ -0,0 +1,310 @@ +import { Database } from "bun:sqlite"; +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; +import type { SlackChannel } from "../../channels/slack.ts"; +import { runMigrations } from "../../db/migrate.ts"; +import { LoopNotifier, buildProgressBar, terminalEmoji } from "../notifications.ts"; +import { LoopStore } from "../store.ts"; +import type { Loop, LoopStatus } from "../types.ts"; + +// Minimal SlackChannel shape the notifier actually calls. Every method is +// a mock so we can assert call args and ordering. 
+type MockSlack = { + postToChannel: ReturnType; + updateMessage: ReturnType; + addReaction: ReturnType; + removeReaction: ReturnType; +}; + +function makeSlack(overrides: Partial = {}): MockSlack { + return { + postToChannel: mock(async () => "1700000000.100100"), + updateMessage: mock(async () => undefined), + addReaction: mock(async () => undefined), + removeReaction: mock(async () => undefined), + ...overrides, + }; +} + +function asSlack(m: MockSlack): SlackChannel { + return m as unknown as SlackChannel; +} + +function makeLoop(overrides: Partial = {}): Loop { + return { + id: "abcdef0123456789", + goal: "test goal", + workspaceDir: "/tmp/ws", + stateFile: "/tmp/ws/state.md", + successCommand: null, + maxIterations: 10, + maxCostUsd: 5, + status: "running", + iterationCount: 0, + totalCostUsd: 0, + channelId: "C100", + conversationId: "1700000000.000100", + statusMessageTs: null, + triggerMessageTs: "1700000000.000200", + interruptRequested: false, + lastError: null, + startedAt: "2026-04-05T00:00:00Z", + lastTickAt: null, + finishedAt: null, + ...overrides, + }; +} + +describe("buildProgressBar", () => { + test("renders empty bar at 0/N", () => { + expect(buildProgressBar(0, 10)).toBe("[░░░░░░░░░░]"); + }); + test("renders full bar at N/N", () => { + expect(buildProgressBar(10, 10)).toBe("[██████████]"); + }); + test("renders half bar at N/2", () => { + expect(buildProgressBar(5, 10)).toBe("[█████░░░░░]"); + }); + test("rounds to nearest cell", () => { + // 3/7 ≈ 43% → 4 cells of 10 + expect(buildProgressBar(3, 7)).toBe("[████░░░░░░]"); + }); + test("clamps overflow", () => { + expect(buildProgressBar(99, 10)).toBe("[██████████]"); + }); + test("handles zero total safely", () => { + expect(buildProgressBar(0, 0)).toBe("[░░░░░░░░░░]"); + }); +}); + +describe("terminalEmoji", () => { + test("maps every known status", () => { + expect(terminalEmoji("done")).toBe(":white_check_mark:"); + expect(terminalEmoji("stopped")).toBe(":octagonal_sign:"); + 
expect(terminalEmoji("budget_exceeded")).toBe(":warning:"); + expect(terminalEmoji("failed")).toBe(":x:"); + expect(terminalEmoji("running")).toBe(":repeat:"); + }); +}); + +describe("LoopNotifier", () => { + let db: Database; + let store: LoopStore; + + beforeEach(() => { + db = new Database(":memory:"); + db.run("PRAGMA journal_mode = WAL"); + runMigrations(db); + store = new LoopStore(db); + }); + + afterEach(() => { + db.close(); + }); + + describe("postStartNotice", () => { + test("no-ops when slackChannel is null", async () => { + const notifier = new LoopNotifier(null, store); + await notifier.postStartNotice(makeLoop()); + // Nothing to assert beyond "did not throw"; the null guard is the + // whole point. + expect(true).toBe(true); + }); + + test("no-ops when loop.channelId is null", async () => { + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postStartNotice(makeLoop({ channelId: null })); + expect(slack.postToChannel).not.toHaveBeenCalled(); + expect(slack.addReaction).not.toHaveBeenCalled(); + }); + + test("posts, persists ts, attaches stop button, stamps start reaction", async () => { + // Insert a real row so setStatusMessageTs can UPDATE it. 
+ const loop = store.insert({ + id: "abcdef0123456789", + goal: "g", + workspaceDir: "/tmp/ws", + stateFile: "/tmp/ws/state.md", + successCommand: null, + maxIterations: 10, + maxCostUsd: 5, + channelId: "C100", + conversationId: "1700000000.000100", + triggerMessageTs: "1700000000.000200", + }); + + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postStartNotice(loop); + + expect(slack.postToChannel).toHaveBeenCalledTimes(1); + const [channel, text, threadTs] = slack.postToChannel.mock.calls[0]; + expect(channel).toBe("C100"); + expect(text).toContain("Starting loop"); + expect(threadTs).toBe("1700000000.000100"); + + // Stop button attached via updateMessage with blocks + expect(slack.updateMessage).toHaveBeenCalledTimes(1); + const updateArgs = slack.updateMessage.mock.calls[0]; + expect(updateArgs[0]).toBe("C100"); + expect(updateArgs[3]).toBeDefined(); // blocks array + const blocks = updateArgs[3] as Array>; + const actionsBlock = blocks.find((b) => b.type === "actions"); + expect(actionsBlock).toBeDefined(); + + // Reaction stamped on the operator's trigger message + expect(slack.addReaction).toHaveBeenCalledWith("C100", "1700000000.000200", "hourglass_flowing_sand"); + + // Persisted status_message_ts round-trips back through findById + const reloaded = store.findById(loop.id); + expect(reloaded?.statusMessageTs).toBe("1700000000.100100"); + }); + + test("skips reaction when triggerMessageTs is null", async () => { + const loop = store.insert({ + id: "abcdef0123456789", + goal: "g", + workspaceDir: "/tmp/ws", + stateFile: "/tmp/ws/state.md", + successCommand: null, + maxIterations: 10, + maxCostUsd: 5, + channelId: "C100", + conversationId: null, + triggerMessageTs: null, + }); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postStartNotice(loop); + expect(slack.postToChannel).toHaveBeenCalled(); + 
expect(slack.addReaction).not.toHaveBeenCalled(); + }); + }); + + describe("postTickUpdate", () => { + function insertWithStatusTs(overrides: { triggerMessageTs?: string | null; iteration?: number } = {}) { + const row = store.insert({ + id: "abcdef0123456789", + goal: "g", + workspaceDir: "/tmp/ws", + stateFile: "/tmp/ws/state.md", + successCommand: null, + maxIterations: 10, + maxCostUsd: 5, + channelId: "C100", + conversationId: "1700000000.000100", + triggerMessageTs: overrides.triggerMessageTs ?? "1700000000.000200", + }); + store.setStatusMessageTs(row.id, "1700000000.100100"); + if (overrides.iteration) store.recordTick(row.id, overrides.iteration, 0); + const reloaded = store.findById(row.id); + if (!reloaded) throw new Error("failed to reload"); + return reloaded; + } + + test("edits the status message with a progress bar and cost", async () => { + const loop = insertWithStatusTs(); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postTickUpdate(loop.id, 3, "in-progress"); + + expect(slack.updateMessage).toHaveBeenCalledTimes(1); + const [ch, ts, text] = slack.updateMessage.mock.calls[0]; + expect(ch).toBe("C100"); + expect(ts).toBe("1700000000.100100"); + expect(text).toContain("3/10"); + expect(text).toContain("abcdef01"); + expect(text).toMatch(/\[█+░+\]/); + expect(text).toContain("in-progress"); + }); + + test("swaps hourglass → cycle on the first tick", async () => { + insertWithStatusTs(); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postTickUpdate("abcdef0123456789", 1, "in-progress"); + + expect(slack.removeReaction).toHaveBeenCalledWith("C100", "1700000000.000200", "hourglass_flowing_sand"); + expect(slack.addReaction).toHaveBeenCalledWith("C100", "1700000000.000200", "arrows_counterclockwise"); + }); + + test("does not swap reactions on tick 2+", async () => { + insertWithStatusTs(); + const slack = makeSlack(); + const notifier 
= new LoopNotifier(asSlack(slack), store); + await notifier.postTickUpdate("abcdef0123456789", 2, "in-progress"); + + expect(slack.removeReaction).not.toHaveBeenCalled(); + expect(slack.addReaction).not.toHaveBeenCalled(); + }); + + test("no-ops when statusMessageTs is not yet set", async () => { + store.insert({ + id: "abcdef0123456789", + goal: "g", + workspaceDir: "/tmp/ws", + stateFile: "/tmp/ws/state.md", + successCommand: null, + maxIterations: 10, + maxCostUsd: 5, + channelId: "C100", + conversationId: null, + triggerMessageTs: "1700000000.000200", + }); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postTickUpdate("abcdef0123456789", 1, "in-progress"); + expect(slack.updateMessage).not.toHaveBeenCalled(); + }); + }); + + describe("postFinalNotice", () => { + const cases: Array<{ status: LoopStatus; reaction: string }> = [ + { status: "done", reaction: "white_check_mark" }, + { status: "stopped", reaction: "octagonal_sign" }, + { status: "budget_exceeded", reaction: "warning" }, + { status: "failed", reaction: "x" }, + ]; + + for (const { status, reaction } of cases) { + test(`stamps terminal reaction for status=${status}`, async () => { + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice(makeLoop({ statusMessageTs: "1700000000.100100", status }), status); + const addCalls = slack.addReaction.mock.calls.map((c: unknown[]) => c[2]); + expect(addCalls).toContain(reaction); + // Both in-flight reactions best-effort removed + const removeCalls = slack.removeReaction.mock.calls.map((c: unknown[]) => c[2]); + expect(removeCalls).toContain("hourglass_flowing_sand"); + expect(removeCalls).toContain("arrows_counterclockwise"); + }); + } + + test("edits existing status message when set", async () => { + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice(makeLoop({ 
statusMessageTs: "1700000000.100100" }), "done"); + expect(slack.updateMessage).toHaveBeenCalledTimes(1); + expect(slack.postToChannel).not.toHaveBeenCalled(); + }); + + test("posts new message when statusMessageTs is null", async () => { + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice(makeLoop({ statusMessageTs: null }), "done"); + expect(slack.postToChannel).toHaveBeenCalledTimes(1); + expect(slack.updateMessage).not.toHaveBeenCalled(); + }); + + test("no-ops when triggerMessageTs is null", async () => { + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice( + makeLoop({ statusMessageTs: "1700000000.100100", triggerMessageTs: null }), + "done", + ); + expect(slack.addReaction).not.toHaveBeenCalled(); + expect(slack.removeReaction).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/src/loop/__tests__/runner.test.ts b/src/loop/__tests__/runner.test.ts index d24a3f8..ee1ffb4 100644 --- a/src/loop/__tests__/runner.test.ts +++ b/src/loop/__tests__/runner.test.ts @@ -97,6 +97,30 @@ describe("LoopRunner", () => { expect(loop.maxCostUsd).toBe(50); }); + test("triggerMessageTs round-trips through start → store → findById", () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ + goal: "with trigger", + channelId: "C100", + conversationId: "1700000000.000100", + triggerMessageTs: "1700000000.000200", + }); + expect(loop.triggerMessageTs).toBe("1700000000.000200"); + + const reloaded = runner.getLoop(loop.id); + expect(reloaded?.triggerMessageTs).toBe("1700000000.000200"); + expect(reloaded?.channelId).toBe("C100"); + expect(reloaded?.conversationId).toBe("1700000000.000100"); + }); + + test("triggerMessageTs is null when omitted at start", () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ 
db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ goal: "no trigger" }); + expect(loop.triggerMessageTs).toBeNull(); + }); + test("tick invokes runtime with loop channel and rotating conversation ids", async () => { const runtime = createMockRuntime(); const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); diff --git a/src/loop/__tests__/tool.test.ts b/src/loop/__tests__/tool.test.ts index 5315b98..b29958b 100644 --- a/src/loop/__tests__/tool.test.ts +++ b/src/loop/__tests__/tool.test.ts @@ -3,6 +3,7 @@ import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; +import { slackContextStore } from "../../agent/slack-context.ts"; import { runMigrations } from "../../db/migrate.ts"; import { LoopRunner } from "../runner.ts"; import { LOOP_TOOL_NAME, createLoopToolServer } from "../tool.ts"; @@ -108,4 +109,71 @@ describe("phantom_loop MCP tool", () => { const body = parseResult(result); expect(body.count).toBeGreaterThanOrEqual(2); }); + + describe("slackContextStore fallback", () => { + test("start fills channel/thread/trigger from context when args omitted", async () => { + const result = await slackContextStore.run( + { + slackChannelId: "C42", + slackThreadTs: "1700000000.000100", + slackMessageTs: "1700000000.000200", + }, + () => handler({ action: "start", goal: "from context" }), + ); + const { loop } = parseResult(result); + // triggerMessageTs is intentionally not exposed in serializeLoop, + // so read back through the runner directly. 
+ const stored = runner.getLoop(loop.id); + expect(stored?.channelId).toBe("C42"); + expect(stored?.conversationId).toBe("1700000000.000100"); + expect(stored?.triggerMessageTs).toBe("1700000000.000200"); + }); + + test("explicit args override context", async () => { + const result = await slackContextStore.run( + { + slackChannelId: "C_CTX", + slackThreadTs: "1700000000.000100", + slackMessageTs: "1700000000.000200", + }, + () => + handler({ + action: "start", + goal: "explicit wins", + channel_id: "C_EXPLICIT", + conversation_id: "1800000000.000100", + trigger_message_ts: "1800000000.000200", + }), + ); + const { loop } = parseResult(result); + const stored = runner.getLoop(loop.id); + expect(stored?.channelId).toBe("C_EXPLICIT"); + expect(stored?.conversationId).toBe("1800000000.000100"); + expect(stored?.triggerMessageTs).toBe("1800000000.000200"); + }); + + test("missing context leaves fields null without crashing", async () => { + // No slackContextStore.run wrapper here. + const result = await handler({ action: "start", goal: "no context" }); + const { loop } = parseResult(result); + const stored = runner.getLoop(loop.id); + expect(stored?.channelId).toBeNull(); + expect(stored?.conversationId).toBeNull(); + expect(stored?.triggerMessageTs).toBeNull(); + }); + + test("serializeLoop does not expose triggerMessageTs to the agent", async () => { + const result = await slackContextStore.run( + { + slackChannelId: "C42", + slackThreadTs: "1700000000.000100", + slackMessageTs: "1700000000.000200", + }, + () => handler({ action: "start", goal: "hidden field" }), + ); + const body = parseResult(result); + expect(body.loop.trigger_message_ts).toBeUndefined(); + expect(body.loop.triggerMessageTs).toBeUndefined(); + }); + }); }); diff --git a/src/loop/notifications.ts b/src/loop/notifications.ts new file mode 100644 index 0000000..8fdbbea --- /dev/null +++ b/src/loop/notifications.ts @@ -0,0 +1,148 @@ +import type { SlackBlock } from "../channels/feedback.ts"; +import 
type { SlackChannel } from "../channels/slack.ts"; +import type { LoopStore } from "./store.ts"; +import type { Loop, LoopStatus } from "./types.ts"; + +const PROGRESS_BAR_CELLS = 10; + +// Single source of truth for status → emoji. Bare names (no colons) because +// the Slack reactions.add/remove APIs take bare names; the status-message +// text wraps them with colons via `terminalEmoji()`. Keeping both formats +// derived from one map eliminates the silent drift risk when a new terminal +// status is added. +const TERMINAL_REACTION: Partial> = { + done: "white_check_mark", + stopped: "octagonal_sign", + budget_exceeded: "warning", + failed: "x", +}; + +const REACTION_START = "hourglass_flowing_sand"; +const REACTION_IN_FLIGHT = "arrows_counterclockwise"; + +const IN_FLIGHT_REACTIONS = [REACTION_START, REACTION_IN_FLIGHT] as const; + +function terminalReaction(status: LoopStatus): string | null { + return TERMINAL_REACTION[status] ?? null; +} + +export function buildProgressBar(done: number, total: number): string { + if (total <= 0) return `[${"░".repeat(PROGRESS_BAR_CELLS)}]`; + const clamped = Math.max(0, Math.min(done, total)); + const filled = Math.round((clamped / total) * PROGRESS_BAR_CELLS); + const empty = PROGRESS_BAR_CELLS - filled; + return `[${"█".repeat(filled)}${"░".repeat(empty)}]`; +} + +export function terminalEmoji(status: LoopStatus): string { + const reaction = TERMINAL_REACTION[status]; + if (reaction) return `:${reaction}:`; + // Non-terminal statuses still need a glyph for the running-state text. + return status === "running" ? ":repeat:" : ":grey_question:"; +} + +function truncate(text: string, max: number): string { + return text.length <= max ? text : `${text.slice(0, max - 1)}…`; +} + +/** + * Slack feedback for the loop lifecycle: start notice, per-tick progress + * edit, final notice, and a reaction ladder on the operator's original + * message (hourglass → cycle → terminal emoji). 
+ * + * Extracted from LoopRunner because runner.ts was already at the 300-line + * CONTRIBUTING.md cap and the progress-bar + reaction-ladder additions push + * it over. All Slack-API failures are swallowed upstream in SlackChannel; + * if a call-site here still throws, we catch and warn so loop execution is + * never derailed by chat plumbing. + * + * Why not reuse createStatusReactionController: that controller debounces + * per-tool-call runtime events via a promise-chain serializer. The loop + * ladder has exactly three sequential lifecycle states (start, first tick, + * terminal), no debouncing is required, and wiring it into the controller + * would entangle two unrelated lifecycles. Plain best-effort + * addReaction/removeReaction is the right choice here. + */ +export class LoopNotifier { + constructor( + private slackChannel: SlackChannel | null, + private store: LoopStore, + ) {} + + async postStartNotice(loop: Loop): Promise { + if (!this.slackChannel || !loop.channelId) return; + const text = `:repeat: Starting loop \`${loop.id.slice(0, 8)}\` (max ${loop.maxIterations} iter, $${loop.maxCostUsd.toFixed(2)} budget)\n> ${truncate(loop.goal, 200)}`; + // When conversationId (a Slack thread ts) is set, thread the updates into it; + // otherwise post a top-level message in the channel. + const ts = await this.slackChannel.postToChannel(loop.channelId, text, loop.conversationId ?? undefined); + if (!ts) return; + this.store.setStatusMessageTs(loop.id, ts); + + // Attach a stop button so the operator can interrupt without using MCP. + // Routed via setLoopStopHandler in slack-actions.ts. 
+ const blocks: SlackBlock[] = [ + { type: "section", text: { type: "mrkdwn", text } }, + { + type: "actions", + block_id: `phantom_loop_actions_${loop.id}`, + elements: [ + { + type: "button", + text: { type: "plain_text", text: "Stop loop", emoji: true }, + action_id: `phantom:loop_stop:${loop.id}`, + style: "danger", + value: loop.id, + }, + ], + }, + ]; + await this.slackChannel.updateMessage(loop.channelId, ts, text, blocks); + + if (loop.triggerMessageTs) { + await this.slackChannel.addReaction(loop.channelId, loop.triggerMessageTs, REACTION_START); + } + } + + async postTickUpdate(id: string, iteration: number, status: string): Promise { + const loop = this.store.findById(id); + if (!loop || !this.slackChannel || !loop.channelId || !loop.statusMessageTs) return; + + const bar = buildProgressBar(iteration, loop.maxIterations); + const shortId = loop.id.slice(0, 8); + const text = `:repeat: Loop \`${shortId}\` · ${bar} ${iteration}/${loop.maxIterations} · $${loop.totalCostUsd.toFixed(2)}/$${loop.maxCostUsd.toFixed(2)} · ${status}`; + await this.slackChannel.updateMessage(loop.channelId, loop.statusMessageTs, text); + + // On the first tick, swap hourglass → cycling arrows. Restart-safe by + // construction: iteration is sourced from the call site, so on resume + // the swap only fires if the loop is actually transitioning through + // iteration 1, no in-memory flag to repopulate. 
+ if (iteration === 1 && loop.triggerMessageTs) { + await this.slackChannel.removeReaction(loop.channelId, loop.triggerMessageTs, REACTION_START); + await this.slackChannel.addReaction(loop.channelId, loop.triggerMessageTs, REACTION_IN_FLIGHT); + } + } + + async postFinalNotice(loop: Loop, status: LoopStatus): Promise { + if (!this.slackChannel || !loop.channelId) return; + const emoji = terminalEmoji(status); + const text = `${emoji} Loop \`${loop.id.slice(0, 8)}\` finished (${status}) after ${loop.iterationCount} iterations, $${loop.totalCostUsd.toFixed(4)} spent`; + if (loop.statusMessageTs) { + await this.slackChannel.updateMessage(loop.channelId, loop.statusMessageTs, text); + } else { + await this.slackChannel.postToChannel(loop.channelId, text); + } + + if (loop.triggerMessageTs) { + // Best-effort: clear whichever in-flight reaction is currently on + // the message (removeReaction is idempotent on missing), then stamp + // the terminal one. + for (const reaction of IN_FLIGHT_REACTIONS) { + await this.slackChannel.removeReaction(loop.channelId, loop.triggerMessageTs, reaction); + } + const terminal = terminalReaction(status); + if (terminal) { + await this.slackChannel.addReaction(loop.channelId, loop.triggerMessageTs, terminal); + } + } + } +} diff --git a/src/loop/runner.ts b/src/loop/runner.ts index 94b876b..4c61de6 100644 --- a/src/loop/runner.ts +++ b/src/loop/runner.ts @@ -4,6 +4,7 @@ import { join, relative, resolve } from "node:path"; import type { AgentRuntime } from "../agent/runtime.ts"; import type { SlackChannel } from "../channels/slack.ts"; import { buildSafeEnv } from "../mcp/dynamic-handlers.ts"; +import { LoopNotifier } from "./notifications.ts"; import { buildTickPrompt } from "./prompt.ts"; import { initStateFile, parseFrontmatter, readStateFile } from "./state-file.ts"; import { LoopStore } from "./store.ts"; @@ -61,6 +62,7 @@ export class LoopRunner { private dataDir: string; private autoSchedule: boolean; private inFlight = new Set(); 
+ private notifier: LoopNotifier; constructor(deps: RunnerDeps) { this.store = new LoopStore(deps.db); @@ -68,10 +70,12 @@ export class LoopRunner { this.slackChannel = deps.slackChannel; this.dataDir = deps.dataDir ?? resolve(process.cwd(), "data"); this.autoSchedule = deps.autoSchedule ?? true; + this.notifier = new LoopNotifier(this.slackChannel ?? null, this.store); } setSlackChannel(channel: SlackChannel): void { this.slackChannel = channel; + this.notifier = new LoopNotifier(channel, this.store); } /** @@ -111,9 +115,10 @@ export class LoopRunner { maxCostUsd, channelId: input.channelId ?? null, conversationId: input.conversationId ?? null, + triggerMessageTs: input.triggerMessageTs ?? null, }); - this.postStartNotice(loop).catch((err: unknown) => { + this.notifier.postStartNotice(loop).catch((err: unknown) => { const msg = err instanceof Error ? err.message : String(err); console.warn(`[loop] Failed to post start notice for ${id}: ${msg}`); }); @@ -200,7 +205,7 @@ export class LoopRunner { } } - this.postTickUpdate(id, nextIteration, frontmatter?.status ?? "in-progress").catch((err: unknown) => { + this.notifier.postTickUpdate(id, nextIteration, frontmatter?.status ?? "in-progress").catch((err: unknown) => { const msg = err instanceof Error ? err.message : String(err); console.warn(`[loop] Failed to post tick update for ${id}: ${msg}`); }); @@ -231,73 +236,13 @@ export class LoopRunner { private finalize(id: string, status: LoopStatus, error: string | null): void { const loop = this.store.finalize(id, status, error); if (!loop) return; - this.postFinalNotice(loop, status).catch((err: unknown) => { + this.notifier.postFinalNotice(loop, status).catch((err: unknown) => { const msg = err instanceof Error ? 
err.message : String(err); console.warn(`[loop] Failed to post final notice for ${id}: ${msg}`); }); } - - private async postStartNotice(loop: Loop): Promise { - if (!this.slackChannel || !loop.channelId) return; - const text = `:repeat: Starting loop \`${loop.id.slice(0, 8)}\` (max ${loop.maxIterations} iter, $${loop.maxCostUsd.toFixed(2)} budget)\n> ${truncate(loop.goal, 200)}`; - // When conversationId (a Slack thread ts) is set, thread the updates into it; - // otherwise post a top-level message in the channel. - const ts = await this.slackChannel.postToChannel(loop.channelId, text, loop.conversationId ?? undefined); - if (!ts) return; - this.store.setStatusMessageTs(loop.id, ts); - - // Attach a stop button so the operator can interrupt without using MCP. - // Routed via setLoopStopHandler in slack-actions.ts. - const blocks = [ - { type: "section", text: { type: "mrkdwn", text } }, - { - type: "actions", - block_id: `phantom_loop_actions_${loop.id}`, - elements: [ - { - type: "button", - text: { type: "plain_text", text: "Stop loop", emoji: true }, - action_id: `phantom:loop_stop:${loop.id}`, - style: "danger", - value: loop.id, - }, - ], - }, - ]; - await this.slackChannel.updateMessage(loop.channelId, ts, text, blocks); - } - - private async postTickUpdate(id: string, iteration: number, status: string): Promise { - const loop = this.store.findById(id); - if (!loop || !this.slackChannel || !loop.channelId || !loop.statusMessageTs) return; - const text = `:repeat: Loop \`${loop.id.slice(0, 8)}\` iteration ${iteration}/${loop.maxIterations} - ${status} ($${loop.totalCostUsd.toFixed(4)} of $${loop.maxCostUsd.toFixed(2)})`; - await this.slackChannel.updateMessage(loop.channelId, loop.statusMessageTs, text); - } - - private async postFinalNotice(loop: Loop, status: LoopStatus): Promise { - if (!this.slackChannel || !loop.channelId) return; - const emoji = FINAL_EMOJI[status] ?? 
":grey_question:"; - const text = `${emoji} Loop \`${loop.id.slice(0, 8)}\` finished (${status}) after ${loop.iterationCount} iterations, $${loop.totalCostUsd.toFixed(4)} spent`; - if (loop.statusMessageTs) { - await this.slackChannel.updateMessage(loop.channelId, loop.statusMessageTs, text); - } else { - await this.slackChannel.postToChannel(loop.channelId, text); - } - } } -const FINAL_EMOJI: Record = { - running: ":repeat:", - done: ":white_check_mark:", - stopped: ":octagonal_sign:", - budget_exceeded: ":warning:", - failed: ":x:", -}; - function clamp(value: number, min: number, max: number): number { return Math.min(Math.max(value, min), max); } - -function truncate(text: string, max: number): string { - return text.length <= max ? text : `${text.slice(0, max - 1)}…`; -} diff --git a/src/loop/store.ts b/src/loop/store.ts index 930265e..9f53000 100644 --- a/src/loop/store.ts +++ b/src/loop/store.ts @@ -11,6 +11,7 @@ export type LoopInsertInput = { maxCostUsd: number; channelId: string | null; conversationId: string | null; + triggerMessageTs: string | null; }; /** @@ -22,8 +23,8 @@ export class LoopStore { insert(input: LoopInsertInput): Loop { this.db.run( - `INSERT INTO loops (id, goal, workspace_dir, state_file, success_command, max_iterations, max_cost_usd, status, channel_id, conversation_id) - VALUES (?, ?, ?, ?, ?, ?, ?, 'running', ?, ?)`, + `INSERT INTO loops (id, goal, workspace_dir, state_file, success_command, max_iterations, max_cost_usd, status, channel_id, conversation_id, trigger_message_ts) + VALUES (?, ?, ?, ?, ?, ?, ?, 'running', ?, ?, ?)`, [ input.id, input.goal, @@ -34,6 +35,7 @@ export class LoopStore { input.maxCostUsd, input.channelId, input.conversationId, + input.triggerMessageTs, ], ); const created = this.findById(input.id); diff --git a/src/loop/tool.ts b/src/loop/tool.ts index 0feb167..9c86437 100644 --- a/src/loop/tool.ts +++ b/src/loop/tool.ts @@ -1,6 +1,7 @@ import { createSdkMcpServer, tool } from 
"@anthropic-ai/claude-agent-sdk"; import type { McpSdkServerConfigWithInstance } from "@anthropic-ai/claude-agent-sdk"; import { z } from "zod"; +import { slackContextStore } from "../agent/slack-context.ts"; import type { LoopRunner } from "./runner.ts"; import { parseFrontmatter, readStateFile } from "./state-file.ts"; import type { Loop } from "./types.ts"; @@ -74,6 +75,7 @@ regression". Each iteration is fresh - all context must live in the state file.` success_command: z.string().optional(), channel_id: z.string().optional(), conversation_id: z.string().optional(), + trigger_message_ts: z.string().optional(), loop_id: z.string().optional().describe("Loop ID (required for status and stop)"), include_finished: z.boolean().optional().describe("For list: include terminated loops"), }, @@ -82,14 +84,19 @@ regression". Each iteration is fresh - all context must live in the state file.` switch (input.action) { case "start": { if (!input.goal) return err("goal is required for start"); + // Explicit tool arguments always win. When the agent omits + // channel/thread plumbing, fall back to the Slack context + // captured by the router for the current turn. + const ctx = slackContextStore.getStore(); const loop = runner.start({ goal: input.goal, workspace: input.workspace, maxIterations: input.max_iterations, maxCostUsd: input.max_cost_usd, successCommand: input.success_command, - channelId: input.channel_id, - conversationId: input.conversation_id, + channelId: input.channel_id ?? ctx?.slackChannelId, + conversationId: input.conversation_id ?? ctx?.slackThreadTs, + triggerMessageTs: input.trigger_message_ts ?? 
ctx?.slackMessageTs, }); return ok({ started: true, loop: serializeLoop(loop) }); } diff --git a/src/loop/types.ts b/src/loop/types.ts index cdb5abc..d30249e 100644 --- a/src/loop/types.ts +++ b/src/loop/types.ts @@ -16,6 +16,7 @@ export type Loop = { channelId: string | null; conversationId: string | null; statusMessageTs: string | null; + triggerMessageTs: string | null; interruptRequested: boolean; lastError: string | null; startedAt: string; @@ -37,6 +38,7 @@ export type LoopRow = { channel_id: string | null; conversation_id: string | null; status_message_ts: string | null; + trigger_message_ts: string | null; interrupt_requested: number; last_error: string | null; started_at: string; @@ -58,6 +60,7 @@ export type LoopStartInput = { successCommand?: string; channelId?: string; conversationId?: string; + triggerMessageTs?: string; }; // Hard ceilings the agent cannot raise. Caller-provided values are clamped. @@ -74,6 +77,7 @@ export const LoopStartInputSchema = z.object({ success_command: z.string().optional(), channel_id: z.string().optional(), conversation_id: z.string().optional(), + trigger_message_ts: z.string().optional(), }); export const LoopIdSchema = z.object({ loop_id: z.string().min(1) }); @@ -95,6 +99,7 @@ export function rowToLoop(row: LoopRow): Loop { channelId: row.channel_id, conversationId: row.conversation_id, statusMessageTs: row.status_message_ts, + triggerMessageTs: row.trigger_message_ts, interruptRequested: row.interrupt_requested === 1, lastError: row.last_error, startedAt: row.started_at, From 65ec5de30da0d746c23f7a338a6b49f93c4a0225 Mon Sep 17 00:00:00 2001 From: electronicBlacksmith Date: Sun, 5 Apr 2026 22:57:49 +0000 Subject: [PATCH 2/6] fix(loop): persist Stop button and surface state.md summary on completion Two defects surfaced during the first Slack end-to-end test of the loop feedback fix: 1. Stop button disappeared after the first tick. 
Slack's chat.update replaces the message wholesale and strips any blocks the caller does not include. postStartNotice attached the button but postTickUpdate called updateMessage without blocks, so the button was wiped on the first progress edit. Extract buildStatusBlocks() and re-send it on every tick edit. Final notice still omits blocks intentionally so the button disappears when the loop is no longer interruptible. 2. No end-of-loop summary. The agent curates the state.md body every tick (Goal, Progress, Next Action, Notes), but that content never reached the operator. Post it as a threaded reply when the loop finalizes. No extra agent cost: we surface content the agent already wrote. Frontmatter stripped, truncated at 3500 chars, silently skipped if the file is missing or empty. +7 tests covering both regressions. 945 pass / 0 fail. --- src/loop/__tests__/notifications.test.ts | 125 +++++++++++++++++++++++ src/loop/notifications.ts | 91 +++++++++++++---- 2 files changed, 197 insertions(+), 19 deletions(-) diff --git a/src/loop/__tests__/notifications.test.ts b/src/loop/__tests__/notifications.test.ts index 4ba68e8..1443125 100644 --- a/src/loop/__tests__/notifications.test.ts +++ b/src/loop/__tests__/notifications.test.ts @@ -1,5 +1,8 @@ import { Database } from "bun:sqlite"; import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; +import { mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; import type { SlackChannel } from "../../channels/slack.ts"; import { runMigrations } from "../../db/migrate.ts"; import { LoopNotifier, buildProgressBar, terminalEmoji } from "../notifications.ts"; @@ -218,6 +221,26 @@ describe("LoopNotifier", () => { expect(text).toContain("in-progress"); }); + test("re-sends blocks on every tick edit so the Stop button persists", async () => { + // Regression test: Slack's chat.update replaces the entire message + // and drops blocks 
the caller does not include. Without passing + // blocks on tick updates, the Stop button would disappear after + // the first tick edit. Verify the button survives. + insertWithStatusTs(); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postTickUpdate("abcdef0123456789", 2, "in-progress"); + + expect(slack.updateMessage).toHaveBeenCalledTimes(1); + const updateArgs = slack.updateMessage.mock.calls[0]; + const blocks = updateArgs[3] as Array> | undefined; + expect(blocks).toBeDefined(); + const actionsBlock = blocks?.find((b) => b.type === "actions"); + expect(actionsBlock).toBeDefined(); + const elements = (actionsBlock as { elements: Array> }).elements; + expect(elements[0].action_id).toBe("phantom:loop_stop:abcdef0123456789"); + }); + test("swaps hourglass → cycle on the first tick", async () => { insertWithStatusTs(); const slack = makeSlack(); @@ -306,5 +329,107 @@ describe("LoopNotifier", () => { expect(slack.addReaction).not.toHaveBeenCalled(); expect(slack.removeReaction).not.toHaveBeenCalled(); }); + + describe("state summary thread reply", () => { + let workDir: string; + + beforeEach(() => { + workDir = mkdtempSync(join(tmpdir(), "loop-notifier-summary-")); + }); + + afterEach(() => { + rmSync(workDir, { recursive: true, force: true }); + }); + + function writeStateFile(body: string): string { + const stateFile = join(workDir, "state.md"); + mkdirSync(workDir, { recursive: true }); + writeFileSync(stateFile, `---\nloop_id: abc\nstatus: done\niteration: 3\n---\n\n${body}\n`, "utf-8"); + return stateFile; + } + + test("posts the state.md body as a threaded reply on completion", async () => { + const stateFile = writeStateFile("# Progress\n- Tick 1: Hello!\n- Tick 2: Hello!\n- Tick 3: Hello!"); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice(makeLoop({ stateFile, statusMessageTs: "1700000000.100100" }), "done"); + + // The 
status message edit goes through updateMessage; the summary is the + // only postToChannel call, posted in the same thread. + expect(slack.postToChannel).toHaveBeenCalledTimes(1); + const [channel, text, threadTs] = slack.postToChannel.mock.calls[0]; + expect(channel).toBe("C100"); + expect(text).toContain("Tick 1: Hello!"); + expect(text).toContain("Tick 3: Hello!"); + expect(text).toContain("final state"); + // Frontmatter must be stripped + expect(text).not.toContain("loop_id: abc"); + expect(text).not.toContain("iteration: 3"); + // Posted in the same thread as the original turn + expect(threadTs).toBe("1700000000.000100"); + }); + + test("falls back to status_message_ts when conversationId is null", async () => { + const stateFile = writeStateFile("# Progress\n- done"); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice( + makeLoop({ + stateFile, + statusMessageTs: "1700000000.100100", + conversationId: null, + }), + "done", + ); + + expect(slack.postToChannel).toHaveBeenCalledTimes(1); + const threadTs = slack.postToChannel.mock.calls[0][2]; + expect(threadTs).toBe("1700000000.100100"); + }); + + test("silently skips summary when state file does not exist", async () => { + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice( + makeLoop({ stateFile: "/nonexistent/path/state.md", statusMessageTs: "1700000000.100100" }), + "done", + ); + // The terminal reaction path still runs, but no summary post. 
+ expect(slack.postToChannel).not.toHaveBeenCalled(); + }); + + test("silently skips summary when body is empty", async () => { + const stateFile = writeStateFile(""); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice(makeLoop({ stateFile, statusMessageTs: "1700000000.100100" }), "done"); + expect(slack.postToChannel).not.toHaveBeenCalled(); + }); + + test("truncates very long summaries", async () => { + // 5000 chars of body, well over the 3500 cap + const body = "x".repeat(5000); + const stateFile = writeStateFile(body); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice(makeLoop({ stateFile, statusMessageTs: "1700000000.100100" }), "done"); + const text = slack.postToChannel.mock.calls[0][1] as string; + expect(text).toContain("…(truncated)"); + // Total posted text must be bounded by 3500 chars of body + small + // amount of surrounding formatting, so under ~3700. 
+ expect(text.length).toBeLessThan(3800); + }); + + test("summary also fires for stopped/failed/budget_exceeded outcomes", async () => { + const stateFile = writeStateFile("# Progress\n- partial work"); + for (const status of ["stopped", "failed", "budget_exceeded"] as const) { + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice(makeLoop({ stateFile, statusMessageTs: "1700000000.100100", status }), status); + expect(slack.postToChannel).toHaveBeenCalledTimes(1); + expect(slack.postToChannel.mock.calls[0][1]).toContain("partial work"); + } + }); + }); }); }); diff --git a/src/loop/notifications.ts b/src/loop/notifications.ts index 8fdbbea..0bbdf13 100644 --- a/src/loop/notifications.ts +++ b/src/loop/notifications.ts @@ -1,5 +1,6 @@ import type { SlackBlock } from "../channels/feedback.ts"; import type { SlackChannel } from "../channels/slack.ts"; +import { readStateFile } from "./state-file.ts"; import type { LoopStore } from "./store.ts"; import type { Loop, LoopStatus } from "./types.ts"; @@ -45,6 +46,55 @@ function truncate(text: string, max: number): string { return text.length <= max ? text : `${text.slice(0, max - 1)}…`; } +/** + * Status message blocks: one section for the current text plus a Stop button. + * These must be re-sent on every updateMessage call, because Slack's chat.update + * replaces the message wholesale and drops any blocks the caller does not + * include. Passing this on tick updates is how the Stop button survives across + * progress edits. The final notice deliberately omits blocks so the button + * disappears on completion. 
+ */ +function buildStatusBlocks(text: string, loopId: string): SlackBlock[] { + return [ + { type: "section", text: { type: "mrkdwn", text } }, + { + type: "actions", + block_id: `phantom_loop_actions_${loopId}`, + elements: [ + { + type: "button", + text: { type: "plain_text", text: "Stop loop", emoji: true }, + action_id: `phantom:loop_stop:${loopId}`, + style: "danger", + value: loopId, + }, + ], + }, + ]; +} + +const FRONTMATTER_RE = /^---\s*\n[\s\S]*?\n---\s*\n?/; +const MAX_SUMMARY_CHARS = 3500; + +/** + * Extract the human-readable body of the state file for the end-of-loop + * summary. Drops the YAML frontmatter (runner plumbing) and truncates at a + * safe limit so a runaway state file does not blow out a Slack message. + * Returns null if the file is unreadable or effectively empty, which signals + * the caller to skip the summary cleanly. + */ +function extractStateSummary(stateFilePath: string): string | null { + try { + const contents = readStateFile(stateFilePath); + const body = contents.replace(FRONTMATTER_RE, "").trim(); + if (!body) return null; + if (body.length <= MAX_SUMMARY_CHARS) return body; + return `${body.slice(0, MAX_SUMMARY_CHARS)}\n\n…(truncated)`; + } catch { + return null; + } +} + /** * Slack feedback for the loop lifecycle: start notice, per-tick progress * edit, final notice, and a reaction ladder on the operator's original @@ -78,25 +128,9 @@ export class LoopNotifier { if (!ts) return; this.store.setStatusMessageTs(loop.id, ts); - // Attach a stop button so the operator can interrupt without using MCP. + // Attach the stop button so the operator can interrupt without using MCP. // Routed via setLoopStopHandler in slack-actions.ts. 
- const blocks: SlackBlock[] = [ - { type: "section", text: { type: "mrkdwn", text } }, - { - type: "actions", - block_id: `phantom_loop_actions_${loop.id}`, - elements: [ - { - type: "button", - text: { type: "plain_text", text: "Stop loop", emoji: true }, - action_id: `phantom:loop_stop:${loop.id}`, - style: "danger", - value: loop.id, - }, - ], - }, - ]; - await this.slackChannel.updateMessage(loop.channelId, ts, text, blocks); + await this.slackChannel.updateMessage(loop.channelId, ts, text, buildStatusBlocks(text, loop.id)); if (loop.triggerMessageTs) { await this.slackChannel.addReaction(loop.channelId, loop.triggerMessageTs, REACTION_START); @@ -110,7 +144,9 @@ export class LoopNotifier { const bar = buildProgressBar(iteration, loop.maxIterations); const shortId = loop.id.slice(0, 8); const text = `:repeat: Loop \`${shortId}\` · ${bar} ${iteration}/${loop.maxIterations} · $${loop.totalCostUsd.toFixed(2)}/$${loop.maxCostUsd.toFixed(2)} · ${status}`; - await this.slackChannel.updateMessage(loop.channelId, loop.statusMessageTs, text); + // Re-send the blocks on every edit, otherwise Slack strips the Stop + // button (chat.update replaces the entire message, including blocks). + await this.slackChannel.updateMessage(loop.channelId, loop.statusMessageTs, text, buildStatusBlocks(text, loop.id)); // On the first tick, swap hourglass → cycling arrows. Restart-safe by // construction: iteration is sourced from the call site, so on resume @@ -126,12 +162,29 @@ export class LoopNotifier { if (!this.slackChannel || !loop.channelId) return; const emoji = terminalEmoji(status); const text = `${emoji} Loop \`${loop.id.slice(0, 8)}\` finished (${status}) after ${loop.iterationCount} iterations, $${loop.totalCostUsd.toFixed(4)} spent`; + // Intentionally no blocks on the terminal edit: this strips the Stop + // button since the loop is no longer interruptible. 
if (loop.statusMessageTs) { await this.slackChannel.updateMessage(loop.channelId, loop.statusMessageTs, text); } else { await this.slackChannel.postToChannel(loop.channelId, text); } + // Post the state.md body as a threaded reply so the operator can see + // what the agent actually did across the run. The state file is the + // agent's working memory, curated every tick, so it already contains + // a progress log the operator wants to read. This costs no extra + // agent calls; we simply surface content the agent already wrote. + const summary = extractStateSummary(loop.stateFile); + if (summary) { + const summaryThreadTs = loop.conversationId ?? loop.statusMessageTs ?? undefined; + await this.slackChannel.postToChannel( + loop.channelId, + `:notebook: *Loop \`${loop.id.slice(0, 8)}\` final state:*\n\`\`\`\n${summary}\n\`\`\``, + summaryThreadTs, + ); + } + if (loop.triggerMessageTs) { // Best-effort: clear whichever in-flight reaction is currently on // the message (removeReaction is idempotent on missing), then stamp From a115e8f68507b44da80c8e7f5092d99061310609 Mon Sep 17 00:00:00 2001 From: electronicBlacksmith Date: Sun, 5 Apr 2026 23:14:40 +0000 Subject: [PATCH 3/6] fix(loop): eliminate tick/finalize race and show progress bar in final message 1. Tick update race: postTickUpdate was fire-and-forget, so a stop on tick N+1 could race with tick N's Slack write. If the tick update's HTTP response arrived after postFinalNotice, it overwrote the final message and re-sent the Stop button blocks. Awaiting postTickUpdate serializes Slack writes so finalize always runs after the last tick update completes. 2. Final message now includes the progress bar at its halted position, visually consistent with tick updates. A stopped loop at 3/10 shows the bar frozen at 3/10 with "stopped" instead of a terse one-liner. 
--- src/loop/notifications.ts | 4 +++- src/loop/runner.ts | 11 +++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/loop/notifications.ts b/src/loop/notifications.ts index 0bbdf13..ced2119 100644 --- a/src/loop/notifications.ts +++ b/src/loop/notifications.ts @@ -161,7 +161,9 @@ export class LoopNotifier { async postFinalNotice(loop: Loop, status: LoopStatus): Promise { if (!this.slackChannel || !loop.channelId) return; const emoji = terminalEmoji(status); - const text = `${emoji} Loop \`${loop.id.slice(0, 8)}\` finished (${status}) after ${loop.iterationCount} iterations, $${loop.totalCostUsd.toFixed(4)} spent`; + const bar = buildProgressBar(loop.iterationCount, loop.maxIterations); + const shortId = loop.id.slice(0, 8); + const text = `${emoji} Loop \`${shortId}\` · ${bar} ${loop.iterationCount}/${loop.maxIterations} · $${loop.totalCostUsd.toFixed(4)} · ${status}`; // Intentionally no blocks on the terminal edit: this strips the Stop // button since the loop is no longer interruptible. if (loop.statusMessageTs) { diff --git a/src/loop/runner.ts b/src/loop/runner.ts index 4c61de6..d2ced99 100644 --- a/src/loop/runner.ts +++ b/src/loop/runner.ts @@ -205,10 +205,17 @@ export class LoopRunner { } } - this.notifier.postTickUpdate(id, nextIteration, frontmatter?.status ?? "in-progress").catch((err: unknown) => { + // Await the tick update so its Slack write finishes before the next + // tick can start (and potentially finalize). Without this, a stop on + // tick N+1 can race: postFinalNotice strips the Stop button, then the + // fire-and-forget postTickUpdate from tick N resolves and re-sends the + // blocks, making the button reappear on a finalized message. + try { + await this.notifier.postTickUpdate(id, nextIteration, frontmatter?.status ?? "in-progress"); + } catch (err: unknown) { const msg = err instanceof Error ? 
err.message : String(err); console.warn(`[loop] Failed to post tick update for ${id}: ${msg}`); - }); + } this.scheduleTick(id); } finally { From 305cb335b58b22f3dff8e7531bb2386014b6805e Mon Sep 17 00:00:00 2001 From: electronicBlacksmith Date: Mon, 6 Apr 2026 01:05:20 +0000 Subject: [PATCH 4/6] feat(loop): integrate evolution, memory, and mid-loop critique into loop ticks Loop ticks now use Phantom's full intelligence stack instead of running blind: Phase 1 - Memory context injection: cached once at loop start from the goal, injected into every tick prompt via TickPromptOptions. Cleared on finalize, rebuilt on resume. Phase 2 - Post-loop evolution and consolidation: bounded transcript accumulation (first tick + rolling 10 summaries + last tick), SessionData synthesis in finalize(), fire-and-forget evolution pipeline and LLM/heuristic memory consolidation with cost-cap guards matching the interactive path. Phase 3 - Mid-loop critique checkpoints: optional checkpoint_interval param lets the agent request Sonnet 4.6 review every N ticks. Guard requires evolution enabled, LLM judges active, and cost cap not exceeded. Critique is awaited before next tick to avoid race conditions. 
Closes #8 --- src/db/__tests__/migrate.test.ts | 4 +- src/db/schema.ts | 2 + src/index.ts | 13 +- .../__tests__/evolution-integration.test.ts | 364 ++++++++++++++++++ src/loop/__tests__/notifications.test.ts | 1 + src/loop/__tests__/post-loop.test.ts | 128 ++++++ src/loop/__tests__/prompt.test.ts | 85 ++++ src/loop/critique.ts | 65 ++++ src/loop/post-loop.ts | 112 ++++++ src/loop/prompt.ts | 21 +- src/loop/runner.ts | 152 +++++--- src/loop/store.ts | 6 +- src/loop/tool.ts | 9 + src/loop/types.ts | 5 + src/memory/consolidation.ts | 2 +- 15 files changed, 914 insertions(+), 55 deletions(-) create mode 100644 src/loop/__tests__/evolution-integration.test.ts create mode 100644 src/loop/__tests__/post-loop.test.ts create mode 100644 src/loop/__tests__/prompt.test.ts create mode 100644 src/loop/critique.ts create mode 100644 src/loop/post-loop.ts diff --git a/src/db/__tests__/migrate.test.ts b/src/db/__tests__/migrate.test.ts index cc06d02..4662419 100644 --- a/src/db/__tests__/migrate.test.ts +++ b/src/db/__tests__/migrate.test.ts @@ -36,7 +36,7 @@ describe("runMigrations", () => { runMigrations(db); const migrationCount = db.query("SELECT COUNT(*) as count FROM _migrations").get() as { count: number }; - expect(migrationCount.count).toBe(12); + expect(migrationCount.count).toBe(13); }); test("tracks applied migration indices", () => { @@ -48,6 +48,6 @@ describe("runMigrations", () => { .all() .map((r) => (r as { index_num: number }).index_num); - expect(indices).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + expect(indices).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]); }); }); diff --git a/src/db/schema.ts b/src/db/schema.ts index 4fb3302..060f5c6 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -126,4 +126,6 @@ export const MIGRATIONS: string[] = [ // Appended, never inserted mid-array: existing deployments have already // applied migrations 0–10, so the new column must land at index 11. 
"ALTER TABLE loops ADD COLUMN trigger_message_ts TEXT", + + "ALTER TABLE loops ADD COLUMN checkpoint_interval INTEGER", ]; diff --git a/src/index.ts b/src/index.ts index bd9986e..7cf4d68 100644 --- a/src/index.ts +++ b/src/index.ts @@ -116,8 +116,9 @@ async function main(): Promise { runtime.setRoleTemplate(activeRole); } + let contextBuilder: MemoryContextBuilder | undefined; if (memory.isReady()) { - const contextBuilder = new MemoryContextBuilder(memory, memoryConfig); + contextBuilder = new MemoryContextBuilder(memory, memoryConfig); runtime.setMemoryContextBuilder(contextBuilder); } @@ -160,7 +161,15 @@ async function main(): Promise { let mcpServer: PhantomMcpServer | null = null; let scheduler: Scheduler | null = null; - const loopRunner = new LoopRunner({ db, runtime }); + const postLoopDeps = + evolution && memory.isReady() + ? { + evolution, + memory, + onEvolvedConfigUpdate: (config: ReturnType) => runtime.setEvolvedConfig(config), + } + : undefined; + const loopRunner = new LoopRunner({ db, runtime, memoryContextBuilder: contextBuilder, postLoopDeps }); try { mcpServer = new PhantomMcpServer({ config, diff --git a/src/loop/__tests__/evolution-integration.test.ts b/src/loop/__tests__/evolution-integration.test.ts new file mode 100644 index 0000000..074b297 --- /dev/null +++ b/src/loop/__tests__/evolution-integration.test.ts @@ -0,0 +1,364 @@ +import { Database } from "bun:sqlite"; +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; +import { mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { runMigrations } from "../../db/migrate.ts"; +import { LoopRunner } from "../runner.ts"; +import { LoopStartInputSchema } from "../types.ts"; + +type HandleMessageImpl = ( + channel: string, + conversationId: string, + text: string, +) => Promise<{ + text: string; + sessionId: string; + cost: { totalUsd: number; inputTokens: number; outputTokens: number; 
modelUsage: Record }; + durationMs: number; +}>; + +function createMockRuntime(impl?: HandleMessageImpl) { + const defaultImpl: HandleMessageImpl = async () => ({ + text: "ok", + sessionId: "s", + cost: { totalUsd: 0.01, inputTokens: 10, outputTokens: 10, modelUsage: {} }, + durationMs: 10, + }); + return { handleMessage: mock(impl ?? defaultImpl) }; +} + +function agentFinishes(stateFile: string, loopId: string): HandleMessageImpl { + return async () => { + writeFileSync(stateFile, `---\nloop_id: ${loopId}\nstatus: done\niteration: 1\n---\n\nDone.\n`, "utf-8"); + return { + text: "done", + sessionId: "s", + cost: { totalUsd: 0.01, inputTokens: 1, outputTokens: 1, modelUsage: {} }, + durationMs: 1, + }; + }; +} + +describe("LoopRunner evolution integration", () => { + let db: Database; + let dataDir: string; + + beforeEach(() => { + db = new Database(":memory:"); + db.run("PRAGMA journal_mode = WAL"); + db.run("PRAGMA foreign_keys = ON"); + runMigrations(db); + dataDir = mkdtempSync(join(tmpdir(), "phantom-loop-evo-")); + }); + + afterEach(() => { + db.close(); + rmSync(dataDir, { recursive: true, force: true }); + }); + + test("memory context is cached once at start and reused across ticks", async () => { + const buildMock = mock(async () => "## Known Facts\n- User prefers TS"); + const mockContextBuilder = { build: buildMock } as never; + + const runtime = createMockRuntime(); + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + memoryContextBuilder: mockContextBuilder, + }); + const loop = runner.start({ goal: "Test memory caching", maxIterations: 5 }); + + // Allow the async cache to resolve + await Bun.sleep(10); + + // buildMock called once at start + expect(buildMock).toHaveBeenCalledTimes(1); + expect(buildMock).toHaveBeenCalledWith("Test memory caching"); + + // Tick multiple times - build should still only be called once + await runner.tick(loop.id); + await runner.tick(loop.id); + 
expect(buildMock).toHaveBeenCalledTimes(1); + + // Verify the prompt contains memory context + const promptArg = runtime.handleMessage.mock.calls[0][2] as string; + expect(promptArg).toContain("RECALLED MEMORIES"); + expect(promptArg).toContain("User prefers TS"); + }); + + test("memory context is cleared on finalize", async () => { + const buildMock = mock(async () => "some context"); + const mockContextBuilder = { build: buildMock } as never; + + const runtime = createMockRuntime(); + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + memoryContextBuilder: mockContextBuilder, + }); + const loop = runner.start({ goal: "clean up", maxIterations: 1 }); + await Bun.sleep(10); + + runtime.handleMessage.mockImplementation(agentFinishes(loop.stateFile, loop.id)); + await runner.tick(loop.id); + + expect(runner.getLoop(loop.id)?.status).toBe("done"); + + // Start another loop - build should be called again (cache was cleared) + runner.start({ goal: "another" }); + await Bun.sleep(10); + expect(buildMock).toHaveBeenCalledTimes(2); + }); + + test("post-loop evolution is called on finalize with correct session data", async () => { + const afterSessionMock = mock(async () => ({ version: 1, changes_applied: [], changes_rejected: [] })); + const mockEvolution = { + afterSession: afterSessionMock, + getConfig: () => ({ userProfile: "", domainKnowledge: "" }), + }; + const mockMemory = { isReady: () => false }; + const onUpdate = mock(() => {}); + + const runtime = createMockRuntime(); + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + postLoopDeps: { + evolution: mockEvolution as never, + memory: mockMemory as never, + onEvolvedConfigUpdate: onUpdate, + }, + }); + const loop = runner.start({ goal: "evolve this" }); + runtime.handleMessage.mockImplementation(agentFinishes(loop.stateFile, loop.id)); + await runner.tick(loop.id); + + // Allow fire-and-forget to complete + await Bun.sleep(50); + + 
expect(afterSessionMock).toHaveBeenCalledTimes(1); + const summary = (afterSessionMock.mock.calls as unknown[][])[0][0] as Record; + expect(summary.session_id).toBe(loop.id); + expect(summary.outcome).toBe("success"); + }); + + test("post-loop evolution failure does not affect loop status", async () => { + const mockEvolution = { + afterSession: mock(async () => { + throw new Error("evolution broke"); + }), + getConfig: () => ({ userProfile: "", domainKnowledge: "" }), + }; + const mockMemory = { isReady: () => false }; + + const runtime = createMockRuntime(); + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + postLoopDeps: { + evolution: mockEvolution as never, + memory: mockMemory as never, + onEvolvedConfigUpdate: () => {}, + }, + }); + const loop = runner.start({ goal: "survive evolution failure" }); + runtime.handleMessage.mockImplementation(agentFinishes(loop.stateFile, loop.id)); + await runner.tick(loop.id); + + await Bun.sleep(50); + + // Loop should still be done, not failed + expect(runner.getLoop(loop.id)?.status).toBe("done"); + }); + + test("critique does NOT fire when postLoopDeps is absent", async () => { + const runtime = createMockRuntime(); + // No postLoopDeps = no evolution engine = critique should never fire + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + }); + const loop = runner.start({ goal: "no evolution", checkpointInterval: 1 }); + + await runner.tick(loop.id); + + // The prompt should NOT contain reviewer feedback (critique skipped) + const promptArg = runtime.handleMessage.mock.calls[0][2] as string; + expect(promptArg).not.toContain("REVIEWER FEEDBACK"); + }); + + test("critique does NOT fire when usesLLMJudges returns false", async () => { + const mockEvolution = { + afterSession: mock(async () => ({ version: 1, changes_applied: [], changes_rejected: [] })), + getConfig: () => ({ userProfile: "", domainKnowledge: "" }), + usesLLMJudges: () => false, + 
isWithinCostCap: () => true, + trackExternalJudgeCost: mock(() => {}), + }; + + const runtime = createMockRuntime(); + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + postLoopDeps: { + evolution: mockEvolution as never, + memory: { isReady: () => false } as never, + onEvolvedConfigUpdate: () => {}, + }, + }); + const loop = runner.start({ goal: "judges disabled", checkpointInterval: 1 }); + + await runner.tick(loop.id); + await runner.tick(loop.id); + + // Second tick prompt should NOT have critique (judges disabled) + const secondPrompt = runtime.handleMessage.mock.calls[1][2] as string; + expect(secondPrompt).not.toContain("REVIEWER FEEDBACK"); + expect(mockEvolution.trackExternalJudgeCost).not.toHaveBeenCalled(); + }); + + test("critique does NOT fire when cost cap is exceeded", async () => { + const mockEvolution = { + afterSession: mock(async () => ({ version: 1, changes_applied: [], changes_rejected: [] })), + getConfig: () => ({ userProfile: "", domainKnowledge: "" }), + usesLLMJudges: () => true, + isWithinCostCap: () => false, + trackExternalJudgeCost: mock(() => {}), + }; + + const runtime = createMockRuntime(); + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + postLoopDeps: { + evolution: mockEvolution as never, + memory: { isReady: () => false } as never, + onEvolvedConfigUpdate: () => {}, + }, + }); + const loop = runner.start({ goal: "over budget", checkpointInterval: 1 }); + + await runner.tick(loop.id); + await runner.tick(loop.id); + + const secondPrompt = runtime.handleMessage.mock.calls[1][2] as string; + expect(secondPrompt).not.toContain("REVIEWER FEEDBACK"); + expect(mockEvolution.trackExternalJudgeCost).not.toHaveBeenCalled(); + }); + + test("tick 1 summary is recorded in transcript", async () => { + const runtime = createMockRuntime(); + const afterSessionMock = mock(async () => ({ version: 1, changes_applied: [], changes_rejected: [] })); + const runner = new 
LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + postLoopDeps: { + evolution: { + afterSession: afterSessionMock, + getConfig: () => ({ userProfile: "", domainKnowledge: "" }), + usesLLMJudges: () => false, + isWithinCostCap: () => true, + trackExternalJudgeCost: mock(() => {}), + } as never, + memory: { isReady: () => false } as never, + onEvolvedConfigUpdate: () => {}, + }, + }); + const loop = runner.start({ goal: "check tick 1 summary" }); + + runtime.handleMessage.mockImplementation(agentFinishes(loop.stateFile, loop.id)); + await runner.tick(loop.id); + + // Allow fire-and-forget to complete + await Bun.sleep(50); + + // The session data should include a Tick 1 summary + const summary = (afterSessionMock.mock.calls as unknown[][])[0][0] as Record; + const userMsgs = summary.user_messages as string[]; + const hasTick1Summary = userMsgs.some((m) => m.includes("Tick 1:")); + expect(hasTick1Summary).toBe(true); + }); + + test("checkpoint_interval round-trips through start/store", () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime, dataDir, autoSchedule: false }); + + const loop = runner.start({ goal: "with checkpoint", checkpointInterval: 5 }); + expect(loop.checkpointInterval).toBe(5); + + const reloaded = runner.getLoop(loop.id); + expect(reloaded?.checkpointInterval).toBe(5); + }); + + test("checkpoint_interval defaults to null when omitted", () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime, dataDir, autoSchedule: false }); + + const loop = runner.start({ goal: "no checkpoint" }); + expect(loop.checkpointInterval).toBeNull(); + }); +}); + +describe("LoopStartInputSchema checkpoint_interval validation", () => { + test("accepts valid checkpoint_interval", () => { + const result = LoopStartInputSchema.safeParse({ + goal: "test", + checkpoint_interval: 5, + }); + expect(result.success).toBe(true); + }); + + test("accepts 0 (disabled)", () => { + const result = 
LoopStartInputSchema.safeParse({ + goal: "test", + checkpoint_interval: 0, + }); + expect(result.success).toBe(true); + }); + + test("rejects negative values", () => { + const result = LoopStartInputSchema.safeParse({ + goal: "test", + checkpoint_interval: -1, + }); + expect(result.success).toBe(false); + }); + + test("rejects values above ceiling", () => { + const result = LoopStartInputSchema.safeParse({ + goal: "test", + checkpoint_interval: 201, + }); + expect(result.success).toBe(false); + }); + + test("rejects non-integer values", () => { + const result = LoopStartInputSchema.safeParse({ + goal: "test", + checkpoint_interval: 5.5, + }); + expect(result.success).toBe(false); + }); + + test("accepts omitted (optional)", () => { + const result = LoopStartInputSchema.safeParse({ goal: "test" }); + expect(result.success).toBe(true); + }); +}); diff --git a/src/loop/__tests__/notifications.test.ts b/src/loop/__tests__/notifications.test.ts index 1443125..39840d4 100644 --- a/src/loop/__tests__/notifications.test.ts +++ b/src/loop/__tests__/notifications.test.ts @@ -41,6 +41,7 @@ function makeLoop(overrides: Partial = {}): Loop { successCommand: null, maxIterations: 10, maxCostUsd: 5, + checkpointInterval: null, status: "running", iterationCount: 0, totalCostUsd: 0, diff --git a/src/loop/__tests__/post-loop.test.ts b/src/loop/__tests__/post-loop.test.ts new file mode 100644 index 0000000..ae12196 --- /dev/null +++ b/src/loop/__tests__/post-loop.test.ts @@ -0,0 +1,128 @@ +import { describe, expect, test } from "bun:test"; +import { type LoopTranscript, synthesizeSessionData } from "../post-loop.ts"; +import type { Loop } from "../types.ts"; + +function makeLoop(overrides: Partial = {}): Loop { + return { + id: "loop-123", + goal: "Refactor the auth module", + workspaceDir: "/tmp/ws", + stateFile: "/tmp/ws/state.md", + successCommand: null, + maxIterations: 20, + maxCostUsd: 5, + checkpointInterval: null, + status: "running", + iterationCount: 5, + totalCostUsd: 1.23, 
+ channelId: null, + conversationId: null, + statusMessageTs: null, + triggerMessageTs: null, + interruptRequested: false, + lastError: null, + startedAt: "2024-01-01T00:00:00Z", + lastTickAt: "2024-01-01T00:05:00Z", + finishedAt: "2024-01-01T00:06:00Z", + ...overrides, + }; +} + +function makeTranscript(overrides: Partial = {}): LoopTranscript { + return { + firstPrompt: "First tick prompt", + firstResponse: "First tick response", + summaries: ["Tick 2: in-progress", "Tick 3: in-progress"], + lastPrompt: "Last tick prompt", + lastResponse: "Last tick response", + ...overrides, + }; +} + +describe("synthesizeSessionData", () => { + test("maps done status to success outcome", () => { + const data = synthesizeSessionData(makeLoop(), "done", makeTranscript()); + expect(data.outcome).toBe("success"); + }); + + test("maps stopped status to abandoned outcome", () => { + const data = synthesizeSessionData(makeLoop(), "stopped", makeTranscript()); + expect(data.outcome).toBe("abandoned"); + }); + + test("maps budget_exceeded status to failure outcome", () => { + const data = synthesizeSessionData(makeLoop(), "budget_exceeded", makeTranscript()); + expect(data.outcome).toBe("failure"); + }); + + test("maps failed status to failure outcome", () => { + const data = synthesizeSessionData(makeLoop(), "failed", makeTranscript()); + expect(data.outcome).toBe("failure"); + }); + + test("includes context header with tick count and goal", () => { + const data = synthesizeSessionData(makeLoop(), "done", makeTranscript()); + expect(data.userMessages[0]).toContain("[Loop: 5 ticks"); + expect(data.userMessages[0]).toContain("Refactor the auth module"); + expect(data.userMessages[0]).toContain("outcome: success"); + }); + + test("includes first tick prompt, rolling summaries, and last tick prompt", () => { + const data = synthesizeSessionData(makeLoop(), "done", makeTranscript()); + expect(data.userMessages[0]).toContain("First tick prompt"); + expect(data.userMessages).toContain("Tick 
2: in-progress"); + expect(data.userMessages).toContain("Tick 3: in-progress"); + expect(data.userMessages[data.userMessages.length - 1]).toContain("Last tick prompt"); + }); + + test("includes first and last assistant responses", () => { + const data = synthesizeSessionData(makeLoop(), "done", makeTranscript()); + expect(data.assistantMessages).toHaveLength(2); + expect(data.assistantMessages[0]).toContain("First tick response"); + expect(data.assistantMessages[1]).toContain("Last tick response"); + }); + + test("uses channel:channelId for Slack-originated loops", () => { + const loop = makeLoop({ channelId: "C100" }); + const data = synthesizeSessionData(loop, "done", makeTranscript()); + expect(data.userId).toBe("channel:C100"); + }); + + test("uses 'autonomous' for headless loops", () => { + const loop = makeLoop({ channelId: null }); + const data = synthesizeSessionData(loop, "done", makeTranscript()); + expect(data.userId).toBe("autonomous"); + }); + + test("session key uses channel:conversation for Slack loops", () => { + const loop = makeLoop({ channelId: "C100", conversationId: "1700000.000" }); + const data = synthesizeSessionData(loop, "done", makeTranscript()); + expect(data.sessionKey).toBe("C100:1700000.000"); + }); + + test("session key uses loop:id for headless loops", () => { + const loop = makeLoop({ channelId: null }); + const data = synthesizeSessionData(loop, "done", makeTranscript()); + expect(data.sessionKey).toBe("loop:loop-123"); + }); + + test("passes through cost and timestamps", () => { + const data = synthesizeSessionData(makeLoop(), "done", makeTranscript()); + expect(data.costUsd).toBe(1.23); + expect(data.startedAt).toBe("2024-01-01T00:00:00Z"); + expect(data.endedAt).toBe("2024-01-01T00:06:00Z"); + }); + + test("uses empty arrays for toolsUsed and filesTracked", () => { + const data = synthesizeSessionData(makeLoop(), "done", makeTranscript()); + expect(data.toolsUsed).toEqual([]); + expect(data.filesTracked).toEqual([]); + }); + + 
test("handles empty transcript (no-tick loop)", () => { + const transcript = makeTranscript({ summaries: [] }); + const data = synthesizeSessionData(makeLoop({ iterationCount: 0 }), "stopped", transcript); + expect(data.userMessages.length).toBeGreaterThan(0); + expect(data.outcome).toBe("abandoned"); + }); +}); diff --git a/src/loop/__tests__/prompt.test.ts b/src/loop/__tests__/prompt.test.ts new file mode 100644 index 0000000..eae3dc1 --- /dev/null +++ b/src/loop/__tests__/prompt.test.ts @@ -0,0 +1,85 @@ +import { describe, expect, test } from "bun:test"; +import { buildTickPrompt } from "../prompt.ts"; +import type { Loop } from "../types.ts"; + +function makeLoop(overrides: Partial = {}): Loop { + return { + id: "loop-1", + goal: "Write a haiku", + workspaceDir: "/tmp/ws", + stateFile: "/tmp/ws/state.md", + successCommand: null, + maxIterations: 20, + maxCostUsd: 5, + checkpointInterval: null, + status: "running", + iterationCount: 3, + totalCostUsd: 0.5, + channelId: null, + conversationId: null, + statusMessageTs: null, + triggerMessageTs: null, + interruptRequested: false, + lastError: null, + startedAt: "2024-01-01T00:00:00Z", + lastTickAt: null, + finishedAt: null, + ...overrides, + }; +} + +describe("buildTickPrompt", () => { + test("returns base prompt without optional sections when no options provided", () => { + const prompt = buildTickPrompt(makeLoop(), "state contents"); + expect(prompt).toContain("Write a haiku"); + expect(prompt).toContain("state contents"); + expect(prompt).not.toContain("RECALLED MEMORIES"); + expect(prompt).not.toContain("REVIEWER FEEDBACK"); + }); + + test("injects memory context before state file section", () => { + const memoryContext = "## Known Facts\n- User prefers TypeScript"; + const prompt = buildTickPrompt(makeLoop(), "state contents", { memoryContext }); + + expect(prompt).toContain("RECALLED MEMORIES (from previous sessions)"); + expect(prompt).toContain("User prefers TypeScript"); + + // Memory should appear before 
state file contents + const memoryIdx = prompt.indexOf("RECALLED MEMORIES"); + const stateIdx = prompt.indexOf("CURRENT STATE FILE CONTENTS"); + expect(memoryIdx).toBeLessThan(stateIdx); + }); + + test("injects critique section", () => { + const critique = "The loop appears stuck in a pattern."; + const prompt = buildTickPrompt(makeLoop(), "state contents", { critique }); + + expect(prompt).toContain("REVIEWER FEEDBACK (from your last checkpoint)"); + expect(prompt).toContain("stuck in a pattern"); + }); + + test("injects both memory and critique when both provided", () => { + const prompt = buildTickPrompt(makeLoop(), "state contents", { + memoryContext: "Some facts", + critique: "Some feedback", + }); + + expect(prompt).toContain("RECALLED MEMORIES"); + expect(prompt).toContain("REVIEWER FEEDBACK"); + + // Memory should come before critique + const memoryIdx = prompt.indexOf("RECALLED MEMORIES"); + const critiqueIdx = prompt.indexOf("REVIEWER FEEDBACK"); + expect(memoryIdx).toBeLessThan(critiqueIdx); + }); + + test("skips empty memory context", () => { + const prompt = buildTickPrompt(makeLoop(), "state contents", { memoryContext: "" }); + expect(prompt).not.toContain("RECALLED MEMORIES"); + }); + + test("skips empty critique", () => { + const prompt = buildTickPrompt(makeLoop(), "state contents", { critique: "" }); + expect(prompt).not.toContain("REVIEWER FEEDBACK"); + }); +}); diff --git a/src/loop/critique.ts b/src/loop/critique.ts new file mode 100644 index 0000000..a777010 --- /dev/null +++ b/src/loop/critique.ts @@ -0,0 +1,65 @@ +import { z } from "zod/v4"; +import { callJudge } from "../evolution/judges/client.ts"; +import { JUDGE_MODEL_SONNET, type JudgeCostEntry } from "../evolution/judges/types.ts"; +import type { LoopTranscript } from "./post-loop.ts"; + +export type CritiqueResult = { + assessment: string; + cost: JudgeCostEntry; +}; + +const CritiqueSchema = z.object({ assessment: z.string() }); + +/** + * Mid-loop critique: Sonnet 4.6 reviews the 
loop's progress every N ticks + * to detect drift, stuck patterns, or wasted budget before the loop runs out. + * Same cross-model pattern as interactive sessions (Sonnet judging Opus). + */ +export async function runCritiqueJudge( + goal: string, + stateFileContents: string, + transcript: LoopTranscript, + iteration: number, + maxIterations: number, +): Promise { + const system = `You are a reviewer assessing an autonomous agent loop mid-flight. +The agent (Opus 4.6) is running inside a tight iteration loop toward a goal. +Your job is to assess whether the loop is making meaningful progress or if +it is stuck, drifting, or wasting budget. Be direct and actionable.`; + + const summaryBlock = transcript.summaries.length > 0 ? `\nTick summaries:\n${transcript.summaries.join("\n")}` : ""; + + const user = `Goal: ${goal} + +Progress (${iteration}/${maxIterations} ticks used): +${summaryBlock} + +Current state file: +${stateFileContents.slice(0, 3000)} + +Last response (truncated): +${transcript.lastResponse.slice(0, 1000)} + +Is this loop making meaningful progress toward the goal? Is the agent stuck +in a pattern? Should it change approach? 
Give a brief (2-3 sentence) assessment +and one concrete suggestion if applicable.`; + + const result = await callJudge({ + model: JUDGE_MODEL_SONNET, + systemPrompt: system, + userMessage: user, + schema: CritiqueSchema, + schemaName: "LoopCritique", + maxTokens: 500, + }); + + return { + assessment: result.data.assessment, + cost: { + calls: 1, + totalUsd: result.costUsd, + totalInputTokens: result.inputTokens, + totalOutputTokens: result.outputTokens, + }, + }; +} diff --git a/src/loop/post-loop.ts b/src/loop/post-loop.ts new file mode 100644 index 0000000..e9f9bc6 --- /dev/null +++ b/src/loop/post-loop.ts @@ -0,0 +1,112 @@ +import type { EvolutionEngine } from "../evolution/engine.ts"; +import type { SessionData } from "../memory/consolidation.ts"; +import type { MemorySystem } from "../memory/system.ts"; +import type { Loop, LoopStatus } from "./types.ts"; + +export type LoopTranscript = { + firstPrompt: string; + firstResponse: string; + summaries: string[]; + lastPrompt: string; + lastResponse: string; +}; + +export type PostLoopDeps = { + evolution: EvolutionEngine; + memory: MemorySystem; + /** Callback to update runtime's evolved config after evolution applies changes. 
*/ + onEvolvedConfigUpdate: (config: ReturnType) => void; +}; + +function loopStatusToOutcome(status: LoopStatus): SessionData["outcome"] { + switch (status) { + case "done": + return "success"; + case "stopped": + return "abandoned"; + default: + return "failure"; + } +} + +export function synthesizeSessionData(loop: Loop, status: LoopStatus, transcript: LoopTranscript): SessionData { + const outcome = loopStatusToOutcome(status); + const header = `[Loop: ${loop.iterationCount} ticks, goal: ${loop.goal.slice(0, 200)}, outcome: ${outcome}]`; + + const userMessages = [ + `${header} Tick 1: ${transcript.firstPrompt.slice(0, 500)}`, + ...transcript.summaries, + `Final tick: ${transcript.lastPrompt.slice(0, 500)}`, + ]; + + const assistantMessages = [transcript.firstResponse.slice(0, 1000), transcript.lastResponse.slice(0, 1000)]; + + // userId sentinel: channel-originated loops use channel ID, headless use "autonomous" + const userId = loop.channelId ? `channel:${loop.channelId}` : "autonomous"; + + return { + sessionId: loop.id, + sessionKey: loop.channelId && loop.conversationId ? `${loop.channelId}:${loop.conversationId}` : `loop:${loop.id}`, + userId, + userMessages, + assistantMessages, + toolsUsed: [], + filesTracked: [], + startedAt: loop.startedAt, + endedAt: loop.finishedAt ?? new Date().toISOString(), + costUsd: loop.totalCostUsd, + outcome, + }; +} + +/** + * Run evolution and memory consolidation after a loop finishes. + * Fire-and-forget from the runner's perspective - errors are logged, + * never propagated to affect loop status. 
+ */ +export async function runPostLoopPipeline(deps: PostLoopDeps, sessionData: SessionData): Promise { + const { evolution, memory, onEvolvedConfigUpdate } = deps; + const { consolidateSessionWithLLM, consolidateSession, sessionDataToSummary } = await import( + "../memory/consolidation.ts" + ); + + const summary = sessionDataToSummary(sessionData); + + // Evolution pipeline (same pattern as src/index.ts:560-589) + try { + const result = await evolution.afterSession(summary); + if (result.changes_applied.length > 0) { + onEvolvedConfigUpdate(evolution.getConfig()); + } + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : String(err); + console.warn(`[loop] Post-loop evolution failed: ${msg}`); + } + + // Memory consolidation - respects cost cap (matches interactive session pattern) + if (!memory.isReady()) return; + try { + const useLLM = evolution.usesLLMJudges() && evolution.isWithinCostCap(); + if (useLLM) { + const evolvedConfig = evolution.getConfig(); + const existingFacts = `${evolvedConfig.userProfile}\n${evolvedConfig.domainKnowledge}`; + const { result, judgeCost } = await consolidateSessionWithLLM(memory, sessionData, existingFacts); + if (judgeCost) evolution.trackExternalJudgeCost(judgeCost); + if (result.episodesCreated > 0 || result.factsExtracted > 0) { + console.log( + `[loop] Consolidated (LLM): ${result.episodesCreated} episodes, ${result.factsExtracted} facts (${result.durationMs}ms)`, + ); + } + } else { + const result = await consolidateSession(memory, sessionData); + if (result.episodesCreated > 0 || result.factsExtracted > 0) { + console.log( + `[loop] Consolidated: ${result.episodesCreated} episodes, ${result.factsExtracted} facts (${result.durationMs}ms)`, + ); + } + } + } catch (err: unknown) { + const msg = err instanceof Error ? 
err.message : String(err); + console.warn(`[loop] Post-loop memory consolidation failed: ${msg}`); + } +} diff --git a/src/loop/prompt.ts b/src/loop/prompt.ts index 259ee5b..7862af6 100644 --- a/src/loop/prompt.ts +++ b/src/loop/prompt.ts @@ -1,12 +1,29 @@ import type { Loop } from "./types.ts"; +export type TickPromptOptions = { + memoryContext?: string; + critique?: string; +}; + /** * Per-tick prompt. Each tick is a fresh SDK session with no prior context * (rotating conversation ids in the runner guarantee this), so the prompt * must carry everything the agent needs: the goal, the state file contract, * the current state file contents, and the workspace path. */ -export function buildTickPrompt(loop: Loop, stateFileContents: string): string { +export function buildTickPrompt(loop: Loop, stateFileContents: string, options?: TickPromptOptions): string { + const memorySections: string[] = []; + + if (options?.memoryContext) { + memorySections.push(`RECALLED MEMORIES (from previous sessions)\n\n${options.memoryContext}`); + } + + if (options?.critique) { + memorySections.push(`REVIEWER FEEDBACK (from your last checkpoint)\n\n${options.critique}`); + } + + const injected = memorySections.length > 0 ? `\n\n${memorySections.join("\n\n")}\n` : ""; + return `You are running inside a "ralph loop" - a tight iteration primitive where a fresh agent session is invoked once per tick. You have no memory from previous ticks. All shared memory lives in the state file at: @@ -45,7 +62,7 @@ is one short paragraph telling the next tick exactly what to do first. 
THE GOAL -${loop.goal} +${loop.goal}${injected} CURRENT STATE FILE CONTENTS diff --git a/src/loop/runner.ts b/src/loop/runner.ts index d2ced99..2540d27 100644 --- a/src/loop/runner.ts +++ b/src/loop/runner.ts @@ -4,7 +4,10 @@ import { join, relative, resolve } from "node:path"; import type { AgentRuntime } from "../agent/runtime.ts"; import type { SlackChannel } from "../channels/slack.ts"; import { buildSafeEnv } from "../mcp/dynamic-handlers.ts"; +import type { MemoryContextBuilder } from "../memory/context-builder.ts"; +import { runCritiqueJudge } from "./critique.ts"; import { LoopNotifier } from "./notifications.ts"; +import { type LoopTranscript, type PostLoopDeps, runPostLoopPipeline, synthesizeSessionData } from "./post-loop.ts"; import { buildTickPrompt } from "./prompt.ts"; import { initStateFile, parseFrontmatter, readStateFile } from "./state-file.ts"; import { LoopStore } from "./store.ts"; @@ -18,11 +21,7 @@ import { type LoopStatus, } from "./types.ts"; -/** - * The runner only needs handleMessage from the AgentRuntime - narrowing the - * dependency keeps the runner honest (SRP) and lets tests pass a minimal mock - * without `as never` casts. A real AgentRuntime is assignable to this. - */ +/** Narrowed runtime interface for testability. */ type LoopRuntime = Pick; type RunnerDeps = { @@ -30,31 +29,15 @@ type RunnerDeps = { runtime: LoopRuntime; slackChannel?: SlackChannel; dataDir?: string; - /** - * When true (default), start() and resumeRunning() schedule ticks on the - * event loop automatically. Tests set this to false so they can drive - * ticks deterministically with explicit `await runner.tick(id)` calls. - */ + memoryContextBuilder?: MemoryContextBuilder; + postLoopDeps?: PostLoopDeps; + /** Tests set to false to drive ticks deterministically. 
*/ autoSchedule?: boolean; }; const SUCCESS_COMMAND_TIMEOUT_MS = 5 * 60 * 1000; -/** - * LoopRunner owns the lifecycle of ralph loops: - * start -> tick (N times) -> finalize - * - * Each tick is a fresh SDK session. We achieve that by passing a unique - * conversation id per iteration to runtime.handleMessage, which keys - * sessions on `${channelId}:${conversationId}`. No runtime changes needed. - * - * State lives in a markdown file. The runner only reads the YAML frontmatter - * to decide termination - the body is the agent's working memory. - * - * Budgets (max_iterations, max_cost_usd) are enforced here, never trusted to - * the agent. The agent can self-declare done (status: done) to stop early, - * but cannot extend the loop past its budget. - */ +/** start -> tick (N times) -> finalize. State file is the agent's memory across ticks. */ export class LoopRunner { private store: LoopStore; private runtime: LoopRuntime; @@ -63,6 +46,11 @@ export class LoopRunner { private autoSchedule: boolean; private inFlight = new Set(); private notifier: LoopNotifier; + private memoryContextBuilder: MemoryContextBuilder | undefined; + private postLoopDeps: PostLoopDeps | undefined; + private memoryCache = new Map(); + private transcripts = new Map(); + private pendingCritique = new Map(); constructor(deps: RunnerDeps) { this.store = new LoopStore(deps.db); @@ -71,6 +59,8 @@ export class LoopRunner { this.dataDir = deps.dataDir ?? resolve(process.cwd(), "data"); this.autoSchedule = deps.autoSchedule ?? true; this.notifier = new LoopNotifier(this.slackChannel ?? null, this.store); + this.memoryContextBuilder = deps.memoryContextBuilder; + this.postLoopDeps = deps.postLoopDeps; } setSlackChannel(channel: SlackChannel): void { @@ -78,11 +68,6 @@ export class LoopRunner { this.notifier = new LoopNotifier(channel, this.store); } - /** - * Reject operator-supplied workspace paths that escape the data dir. 
The - * agent invokes start() via MCP, and a stray `..` could point the state - * file outside the sandbox. Mirrors the isPathSafe idiom in src/ui/serve.ts. - */ private assertWorkspaceInsideDataDir(workspace: string): string { const base = resolve(this.dataDir); const target = resolve(base, workspace); @@ -113,16 +98,19 @@ export class LoopRunner { successCommand: input.successCommand ?? null, maxIterations, maxCostUsd, + checkpointInterval: input.checkpointInterval ?? null, channelId: input.channelId ?? null, conversationId: input.conversationId ?? null, triggerMessageTs: input.triggerMessageTs ?? null, }); - this.notifier.postStartNotice(loop).catch((err: unknown) => { - const msg = err instanceof Error ? err.message : String(err); - console.warn(`[loop] Failed to post start notice for ${id}: ${msg}`); + this.notifier.postStartNotice(loop).catch((e: unknown) => { + console.warn(`[loop] Failed to post start notice for ${id}: ${e instanceof Error ? e.message : e}`); }); + // Cache memory context once for the entire loop (goal is constant) + this.cacheMemoryContext(id, input.goal); + this.scheduleTick(id); return loop; } @@ -139,11 +127,11 @@ export class LoopRunner { return this.store.requestStop(id); } - /** On startup, re-queue any loops still marked running. State file is the source of truth. */ resumeRunning(): number { const running = this.store.listByStatus("running"); for (const loop of running) { console.log(`[loop] Resuming ${loop.id} (iteration ${loop.iterationCount})`); + this.cacheMemoryContext(loop.id, loop.goal); this.scheduleTick(loop.id); } return running.length; @@ -151,10 +139,9 @@ export class LoopRunner { private scheduleTick(id: string): void { if (!this.autoSchedule) return; - // setImmediate yields to the event loop so we never recurse on the same stack. setImmediate(() => { - this.tick(id).catch((err: unknown) => { - const msg = err instanceof Error ? 
err.message : String(err); + this.tick(id).catch((e: unknown) => { + const msg = e instanceof Error ? e.message : String(e); console.error(`[loop] Tick ${id} threw: ${msg}`); this.finalize(id, "failed", msg); }); @@ -179,7 +166,13 @@ export class LoopRunner { } const stateFileContents = readStateFile(loop.stateFile); - const prompt = buildTickPrompt(loop, stateFileContents); + const memoryContext = this.memoryCache.get(id); + const critique = this.pendingCritique.get(id); + if (critique) this.pendingCritique.delete(id); + const prompt = buildTickPrompt(loop, stateFileContents, { + memoryContext, + critique, + }); const conversationId = `${loop.id}:${loop.iterationCount}`; const response = await this.runtime.handleMessage("loop", conversationId, prompt); @@ -192,6 +185,15 @@ export class LoopRunner { const updatedContents = readStateFile(loop.stateFile); const frontmatter = parseFrontmatter(updatedContents); + // Track bounded transcript for post-loop evolution + this.recordTranscript(id, nextIteration, prompt, response.text, frontmatter?.status); + + // Mid-loop critique checkpoint (Sonnet reviewing Opus mid-flight). + // Awaited so the critique is available before the next tick runs. + if (loop.checkpointInterval && loop.checkpointInterval > 0 && nextIteration % loop.checkpointInterval === 0) { + await this.runCritique(id, loop, updatedContents, nextIteration); + } + if (frontmatter?.status === "done") { this.finalize(id, "done", null); return; @@ -205,11 +207,7 @@ export class LoopRunner { } } - // Await the tick update so its Slack write finishes before the next - // tick can start (and potentially finalize). Without this, a stop on - // tick N+1 can race: postFinalNotice strips the Stop button, then the - // fire-and-forget postTickUpdate from tick N resolves and re-sends the - // blocks, making the button reappear on a finalized message. 
+ // Await tick update so its Slack write finishes before the next tick try { await this.notifier.postTickUpdate(id, nextIteration, frontmatter?.status ?? "in-progress"); } catch (err: unknown) { @@ -241,12 +239,74 @@ export class LoopRunner { } private finalize(id: string, status: LoopStatus, error: string | null): void { + const transcript = this.transcripts.get(id); + this.memoryCache.delete(id); + this.transcripts.delete(id); + this.pendingCritique.delete(id); const loop = this.store.finalize(id, status, error); if (!loop) return; - this.notifier.postFinalNotice(loop, status).catch((err: unknown) => { - const msg = err instanceof Error ? err.message : String(err); - console.warn(`[loop] Failed to post final notice for ${id}: ${msg}`); + this.notifier.postFinalNotice(loop, status).catch((e: unknown) => { + console.warn(`[loop] Failed to post final notice for ${id}: ${e instanceof Error ? e.message : e}`); }); + + // Post-loop evolution and consolidation (fire-and-forget, never affects loop status) + if (this.postLoopDeps && transcript) { + runPostLoopPipeline(this.postLoopDeps, synthesizeSessionData(loop, status, transcript)).catch((e: unknown) => { + console.warn(`[loop] Post-loop evolution failed for ${id}: ${e instanceof Error ? e.message : e}`); + }); + } + } + + private async runCritique(loopId: string, loop: Loop, stateContents: string, iteration: number): Promise { + const transcript = this.transcripts.get(loopId); + if (!transcript) return; + const evo = this.postLoopDeps?.evolution; + if (!evo || !evo.usesLLMJudges() || !evo.isWithinCostCap()) return; + try { + const r = await runCritiqueJudge(loop.goal, stateContents, transcript, iteration, loop.maxIterations); + this.pendingCritique.set(loopId, r.assessment); + evo.trackExternalJudgeCost(r.cost); + } catch (e: unknown) { + console.warn(`[loop] Critique failed for ${loopId}: ${e instanceof Error ? 
e.message : e}`); + } + } + + private recordTranscript( + loopId: string, + iteration: number, + prompt: string, + response: string, + stateStatus: string | undefined, + ): void { + let transcript = this.transcripts.get(loopId); + if (!transcript) { + transcript = { + firstPrompt: prompt, + firstResponse: response, + summaries: [], + lastPrompt: prompt, + lastResponse: response, + }; + this.transcripts.set(loopId, transcript); + } else { + transcript.lastPrompt = prompt; + transcript.lastResponse = response; + } + const summary = `Tick ${iteration}: ${stateStatus ?? "in-progress"}`; + transcript.summaries.push(summary); + if (transcript.summaries.length > 10) transcript.summaries.shift(); + } + + private cacheMemoryContext(loopId: string, goal: string): void { + if (!this.memoryContextBuilder) return; + this.memoryContextBuilder + .build(goal) + .then((ctx) => { + if (ctx) this.memoryCache.set(loopId, ctx); + }) + .catch((e: unknown) => { + console.warn(`[loop] Memory context failed for ${loopId}: ${e instanceof Error ? 
e.message : e}`); + }); } } diff --git a/src/loop/store.ts b/src/loop/store.ts index 9f53000..1824605 100644 --- a/src/loop/store.ts +++ b/src/loop/store.ts @@ -9,6 +9,7 @@ export type LoopInsertInput = { successCommand: string | null; maxIterations: number; maxCostUsd: number; + checkpointInterval?: number | null; channelId: string | null; conversationId: string | null; triggerMessageTs: string | null; @@ -23,8 +24,8 @@ export class LoopStore { insert(input: LoopInsertInput): Loop { this.db.run( - `INSERT INTO loops (id, goal, workspace_dir, state_file, success_command, max_iterations, max_cost_usd, status, channel_id, conversation_id, trigger_message_ts) - VALUES (?, ?, ?, ?, ?, ?, ?, 'running', ?, ?, ?)`, + `INSERT INTO loops (id, goal, workspace_dir, state_file, success_command, max_iterations, max_cost_usd, checkpoint_interval, status, channel_id, conversation_id, trigger_message_ts) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'running', ?, ?, ?)`, [ input.id, input.goal, @@ -33,6 +34,7 @@ export class LoopStore { input.successCommand, input.maxIterations, input.maxCostUsd, + input.checkpointInterval ?? null, input.channelId, input.conversationId, input.triggerMessageTs, diff --git a/src/loop/tool.ts b/src/loop/tool.ts index 9c86437..d404cfd 100644 --- a/src/loop/tool.ts +++ b/src/loop/tool.ts @@ -55,6 +55,7 @@ ACTIONS: workspace (defaults to data/loops//), max_iterations (default 20, hard ceiling 200), max_cost_usd (default 5, hard ceiling 50), + checkpoint_interval (run a Sonnet critique every N ticks, 0 or omitted = off), success_command (shell command run after each tick; exit 0 = goal achieved. Runs under bash -c with a 5 minute timeout in a sanitized env containing only PATH, HOME, LANG, TERM, loop_id, and workspace), @@ -72,6 +73,13 @@ regression". 
Each iteration is fresh - all context must live in the state file.` workspace: z.string().optional(), max_iterations: z.number().int().positive().max(200).optional(), max_cost_usd: z.number().positive().max(50).optional(), + checkpoint_interval: z + .number() + .int() + .min(0) + .max(200) + .optional() + .describe("Run a Sonnet review every N ticks. 0 or omitted = no critique."), success_command: z.string().optional(), channel_id: z.string().optional(), conversation_id: z.string().optional(), @@ -93,6 +101,7 @@ regression". Each iteration is fresh - all context must live in the state file.` workspace: input.workspace, maxIterations: input.max_iterations, maxCostUsd: input.max_cost_usd, + checkpointInterval: input.checkpoint_interval, successCommand: input.success_command, channelId: input.channel_id ?? ctx?.slackChannelId, conversationId: input.conversation_id ?? ctx?.slackThreadTs, diff --git a/src/loop/types.ts b/src/loop/types.ts index d30249e..207fa87 100644 --- a/src/loop/types.ts +++ b/src/loop/types.ts @@ -10,6 +10,7 @@ export type Loop = { successCommand: string | null; maxIterations: number; maxCostUsd: number; + checkpointInterval: number | null; status: LoopStatus; iterationCount: number; totalCostUsd: number; @@ -32,6 +33,7 @@ export type LoopRow = { success_command: string | null; max_iterations: number; max_cost_usd: number; + checkpoint_interval: number | null; status: string; iteration_count: number; total_cost_usd: number; @@ -57,6 +59,7 @@ export type LoopStartInput = { workspace?: string; maxIterations?: number; maxCostUsd?: number; + checkpointInterval?: number; successCommand?: string; channelId?: string; conversationId?: string; @@ -74,6 +77,7 @@ export const LoopStartInputSchema = z.object({ workspace: z.string().optional(), max_iterations: z.number().int().positive().max(LOOP_MAX_ITERATIONS_CEILING).optional(), max_cost_usd: z.number().positive().max(LOOP_MAX_COST_CEILING_USD).optional(), + checkpoint_interval: 
z.number().int().min(0).max(LOOP_MAX_ITERATIONS_CEILING).optional(), success_command: z.string().optional(), channel_id: z.string().optional(), conversation_id: z.string().optional(), @@ -93,6 +97,7 @@ export function rowToLoop(row: LoopRow): Loop { successCommand: row.success_command, maxIterations: row.max_iterations, maxCostUsd: row.max_cost_usd, + checkpointInterval: row.checkpoint_interval, status: row.status as LoopStatus, iterationCount: row.iteration_count, totalCostUsd: row.total_cost_usd, diff --git a/src/memory/consolidation.ts b/src/memory/consolidation.ts index 35868d7..b53ac13 100644 --- a/src/memory/consolidation.ts +++ b/src/memory/consolidation.ts @@ -70,7 +70,7 @@ export async function consolidateSessionWithLLM( } } -function sessionDataToSummary(data: SessionData): SessionSummary { +export function sessionDataToSummary(data: SessionData): SessionSummary { return { session_id: data.sessionId, session_key: data.sessionKey, From 95b71acd1ec77d627dd5815558cf453bfeb8344c Mon Sep 17 00:00:00 2001 From: electronicBlacksmith Date: Mon, 6 Apr 2026 01:38:23 +0000 Subject: [PATCH 5/6] fix(loop): address code review findings from PR #9 - Decouple postLoopDeps so evolution and memory run independently (evolution works when memory is down and vice versa) - Skip mid-loop critique on terminal ticks to avoid wasted Sonnet calls - Track judge cost on failure paths via JudgeParseError carrying usage data - Extract recordTranscript/clamp from runner.ts to post-loop.ts (292 < 300 lines) --- src/evolution/judges/client.ts | 26 +++++++++++-- src/index.ts | 10 +++-- src/loop/post-loop.ts | 68 +++++++++++++++++++++++++--------- src/loop/runner.ts | 53 ++++++++------------------ src/memory/consolidation.ts | 13 ++++++- 5 files changed, 106 insertions(+), 64 deletions(-) diff --git a/src/evolution/judges/client.ts b/src/evolution/judges/client.ts index 6254e99..2a5e331 100644 --- a/src/evolution/judges/client.ts +++ b/src/evolution/judges/client.ts @@ -10,6 +10,19 @@ 
import { type VotingStrategy, } from "./types.ts"; +/** Thrown when the API call succeeds but structured output parsing fails. Carries token usage so cost can still be tracked. */ +export class JudgeParseError extends Error { + constructor( + message: string, + public readonly inputTokens: number, + public readonly outputTokens: number, + public readonly costUsd: number, + ) { + super(message); + this.name = "JudgeParseError"; + } +} + let _client: Anthropic | null = null; function getClient(): Anthropic { @@ -58,14 +71,19 @@ export async function callJudge(options: { }); const parsed = message.parsed_output; - if (!parsed) { - throw new Error(`Judge returned no structured output (stop_reason: ${message.stop_reason})`); - } - const inputTokens = message.usage.input_tokens; const outputTokens = message.usage.output_tokens; const costUsd = estimateCost(options.model, inputTokens, outputTokens); + if (!parsed) { + throw new JudgeParseError( + `Judge returned no structured output (stop_reason: ${message.stop_reason})`, + inputTokens, + outputTokens, + costUsd, + ); + } + // Extract verdict and confidence from the parsed data if present const data = parsed as Record; const verdict = (data.verdict as "pass" | "fail") ?? "pass"; diff --git a/src/index.ts b/src/index.ts index 7cf4d68..8315421 100644 --- a/src/index.ts +++ b/src/index.ts @@ -162,11 +162,13 @@ async function main(): Promise { let mcpServer: PhantomMcpServer | null = null; let scheduler: Scheduler | null = null; const postLoopDeps = - evolution && memory.isReady() + evolution || memory.isReady() ? { - evolution, - memory, - onEvolvedConfigUpdate: (config: ReturnType) => runtime.setEvolvedConfig(config), + evolution: evolution ?? undefined, + memory: memory.isReady() ? memory : undefined, + onEvolvedConfigUpdate: evolution + ? 
(config: ReturnType) => runtime.setEvolvedConfig(config) + : undefined, } : undefined; const loopRunner = new LoopRunner({ db, runtime, memoryContextBuilder: contextBuilder, postLoopDeps }); diff --git a/src/loop/post-loop.ts b/src/loop/post-loop.ts index e9f9bc6..093d831 100644 --- a/src/loop/post-loop.ts +++ b/src/loop/post-loop.ts @@ -12,10 +12,10 @@ export type LoopTranscript = { }; export type PostLoopDeps = { - evolution: EvolutionEngine; - memory: MemorySystem; + evolution?: EvolutionEngine; + memory?: MemorySystem; /** Callback to update runtime's evolved config after evolution applies changes. */ - onEvolvedConfigUpdate: (config: ReturnType) => void; + onEvolvedConfigUpdate?: (config: ReturnType) => void; }; function loopStatusToOutcome(status: LoopStatus): SessionData["outcome"] { @@ -29,6 +29,39 @@ function loopStatusToOutcome(status: LoopStatus): SessionData["outcome"] { } } +const MAX_ROLLING_SUMMARIES = 10; + +export function recordTranscript( + transcripts: Map, + loopId: string, + iteration: number, + prompt: string, + response: string, + stateStatus: string | undefined, +): void { + let transcript = transcripts.get(loopId); + if (!transcript) { + transcript = { + firstPrompt: prompt, + firstResponse: response, + summaries: [], + lastPrompt: prompt, + lastResponse: response, + }; + transcripts.set(loopId, transcript); + } else { + transcript.lastPrompt = prompt; + transcript.lastResponse = response; + } + const summary = `Tick ${iteration}: ${stateStatus ?? 
"in-progress"}`; + transcript.summaries.push(summary); + if (transcript.summaries.length > MAX_ROLLING_SUMMARIES) transcript.summaries.shift(); +} + +export function clamp(value: number, min: number, max: number): number { + return Math.min(Math.max(value, min), max); +} + export function synthesizeSessionData(loop: Loop, status: LoopStatus, transcript: LoopTranscript): SessionData { const outcome = loopStatusToOutcome(status); const header = `[Loop: ${loop.iterationCount} ticks, goal: ${loop.goal.slice(0, 200)}, outcome: ${outcome}]`; @@ -70,24 +103,25 @@ export async function runPostLoopPipeline(deps: PostLoopDeps, sessionData: Sessi "../memory/consolidation.ts" ); - const summary = sessionDataToSummary(sessionData); - - // Evolution pipeline (same pattern as src/index.ts:560-589) - try { - const result = await evolution.afterSession(summary); - if (result.changes_applied.length > 0) { - onEvolvedConfigUpdate(evolution.getConfig()); + // Evolution pipeline - runs independently of memory state + if (evolution) { + const summary = sessionDataToSummary(sessionData); + try { + const result = await evolution.afterSession(summary); + if (result.changes_applied.length > 0 && onEvolvedConfigUpdate) { + onEvolvedConfigUpdate(evolution.getConfig()); + } + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : String(err); + console.warn(`[loop] Post-loop evolution failed: ${msg}`); } - } catch (err: unknown) { - const msg = err instanceof Error ? 
err.message : String(err); - console.warn(`[loop] Post-loop evolution failed: ${msg}`); } - // Memory consolidation - respects cost cap (matches interactive session pattern) - if (!memory.isReady()) return; + // Memory consolidation - runs independently of evolution state + if (!memory?.isReady()) return; try { - const useLLM = evolution.usesLLMJudges() && evolution.isWithinCostCap(); - if (useLLM) { + const useLLM = evolution?.usesLLMJudges() && evolution?.isWithinCostCap(); + if (useLLM && evolution) { const evolvedConfig = evolution.getConfig(); const existingFacts = `${evolvedConfig.userProfile}\n${evolvedConfig.domainKnowledge}`; const { result, judgeCost } = await consolidateSessionWithLLM(memory, sessionData, existingFacts); diff --git a/src/loop/runner.ts b/src/loop/runner.ts index 2540d27..0f40bb7 100644 --- a/src/loop/runner.ts +++ b/src/loop/runner.ts @@ -7,7 +7,14 @@ import { buildSafeEnv } from "../mcp/dynamic-handlers.ts"; import type { MemoryContextBuilder } from "../memory/context-builder.ts"; import { runCritiqueJudge } from "./critique.ts"; import { LoopNotifier } from "./notifications.ts"; -import { type LoopTranscript, type PostLoopDeps, runPostLoopPipeline, synthesizeSessionData } from "./post-loop.ts"; +import { + type LoopTranscript, + type PostLoopDeps, + clamp, + recordTranscript, + runPostLoopPipeline, + synthesizeSessionData, +} from "./post-loop.ts"; import { buildTickPrompt } from "./prompt.ts"; import { initStateFile, parseFrontmatter, readStateFile } from "./state-file.ts"; import { LoopStore } from "./store.ts"; @@ -186,13 +193,7 @@ export class LoopRunner { const frontmatter = parseFrontmatter(updatedContents); // Track bounded transcript for post-loop evolution - this.recordTranscript(id, nextIteration, prompt, response.text, frontmatter?.status); - - // Mid-loop critique checkpoint (Sonnet reviewing Opus mid-flight). - // Awaited so the critique is available before the next tick runs. 
- if (loop.checkpointInterval && loop.checkpointInterval > 0 && nextIteration % loop.checkpointInterval === 0) { - await this.runCritique(id, loop, updatedContents, nextIteration); - } + recordTranscript(this.transcripts, id, nextIteration, prompt, response.text, frontmatter?.status); if (frontmatter?.status === "done") { this.finalize(id, "done", null); @@ -207,6 +208,12 @@ export class LoopRunner { } } + // Mid-loop critique checkpoint (Sonnet reviewing Opus mid-flight). + // Runs after terminal checks so we don't waste a judge call on the final tick. + if (loop.checkpointInterval && loop.checkpointInterval > 0 && nextIteration % loop.checkpointInterval === 0) { + await this.runCritique(id, loop, updatedContents, nextIteration); + } + // Await tick update so its Slack write finishes before the next tick try { await this.notifier.postTickUpdate(id, nextIteration, frontmatter?.status ?? "in-progress"); @@ -271,32 +278,6 @@ export class LoopRunner { } } - private recordTranscript( - loopId: string, - iteration: number, - prompt: string, - response: string, - stateStatus: string | undefined, - ): void { - let transcript = this.transcripts.get(loopId); - if (!transcript) { - transcript = { - firstPrompt: prompt, - firstResponse: response, - summaries: [], - lastPrompt: prompt, - lastResponse: response, - }; - this.transcripts.set(loopId, transcript); - } else { - transcript.lastPrompt = prompt; - transcript.lastResponse = response; - } - const summary = `Tick ${iteration}: ${stateStatus ?? 
"in-progress"}`; - transcript.summaries.push(summary); - if (transcript.summaries.length > 10) transcript.summaries.shift(); - } - private cacheMemoryContext(loopId: string, goal: string): void { if (!this.memoryContextBuilder) return; this.memoryContextBuilder @@ -309,7 +290,3 @@ export class LoopRunner { }); } } - -function clamp(value: number, min: number, max: number): number { - return Math.min(Math.max(value, min), max); -} diff --git a/src/memory/consolidation.ts b/src/memory/consolidation.ts index b53ac13..3cd58f2 100644 --- a/src/memory/consolidation.ts +++ b/src/memory/consolidation.ts @@ -1,3 +1,4 @@ +import { JudgeParseError } from "../evolution/judges/client.ts"; import { runConsolidationJudge } from "../evolution/judges/consolidation-judge.ts"; import type { JudgeCostEntry } from "../evolution/judges/types.ts"; import type { SessionSummary } from "../evolution/types.ts"; @@ -66,7 +67,17 @@ export async function consolidateSessionWithLLM( const msg = error instanceof Error ? error.message : String(error); console.warn(`[memory] Consolidation judge failed, falling back to heuristic: ${msg}`); const result = await consolidateSession(memory, sessionData); - return { result, judgeCost: null }; + // Track cost from successful API calls that failed parsing (tokens were consumed) + const judgeCost = + error instanceof JudgeParseError + ? { + calls: 1, + totalUsd: error.costUsd, + totalInputTokens: error.inputTokens, + totalOutputTokens: error.outputTokens, + } + : null; + return { result, judgeCost }; } } From 43cc5fc43585f736c66787907a150040fd992465 Mon Sep 17 00:00:00 2001 From: electronicBlacksmith Date: Mon, 6 Apr 2026 23:18:23 +0000 Subject: [PATCH 6/6] fix(test): stabilize trigger-auth tests for CI Wire setTriggerDeps before startServer so the handler is ready on the first request. Use server.url.origin instead of manually building the URL from server.port which can race in CI. 
Add a health check fetch to confirm the server is accepting connections before tests run. --- src/core/__tests__/trigger-auth.test.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/core/__tests__/trigger-auth.test.ts b/src/core/__tests__/trigger-auth.test.ts index 39f89ff..c3db785 100644 --- a/src/core/__tests__/trigger-auth.test.ts +++ b/src/core/__tests__/trigger-auth.test.ts @@ -19,7 +19,7 @@ describe("/trigger endpoint auth", () => { let server: ReturnType; let baseUrl: string; - beforeAll(() => { + beforeAll(async () => { // Back up the existing mcp.yaml so we can restore it after tests if (existsSync(mcpConfigPath)) { originalMcpYaml = readFileSync(mcpConfigPath, "utf-8"); @@ -38,11 +38,8 @@ describe("/trigger endpoint auth", () => { mkdirSync("config", { recursive: true }); writeFileSync(mcpConfigPath, YAML.stringify(mcpConfig), "utf-8"); - // Start server with a random port - server = startServer({ name: "test", port: 0, role: "base" } as never, Date.now()); - baseUrl = `http://localhost:${server.port}`; - - // Wire trigger deps with a mock runtime + // Wire trigger deps before starting the server so the /trigger + // handler is ready on the first request. setTriggerDeps({ runtime: { handleMessage: async () => ({ @@ -52,6 +49,15 @@ describe("/trigger endpoint auth", () => { }), } as never, }); + + // Start server after deps are wired. Use server.url (Bun guarantees + // it is populated once serve() returns) instead of manually building + // the URL from server.port, which can race in CI environments. + server = startServer({ name: "test", port: 0, role: "base" } as never, Date.now()); + baseUrl = server.url.origin; + + // Ensure the server is accepting connections before tests run. + await fetch(`${baseUrl}/health`); }); afterAll(() => {