From ccb450636c0c766d693915a8b1a52029c3c953bf Mon Sep 17 00:00:00 2001 From: Serge Alex Serbinenko <129629680+SergeSerb2@users.noreply.github.com> Date: Fri, 3 Jul 2026 22:58:49 -0700 Subject: [PATCH 1/5] ci: require Codex review before merge via status-check gate Adds a codex-review-required check that fails until the Codex bot has submitted a review on the PR. Re-runs on pull_request_review so it flips green as soon as Codex reviews. Enforced via branch ruleset. Co-Authored-By: Claude Fable 5 --- .github/workflows/codex-review-gate.yml | 30 +++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 .github/workflows/codex-review-gate.yml diff --git a/.github/workflows/codex-review-gate.yml b/.github/workflows/codex-review-gate.yml new file mode 100644 index 00000000000..77b75a918f7 --- /dev/null +++ b/.github/workflows/codex-review-gate.yml @@ -0,0 +1,30 @@ +name: Codex review gate + +on: + pull_request: + types: [opened, synchronize, reopened, ready_for_review] + pull_request_review: + types: [submitted, edited, dismissed] + +permissions: + pull-requests: read + +jobs: + codex-review-required: + name: codex-review-required + runs-on: ubuntu-latest + steps: + - name: Require a Codex review on this PR + env: + GH_TOKEN: ${{ github.token }} + REPO: ${{ github.repository }} + PR: ${{ github.event.pull_request.number }} + run: | + count=$(gh api "repos/$REPO/pulls/$PR/reviews" --paginate \ + --jq '[.[] | select((.user.type == "Bot") and (.user.login | ascii_downcase | contains("codex")))] | length') + if [ "$count" -ge 1 ]; then + echo "Codex has reviewed this PR ($count review(s))." + else + echo "::error::No Codex review found on this PR yet. This check re-runs automatically when Codex submits its review." 
+ exit 1 + fi From 089b1da974fdd415e09274c2fe3dad0d1bb65775 Mon Sep 17 00:00:00 2001 From: Serge Alex Serbinenko <129629680+SergeSerb2@users.noreply.github.com> Date: Fri, 3 Jul 2026 23:06:53 -0700 Subject: [PATCH 2/5] ci: address Codex review findings on review gate Require the Codex review to target the current head SHA so stale reviews no longer satisfy the gate after new pushes, aggregate paginated review pages before counting, and ignore dismissed reviews. Co-Authored-By: Claude Fable 5 --- .github/workflows/codex-review-gate.yml | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/.github/workflows/codex-review-gate.yml b/.github/workflows/codex-review-gate.yml index 77b75a918f7..b9d1b3525c4 100644 --- a/.github/workflows/codex-review-gate.yml +++ b/.github/workflows/codex-review-gate.yml @@ -14,17 +14,23 @@ jobs: name: codex-review-required runs-on: ubuntu-latest steps: - - name: Require a Codex review on this PR + - name: Require a Codex review of the current head commit env: GH_TOKEN: ${{ github.token }} REPO: ${{ github.repository }} PR: ${{ github.event.pull_request.number }} + HEAD_SHA: ${{ github.event.pull_request.head.sha }} run: | count=$(gh api "repos/$REPO/pulls/$PR/reviews" --paginate \ - --jq '[.[] | select((.user.type == "Bot") and (.user.login | ascii_downcase | contains("codex")))] | length') + | jq -s --arg head "$HEAD_SHA" '[add[] | select( + (.user.type == "Bot") + and (.user.login | ascii_downcase | contains("codex")) + and (.state != "DISMISSED") + and (.commit_id == $head) + )] | length') if [ "$count" -ge 1 ]; then - echo "Codex has reviewed this PR ($count review(s))." + echo "Codex has reviewed head commit $HEAD_SHA ($count review(s))." else - echo "::error::No Codex review found on this PR yet. This check re-runs automatically when Codex submits its review." + echo "::error::No Codex review of head commit $HEAD_SHA yet. 
This check re-runs automatically when Codex submits its review; comment '@codex review' to request one." exit 1 fi From 18a160e98b6294a5cb4a3435c3e2c9023e8effd3 Mon Sep 17 00:00:00 2001 From: Serge Alex Serbinenko <129629680+SergeSerb2@users.noreply.github.com> Date: Fri, 3 Jul 2026 23:21:37 -0700 Subject: [PATCH 3/5] ci: detect clean Codex reviews and poll instead of instant-failing Codex posts 'no major issues' verdicts as issue comments carrying 'Reviewed commit: <sha>', not as PR reviews, so the gate now accepts either form for the current head. The check also polls for up to 20 minutes and requests a review via '@codex review' (once per head) rather than failing immediately and relying on event retriggers. Co-Authored-By: Claude Fable 5 --- .github/workflows/codex-review-gate.yml | 53 +++++++++++++++++++++---- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/.github/workflows/codex-review-gate.yml b/.github/workflows/codex-review-gate.yml index b9d1b3525c4..f2406dd62be 100644 --- a/.github/workflows/codex-review-gate.yml +++ b/.github/workflows/codex-review-gate.yml @@ -7,12 +7,14 @@ on: types: [submitted, edited, dismissed] permissions: - pull-requests: read + pull-requests: write + issues: write jobs: codex-review-required: name: codex-review-required runs-on: ubuntu-latest + timeout-minutes: 25 steps: - name: Require a Codex review of the current head commit env: @@ -21,16 +23,51 @@ jobs: PR: ${{ github.event.pull_request.number }} HEAD_SHA: ${{ github.event.pull_request.head.sha }} run: | - count=$(gh api "repos/$REPO/pulls/$PR/reviews" --paginate \ - | jq -s --arg head "$HEAD_SHA" '[add[] | select( + codex_reviewed() { + local n + n=$(gh api "repos/$REPO/pulls/$PR/reviews" --paginate | jq -s --arg head "$HEAD_SHA" ' + [add[] | select( (.user.type == "Bot") and (.user.login | ascii_downcase | contains("codex")) and (.state != "DISMISSED") and (.commit_id == $head) )] | length') - if [ "$count" -ge 1 ]; then - echo "Codex has reviewed head commit 
$HEAD_SHA ($count review(s))." - else - echo "::error::No Codex review of head commit $HEAD_SHA yet. This check re-runs automatically when Codex submits its review; comment '@codex review' to request one." - exit 1 + [ "$n" -ge 1 ] && return 0 + # A clean Codex review ("no major issues") arrives as an issue + # comment carrying "Reviewed commit: ``", not as a PR review. + n=$(gh api "repos/$REPO/issues/$PR/comments" --paginate | jq -s --arg head "$HEAD_SHA" ' + [add[] + | select((.user.type == "Bot") and (.user.login | ascii_downcase | contains("codex"))) + | (.body | capture("Reviewed commit:[^`]*`(?[0-9a-f]{7,40})`") // empty) + | select(.sha as $s | $head | startswith($s)) + ] | length') + [ "$n" -ge 1 ] + } + + request_review_once() { + local marker existing body + marker="" + existing=$(gh api "repos/$REPO/issues/$PR/comments" --paginate | jq -s --arg m "$marker" \ + '[add[] | select(.body | contains($m))] | length') + if [ "$existing" -eq 0 ]; then + body=$(printf '@codex review\n\n%s' "$marker") + gh api "repos/$REPO/issues/$PR/comments" -f body="$body" >/dev/null + echo "Requested a Codex review of $HEAD_SHA." + fi + } + + if codex_reviewed; then + echo "Codex has reviewed head commit $HEAD_SHA." + exit 0 fi + request_review_once + for i in $(seq 1 40); do + sleep 30 + if codex_reviewed; then + echo "Codex has reviewed head commit $HEAD_SHA." + exit 0 + fi + echo "Waiting for Codex review of $HEAD_SHA ($i/40)..." + done + echo "::error::No Codex review of head commit $HEAD_SHA after 20 minutes. Comment '@codex review' on the PR, then re-run this check." + exit 1 From b6c0e0669836722c6c47a094ec22023151164377 Mon Sep 17 00:00:00 2001 From: Serge Alex Serbinenko <129629680+SergeSerb2@users.noreply.github.com> Date: Fri, 3 Jul 2026 23:37:03 -0700 Subject: [PATCH 4/5] ci: relax gate to one Codex review per PR Per-head re-review requirements and automated '@codex review' requests burn ChatGPT usage limits. 
The gate now passes once Codex has responded to the PR at all (review or clean-verdict comment); re-reviews after new pushes are handled manually by the maintainer. Co-Authored-By: Claude Fable 5 --- .github/workflows/codex-review-gate.yml | 52 ++++++++----------------- 1 file changed, 16 insertions(+), 36 deletions(-) diff --git a/.github/workflows/codex-review-gate.yml b/.github/workflows/codex-review-gate.yml index f2406dd62be..cf34b307cdc 100644 --- a/.github/workflows/codex-review-gate.yml +++ b/.github/workflows/codex-review-gate.yml @@ -4,70 +4,50 @@ on: pull_request: types: [opened, synchronize, reopened, ready_for_review] pull_request_review: - types: [submitted, edited, dismissed] + types: [submitted] permissions: - pull-requests: write - issues: write + pull-requests: read jobs: codex-review-required: name: codex-review-required runs-on: ubuntu-latest - timeout-minutes: 25 + timeout-minutes: 20 steps: - - name: Require a Codex review of the current head commit + - name: Require one Codex review on this PR env: GH_TOKEN: ${{ github.token }} REPO: ${{ github.repository }} PR: ${{ github.event.pull_request.number }} - HEAD_SHA: ${{ github.event.pull_request.head.sha }} run: | codex_reviewed() { local n - n=$(gh api "repos/$REPO/pulls/$PR/reviews" --paginate | jq -s --arg head "$HEAD_SHA" ' + n=$(gh api "repos/$REPO/pulls/$PR/reviews" --paginate | jq -s ' [add[] | select( (.user.type == "Bot") and (.user.login | ascii_downcase | contains("codex")) and (.state != "DISMISSED") - and (.commit_id == $head) )] | length') [ "$n" -ge 1 ] && return 0 # A clean Codex review ("no major issues") arrives as an issue # comment carrying "Reviewed commit: ``", not as a PR review. 
- n=$(gh api "repos/$REPO/issues/$PR/comments" --paginate | jq -s --arg head "$HEAD_SHA" ' - [add[] - | select((.user.type == "Bot") and (.user.login | ascii_downcase | contains("codex"))) - | (.body | capture("Reviewed commit:[^`]*`(?[0-9a-f]{7,40})`") // empty) - | select(.sha as $s | $head | startswith($s)) - ] | length') + n=$(gh api "repos/$REPO/issues/$PR/comments" --paginate | jq -s ' + [add[] | select( + (.user.type == "Bot") + and (.user.login | ascii_downcase | contains("codex")) + and (.body | test("Reviewed commit:")) + )] | length') [ "$n" -ge 1 ] } - request_review_once() { - local marker existing body - marker="" - existing=$(gh api "repos/$REPO/issues/$PR/comments" --paginate | jq -s --arg m "$marker" \ - '[add[] | select(.body | contains($m))] | length') - if [ "$existing" -eq 0 ]; then - body=$(printf '@codex review\n\n%s' "$marker") - gh api "repos/$REPO/issues/$PR/comments" -f body="$body" >/dev/null - echo "Requested a Codex review of $HEAD_SHA." - fi - } - - if codex_reviewed; then - echo "Codex has reviewed head commit $HEAD_SHA." - exit 0 - fi - request_review_once - for i in $(seq 1 40); do - sleep 30 + for i in $(seq 1 30); do if codex_reviewed; then - echo "Codex has reviewed head commit $HEAD_SHA." + echo "Codex has reviewed this PR." exit 0 fi - echo "Waiting for Codex review of $HEAD_SHA ($i/40)..." + echo "Waiting for Codex review ($i/30)..." + sleep 30 done - echo "::error::No Codex review of head commit $HEAD_SHA after 20 minutes. Comment '@codex review' on the PR, then re-run this check." + echo "::error::No Codex review on this PR after 15 minutes. Comment '@codex review' on the PR, then re-run this check." 
exit 1 From b16c18b2df7bc0d115d17d4b146485e03366597e Mon Sep 17 00:00:00 2001 From: Serge Alex Serbinenko <129629680+SergeSerb2@users.noreply.github.com> Date: Fri, 3 Jul 2026 23:51:19 -0700 Subject: [PATCH 5/5] Add cross-provider sub-agent spawning via product MCP toolkit Any provider session (Claude, Codex, Cursor, Grok, OpenCode) can now spawn and drive sub-agent threads on any other configured provider instance through four new MCP tools on the product MCP server: - agent_list: connected provider instances, readiness, model slugs - agent_spawn: create a thread on the target instance in the caller's project/worktree and send the initial prompt (returns immediately) - agent_wait: block up to timeoutSeconds for turn completion and return the final assistant text; "running" on timeout so callers re-poll instead of tripping MCP client timeouts - agent_send: follow-up prompts to a spawned thread Spawned threads go through the orchestration engine, so they persist, appear in the UI, and keep the normal approval/checkpoint flow. The SubAgentCoordinator bounds nesting at depth 2, restricts send/wait to the spawning session, and validates target readiness. Tools are gated behind a new "agents" MCP capability issued alongside "preview". Parent/child bookkeeping is in-memory only for now: after a server restart spawned threads survive as ordinary threads but can no longer be driven via agent_send/agent_wait. 
Co-Authored-By: Claude Fable 5 --- apps/server/src/mcp/McpHttpServer.ts | 11 +- apps/server/src/mcp/McpInvocationContext.ts | 2 +- apps/server/src/mcp/McpSessionRegistry.ts | 2 +- .../agents/SubAgentCoordinator.test.ts | 375 ++++++++++++++++++ .../toolkits/agents/SubAgentCoordinator.ts | 348 ++++++++++++++++ .../src/mcp/toolkits/agents/handlers.ts | 49 +++ apps/server/src/mcp/toolkits/agents/tools.ts | 73 ++++ packages/contracts/src/index.ts | 1 + packages/contracts/src/previewAutomation.ts | 2 +- packages/contracts/src/subAgents.ts | 135 +++++++ 10 files changed, 994 insertions(+), 4 deletions(-) create mode 100644 apps/server/src/mcp/toolkits/agents/SubAgentCoordinator.test.ts create mode 100644 apps/server/src/mcp/toolkits/agents/SubAgentCoordinator.ts create mode 100644 apps/server/src/mcp/toolkits/agents/handlers.ts create mode 100644 apps/server/src/mcp/toolkits/agents/tools.ts create mode 100644 packages/contracts/src/subAgents.ts diff --git a/apps/server/src/mcp/McpHttpServer.ts b/apps/server/src/mcp/McpHttpServer.ts index 6774731a73e..d3172ffc98c 100644 --- a/apps/server/src/mcp/McpHttpServer.ts +++ b/apps/server/src/mcp/McpHttpServer.ts @@ -22,6 +22,8 @@ import { PreviewSnapshotToolkit, PreviewStandardToolkit, } from "./toolkits/preview/tools.ts"; +import { SubAgentToolkitHandlersLive } from "./toolkits/agents/handlers.ts"; +import { SubAgentToolkit } from "./toolkits/agents/tools.ts"; const unauthorized = HttpServerResponse.jsonUnsafe( { @@ -208,10 +210,17 @@ export const PreviewToolkitRegistrationLive = Layer.mergeAll( PreviewSnapshotRegistrationLive, ); +export const SubAgentToolkitRegistrationLive = McpServer.toolkit(SubAgentToolkit).pipe( + Layer.provide(SubAgentToolkitHandlersLive), +); + const McpTransportLive = McpServer.layerHttp({ name: "T3 Code", version: packageJson.version, path: "/mcp", }).pipe(Layer.provide(McpAuthMiddlewareLive)); -export const layer = PreviewToolkitRegistrationLive.pipe(Layer.provideMerge(McpTransportLive)); +export const 
layer = Layer.mergeAll( + PreviewToolkitRegistrationLive, + SubAgentToolkitRegistrationLive, +).pipe(Layer.provideMerge(McpTransportLive)); diff --git a/apps/server/src/mcp/McpInvocationContext.ts b/apps/server/src/mcp/McpInvocationContext.ts index b13bf2d312e..20f5191d7a6 100644 --- a/apps/server/src/mcp/McpInvocationContext.ts +++ b/apps/server/src/mcp/McpInvocationContext.ts @@ -7,7 +7,7 @@ import { import * as Context from "effect/Context"; import * as Effect from "effect/Effect"; -export type McpCapability = "preview"; +export type McpCapability = "preview" | "agents"; export interface McpInvocationScope { readonly environmentId: EnvironmentId; diff --git a/apps/server/src/mcp/McpSessionRegistry.ts b/apps/server/src/mcp/McpSessionRegistry.ts index 67c4f2f0ff0..37404708436 100644 --- a/apps/server/src/mcp/McpSessionRegistry.ts +++ b/apps/server/src/mcp/McpSessionRegistry.ts @@ -114,7 +114,7 @@ const makeWithOptions = Effect.fn("McpSessionRegistry.make")(function* ( threadId: ThreadId.make(request.threadId), providerSessionId, providerInstanceId: ProviderInstanceId.make(request.providerInstanceId), - capabilities: new Set(["preview"]), + capabilities: new Set(["preview", "agents"]), issuedAt, expiresAt, }; diff --git a/apps/server/src/mcp/toolkits/agents/SubAgentCoordinator.test.ts b/apps/server/src/mcp/toolkits/agents/SubAgentCoordinator.test.ts new file mode 100644 index 00000000000..b3b99d8f238 --- /dev/null +++ b/apps/server/src/mcp/toolkits/agents/SubAgentCoordinator.test.ts @@ -0,0 +1,375 @@ +import * as NodeServices from "@effect/platform-node/NodeServices"; +import { expect, it } from "@effect/vitest"; +import { + EnvironmentId, + MessageId, + ProviderDriverKind, + ProviderInstanceId, + ThreadId, + TurnId, + type OrchestrationCommand, + type OrchestrationThread, + type OrchestrationThreadShell, + type ServerProvider, + type SubAgentError, +} from "@t3tools/contracts"; +import { createModelCapabilities } from "@t3tools/shared/model"; +import * as Duration 
from "effect/Duration"; +import * as Effect from "effect/Effect"; +import * as Fiber from "effect/Fiber"; +import * as Option from "effect/Option"; +import * as Stream from "effect/Stream"; +import * as TestClock from "effect/testing/TestClock"; + +import { OrchestrationEngineService } from "../../../orchestration/Services/OrchestrationEngine.ts"; +import { ProjectionSnapshotQuery } from "../../../orchestration/Services/ProjectionSnapshotQuery.ts"; +import { ProviderRegistry } from "../../../provider/Services/ProviderRegistry.ts"; +import type { McpInvocationScope } from "../../McpInvocationContext.ts"; +import { SubAgentCoordinator, __testing } from "./SubAgentCoordinator.ts"; + +const emptyCapabilities = createModelCapabilities({ optionDescriptors: [] }); + +const makeProvider = ( + instanceId: string, + driver: string, + overrides?: Partial, +): ServerProvider => ({ + instanceId: ProviderInstanceId.make(instanceId), + driver: ProviderDriverKind.make(driver), + enabled: true, + installed: true, + version: "1.0.0", + status: "ready", + auth: { status: "authenticated" }, + checkedAt: "2026-04-11T00:00:00.000Z", + models: [ + { + slug: "default-model", + name: "Default Model", + isCustom: false, + capabilities: emptyCapabilities, + }, + ], + slashCommands: [], + skills: [], + ...overrides, +}); + +const parentThreadId = ThreadId.make("parent-thread"); +const projectId = "project-1"; + +const makeScope = (threadId: ThreadId = parentThreadId): McpInvocationScope => ({ + environmentId: EnvironmentId.make("environment-1"), + threadId, + providerSessionId: "provider-session-1", + providerInstanceId: ProviderInstanceId.make("claude"), + capabilities: new Set(["preview", "agents"]), + issuedAt: 0, + expiresAt: Number.MAX_SAFE_INTEGER, +}); + +const makeThreadShell = (threadId: ThreadId): OrchestrationThreadShell => + ({ + id: threadId, + projectId, + title: "Parent thread", + modelSelection: { instanceId: ProviderInstanceId.make("claude"), model: "opus" }, + runtimeMode: 
"approval-required", + interactionMode: "default", + branch: "feature/foo", + worktreePath: "/tmp/worktrees/foo", + latestTurn: null, + createdAt: "2026-04-11T00:00:00.000Z", + updatedAt: "2026-04-11T00:00:00.000Z", + archivedAt: null, + session: null, + latestUserMessageAt: null, + hasPendingApprovals: false, + hasPendingUserInput: false, + hasActionableProposedPlan: false, + }) as OrchestrationThreadShell; + +const makeThreadDetail = ( + threadId: ThreadId, + overrides?: Partial, +): OrchestrationThread => + ({ + id: threadId, + projectId, + title: "Sub-agent thread", + modelSelection: { instanceId: ProviderInstanceId.make("codex"), model: "default-model" }, + runtimeMode: "approval-required", + interactionMode: "default", + branch: null, + worktreePath: null, + latestTurn: null, + createdAt: "2026-04-11T00:00:00.000Z", + updatedAt: "2026-04-11T00:00:00.000Z", + archivedAt: null, + deletedAt: null, + messages: [], + proposedPlans: [], + activities: [], + checkpoints: [], + session: null, + ...overrides, + }) as OrchestrationThread; + +interface Harness { + readonly dispatched: Array; + readonly setThreadDetail: ( + lookup: (threadId: ThreadId) => Option.Option, + ) => void; +} + +const makeCoordinator = (options?: { + readonly providers?: ReadonlyArray; +}): Effect.Effect => { + const dispatched: Array = []; + let threadDetailLookup: (threadId: ThreadId) => Option.Option = () => + Option.none(); + + const engine = OrchestrationEngineService.of({ + readEvents: () => Stream.empty, + dispatch: (command) => + Effect.sync(() => { + dispatched.push(command); + return { sequence: dispatched.length }; + }), + streamDomainEvents: Stream.never, + }); + + const unused = () => Effect.die("unused in SubAgentCoordinator tests"); + const snapshotQuery = ProjectionSnapshotQuery.of({ + getCommandReadModel: unused, + getSnapshot: unused, + getShellSnapshot: unused, + getArchivedShellSnapshot: unused, + getSnapshotSequence: unused, + getCounts: unused, + 
getActiveProjectByWorkspaceRoot: unused, + getProjectShellById: unused, + getFirstActiveThreadIdByProjectId: unused, + getThreadCheckpointContext: unused, + getFullThreadDiffContext: unused, + getThreadShellById: (threadId) => Effect.succeed(Option.some(makeThreadShell(threadId))), + getThreadDetailById: (threadId) => Effect.sync(() => threadDetailLookup(threadId)), + }); + + const providers = options?.providers ?? [ + makeProvider("claude", "claudeAgent"), + makeProvider("codex", "codex"), + ]; + const providerRegistry = ProviderRegistry.of({ + getProviders: Effect.succeed(providers), + refresh: unused, + refreshInstance: unused, + getProviderMaintenanceCapabilitiesForInstance: unused, + setProviderMaintenanceActionState: unused, + streamChanges: Stream.never, + }); + + const harness: Harness = { + dispatched, + setThreadDetail: (lookup) => { + threadDetailLookup = lookup; + }, + }; + + return __testing.make.pipe( + Effect.provideService(OrchestrationEngineService, engine), + Effect.provideService(ProjectionSnapshotQuery, snapshotQuery), + Effect.provideService(ProviderRegistry, providerRegistry), + Effect.provide(NodeServices.layer), + Effect.map((coordinator) => [coordinator, harness] as const), + ); +}; + +const expectSubAgentError = (effect: Effect.Effect) => Effect.flip(effect); + +it.effect("spawns a sub-agent thread next to the caller's thread on another provider", () => + Effect.gen(function* () { + const [coordinator, harness] = yield* makeCoordinator(); + + const result = yield* coordinator.spawn(makeScope(), { + providerInstanceId: ProviderInstanceId.make("codex"), + prompt: "Review the auth module for bugs.", + }); + + expect(result.status).toBe("running"); + expect(result.providerInstanceId).toBe("codex"); + expect(result.model).toBe("default-model"); + expect(result.title).toBe("Review the auth module for bugs."); + + expect(harness.dispatched).toHaveLength(2); + const [create, turnStart] = harness.dispatched; + 
expect(create?.type).toBe("thread.create"); + if (create?.type === "thread.create") { + expect(create.threadId).toBe(result.threadId); + expect(create.projectId).toBe(projectId); + expect(create.worktreePath).toBe("/tmp/worktrees/foo"); + expect(create.branch).toBe("feature/foo"); + expect(create.runtimeMode).toBe("approval-required"); + expect(create.modelSelection).toEqual({ instanceId: "codex", model: "default-model" }); + } + expect(turnStart?.type).toBe("thread.turn.start"); + if (turnStart?.type === "thread.turn.start") { + expect(turnStart.threadId).toBe(result.threadId); + expect(turnStart.message.text).toBe("Review the auth module for bugs."); + expect(turnStart.runtimeMode).toBe("approval-required"); + } + }), +); + +it.effect("rejects spawn targets that are unknown or not ready", () => + Effect.gen(function* () { + const [coordinator] = yield* makeCoordinator({ + providers: [ + makeProvider("claude", "claudeAgent"), + makeProvider("codex", "codex", { status: "error", auth: { status: "unauthenticated" } }), + ], + }); + + const unknown = yield* expectSubAgentError( + coordinator.spawn(makeScope(), { + providerInstanceId: ProviderInstanceId.make("missing"), + prompt: "Do something.", + }), + ); + expect(unknown.reason).toBe("provider-not-found"); + + const notReady = yield* expectSubAgentError( + coordinator.spawn(makeScope(), { + providerInstanceId: ProviderInstanceId.make("codex"), + prompt: "Do something.", + }), + ); + expect(notReady.reason).toBe("provider-not-spawnable"); + }), +); + +it.effect("bounds recursive sub-agent nesting", () => + Effect.gen(function* () { + const [coordinator] = yield* makeCoordinator(); + + const first = yield* coordinator.spawn(makeScope(), { + providerInstanceId: ProviderInstanceId.make("codex"), + prompt: "Level one task.", + }); + + const childScope = makeScope(first.threadId); + const second = yield* coordinator + .spawn(childScope, { + providerInstanceId: ProviderInstanceId.make("claude"), + prompt: "Level two 
task.", + }) + .pipe( + Effect.catch((error) => + Effect.sync(() => { + // Depth 1 caller must still be allowed; only depth 2 is refused. + throw new Error(`unexpected refusal at depth 1: ${error.description}`); + }), + ), + ); + + const grandchildScope = makeScope(second.threadId); + const refused = yield* expectSubAgentError( + coordinator.spawn(grandchildScope, { + providerInstanceId: ProviderInstanceId.make("codex"), + prompt: "Level three task.", + }), + ); + expect(refused.reason).toBe("depth-limit-exceeded"); + }), +); + +it.effect("refuses to drive threads the calling session did not spawn", () => + Effect.gen(function* () { + const [coordinator] = yield* makeCoordinator(); + + const send = yield* expectSubAgentError( + coordinator.send(makeScope(), { threadId: ThreadId.make("foreign-thread"), prompt: "hi" }), + ); + expect(send.reason).toBe("thread-not-found"); + + const wait = yield* expectSubAgentError( + coordinator.wait(makeScope(), { threadId: ThreadId.make("foreign-thread") }), + ); + expect(wait.reason).toBe("thread-not-found"); + }), +); + +it.effect("waits for the spawned turn to complete and returns the assistant text", () => + Effect.gen(function* () { + const [coordinator, harness] = yield* makeCoordinator(); + + const spawned = yield* coordinator.spawn(makeScope(), { + providerInstanceId: ProviderInstanceId.make("codex"), + prompt: "Summarize the repo.", + }); + + const assistantMessageId = MessageId.make("assistant-message-1"); + const turnId = TurnId.make("turn-1"); + harness.setThreadDetail((threadId) => + threadId === spawned.threadId + ? 
Option.some( + makeThreadDetail(spawned.threadId, { + latestTurn: { + turnId, + state: "completed", + requestedAt: "9999-01-01T00:00:00.000Z", + startedAt: "9999-01-01T00:00:00.000Z", + completedAt: "9999-01-01T00:00:01.000Z", + assistantMessageId, + }, + messages: [ + { + id: assistantMessageId, + role: "assistant", + text: "The repo is a coding-agent GUI.", + turnId, + streaming: false, + createdAt: "9999-01-01T00:00:01.000Z", + updatedAt: "9999-01-01T00:00:01.000Z", + }, + ], + }), + ) + : Option.none(), + ); + + const result = yield* coordinator.wait(makeScope(), { + threadId: spawned.threadId, + timeoutSeconds: 5, + }); + expect(result.status).toBe("completed"); + expect(result.finalText).toBe("The repo is a coding-agent GUI."); + }), +); + +it.effect("reports running when the sub-agent has not finished before the timeout", () => + Effect.gen(function* () { + const [coordinator, harness] = yield* makeCoordinator(); + + const spawned = yield* coordinator.spawn(makeScope(), { + providerInstanceId: ProviderInstanceId.make("codex"), + prompt: "Long running task.", + }); + harness.setThreadDetail((threadId) => + threadId === spawned.threadId + ? Option.some(makeThreadDetail(spawned.threadId, { latestTurn: null })) + : Option.none(), + ); + + const waiting = yield* coordinator + .wait(makeScope(), { + threadId: spawned.threadId, + timeoutSeconds: 1, + }) + .pipe(Effect.forkChild); + yield* TestClock.adjust(Duration.seconds(2)); + const result = yield* Fiber.join(waiting); + expect(result.status).toBe("running"); + expect(result.finalText).toBeNull(); + }), +); diff --git a/apps/server/src/mcp/toolkits/agents/SubAgentCoordinator.ts b/apps/server/src/mcp/toolkits/agents/SubAgentCoordinator.ts new file mode 100644 index 00000000000..c9cb1afc743 --- /dev/null +++ b/apps/server/src/mcp/toolkits/agents/SubAgentCoordinator.ts @@ -0,0 +1,348 @@ +/** + * SubAgentCoordinator - Cross-provider sub-agent orchestration for MCP tools. 
+ * + * Backs the `agent_*` MCP toolkit: a running provider session (Claude, + * Codex, Cursor, ...) can spawn a sibling thread on any configured provider + * instance, send follow-up prompts, and await turn completion. Spawned + * threads flow through the regular orchestration engine, so they persist + * and render in the UI like user-created threads. + * + * Parent/child bookkeeping is in-memory only: after a server restart, + * previously spawned threads survive as ordinary threads but can no longer + * be driven through `agent_send`/`agent_wait`. + */ +import { + CommandId, + isProviderAvailable, + MessageId, + SUB_AGENT_MAX_SPAWN_DEPTH, + SubAgentError, + ThreadId, + type OrchestrationThread, + type RuntimeMode, + type ServerProvider, + type SubAgentListResult, + type SubAgentSendInput, + type SubAgentSendResult, + type SubAgentSpawnInput, + type SubAgentSpawnResult, + type SubAgentStatus, + type SubAgentWaitInput, + type SubAgentWaitResult, +} from "@t3tools/contracts"; +import * as Clock from "effect/Clock"; +import * as Context from "effect/Context"; +import * as Crypto from "effect/Crypto"; +import * as DateTime from "effect/DateTime"; +import * as Duration from "effect/Duration"; +import * as Effect from "effect/Effect"; +import * as Layer from "effect/Layer"; +import * as Option from "effect/Option"; +import * as SynchronizedRef from "effect/SynchronizedRef"; + +import { OrchestrationEngineService } from "../../../orchestration/Services/OrchestrationEngine.ts"; +import { ProjectionSnapshotQuery } from "../../../orchestration/Services/ProjectionSnapshotQuery.ts"; +import { ProviderRegistry } from "../../../provider/Services/ProviderRegistry.ts"; +import type { McpInvocationScope } from "../../McpInvocationContext.ts"; + +const WAIT_POLL_INTERVAL_MILLIS = 500; +const DEFAULT_WAIT_TIMEOUT_SECONDS = 60; +const DEFAULT_TITLE_MAX_LENGTH = 60; + +interface SubAgentRecord { + readonly parentThreadId: ThreadId; + readonly depth: number; + /** `createdAt` of the 
most recent turn-start command sent to this child. */ + readonly lastTurnRequestedAt: string; +} + +export interface SubAgentCoordinatorShape { + readonly list: (scope: McpInvocationScope) => Effect.Effect; + readonly spawn: ( + scope: McpInvocationScope, + input: SubAgentSpawnInput, + ) => Effect.Effect; + readonly send: ( + scope: McpInvocationScope, + input: SubAgentSendInput, + ) => Effect.Effect; + readonly wait: ( + scope: McpInvocationScope, + input: SubAgentWaitInput, + ) => Effect.Effect; +} + +export class SubAgentCoordinator extends Context.Service< + SubAgentCoordinator, + SubAgentCoordinatorShape +>()("t3/mcp/toolkits/agents/SubAgentCoordinator") {} + +const isSpawnableProvider = (provider: ServerProvider): boolean => + isProviderAvailable(provider) && + provider.enabled && + provider.installed && + provider.status !== "error" && + provider.status !== "disabled"; + +const defaultTitleForPrompt = (prompt: string): string => { + const firstLine = prompt.split("\n", 1)[0]?.trim() ?? ""; + const seed = firstLine.length > 0 ? firstLine : "Sub-agent task"; + return seed.length > DEFAULT_TITLE_MAX_LENGTH + ? 
`${seed.slice(0, DEFAULT_TITLE_MAX_LENGTH - 1).trimEnd()}…` + : seed; +}; + +const finalAssistantText = (thread: OrchestrationThread): string | null => { + const latestTurn = thread.latestTurn; + if (latestTurn?.assistantMessageId) { + const byId = thread.messages.find((message) => message.id === latestTurn.assistantMessageId); + if (byId) return byId.text; + } + for (let index = thread.messages.length - 1; index >= 0; index -= 1) { + const message = thread.messages[index]; + if (message?.role === "assistant" && latestTurn && message.turnId === latestTurn.turnId) { + return message.text; + } + } + return null; +}; + +const turnStatus = (thread: OrchestrationThread, sinceIso: string): SubAgentStatus => { + const latestTurn = thread.latestTurn; + // The projection lags the dispatched turn-start command; treat a missing + // or older latest turn as the requested turn still spinning up. + if (!latestTurn || latestTurn.requestedAt < sinceIso) return "running"; + if (latestTurn.state === "running") return "running"; + if ((thread.session?.activeTurnId ?? null) !== null) return "running"; + return latestTurn.state; +}; + +const makeSubAgentCoordinator = Effect.gen(function* () { + const crypto = yield* Crypto.Crypto; + const engine = yield* OrchestrationEngineService; + const snapshotQuery = yield* ProjectionSnapshotQuery; + const providerRegistry = yield* ProviderRegistry; + const children = yield* SynchronizedRef.make>(new Map()); + + const randomUuid = crypto.randomUUIDv4.pipe(Effect.orDie); + const nowIso = Effect.map(DateTime.now, DateTime.formatIso); + + const dispatchFailed = (operation: string) => (cause: unknown) => + new SubAgentError({ + reason: "dispatch-failed", + description: `Failed to ${operation}: ${cause instanceof Error ? 
cause.message : String(cause)}`, + }); + + const requireChildOfCaller = Effect.fn("SubAgentCoordinator.requireChildOfCaller")(function* ( + scope: McpInvocationScope, + threadId: ThreadId, + ) { + const record = (yield* SynchronizedRef.get(children)).get(threadId); + if (!record || record.parentThreadId !== scope.threadId) { + return yield* new SubAgentError({ + reason: "thread-not-found", + description: `Thread ${threadId} is not a sub-agent spawned by this session. Use agent_spawn first; sub-agent handles do not survive server restarts.`, + }); + } + return record; + }); + + const readThreadDetail = Effect.fn("SubAgentCoordinator.readThreadDetail")(function* ( + threadId: ThreadId, + ) { + const detail = yield* snapshotQuery + .getThreadDetailById(threadId) + .pipe(Effect.mapError(dispatchFailed("read sub-agent thread state"))); + if (Option.isNone(detail)) { + return yield* new SubAgentError({ + reason: "thread-not-found", + description: `Sub-agent thread ${threadId} no longer exists (it may have been deleted or archived).`, + }); + } + return detail.value; + }); + + const startTurn = Effect.fn("SubAgentCoordinator.startTurn")(function* ( + threadId: ThreadId, + prompt: string, + runtimeMode: RuntimeMode, + ) { + const createdAt = yield* nowIso; + const commandUuid = yield* randomUuid; + const messageUuid = yield* randomUuid; + yield* engine + .dispatch({ + type: "thread.turn.start", + commandId: CommandId.make(`server:sub-agent-turn:${commandUuid}`), + threadId, + message: { + messageId: MessageId.make(messageUuid), + role: "user", + text: prompt, + attachments: [], + }, + runtimeMode, + interactionMode: "default", + createdAt, + }) + .pipe(Effect.mapError(dispatchFailed("start sub-agent turn"))); + return createdAt; + }); + + const list: SubAgentCoordinatorShape["list"] = Effect.fn("SubAgentCoordinator.list")( + function* (scope) { + const providers = yield* providerRegistry.getProviders; + return { + providers: providers.map((provider) => ({ + instanceId: 
provider.instanceId, + driver: provider.driver, + ...(provider.displayName !== undefined ? { displayName: provider.displayName } : {}), + status: provider.status, + authStatus: provider.auth.status, + spawnable: isSpawnableProvider(provider), + models: provider.models.map((model) => model.slug), + isCaller: provider.instanceId === scope.providerInstanceId, + })), + }; + }, + ); + + const spawn: SubAgentCoordinatorShape["spawn"] = Effect.fn("SubAgentCoordinator.spawn")( + function* (scope, input) { + const callerDepth = (yield* SynchronizedRef.get(children)).get(scope.threadId)?.depth ?? 0; + if (callerDepth >= SUB_AGENT_MAX_SPAWN_DEPTH) { + return yield* new SubAgentError({ + reason: "depth-limit-exceeded", + description: `Sub-agents may only nest ${SUB_AGENT_MAX_SPAWN_DEPTH} levels deep; this session is already at depth ${callerDepth}. Do the work in this session instead.`, + }); + } + + const providers = yield* providerRegistry.getProviders; + const target = providers.find((provider) => provider.instanceId === input.providerInstanceId); + if (!target) { + return yield* new SubAgentError({ + reason: "provider-not-found", + description: `No provider instance "${input.providerInstanceId}" is configured. Call agent_list for valid instance ids.`, + }); + } + if (!isSpawnableProvider(target)) { + return yield* new SubAgentError({ + reason: "provider-not-spawnable", + description: `Provider instance "${target.instanceId}" (${target.driver}) is not ready (status: ${target.status}, auth: ${target.auth.status}). Call agent_list to pick a spawnable provider.`, + }); + } + const model = input.model ?? 
target.models[0]?.slug; + if (model === undefined) { + return yield* new SubAgentError({ + reason: "model-not-resolved", + description: `Provider instance "${target.instanceId}" reports no models; pass an explicit model slug.`, + }); + } + + const callerThread = yield* snapshotQuery + .getThreadShellById(scope.threadId) + .pipe(Effect.mapError(dispatchFailed("read calling thread"))); + if (Option.isNone(callerThread)) { + return yield* new SubAgentError({ + reason: "caller-thread-not-found", + description: + "The calling session's thread no longer exists; cannot place a sub-agent next to it.", + }); + } + const parent = callerThread.value; + + const createdAt = yield* nowIso; + const commandUuid = yield* randomUuid; + const threadUuid = yield* randomUuid; + const childThreadId = ThreadId.make(threadUuid); + const title = input.title ?? defaultTitleForPrompt(input.prompt); + + yield* engine + .dispatch({ + type: "thread.create", + commandId: CommandId.make(`server:sub-agent-spawn:${commandUuid}`), + threadId: childThreadId, + projectId: parent.projectId, + title, + modelSelection: { instanceId: target.instanceId, model }, + runtimeMode: parent.runtimeMode, + interactionMode: "default", + branch: parent.branch, + worktreePath: parent.worktreePath, + createdAt, + }) + .pipe(Effect.mapError(dispatchFailed("create sub-agent thread"))); + + const lastTurnRequestedAt = yield* startTurn(childThreadId, input.prompt, parent.runtimeMode); + + yield* SynchronizedRef.update(children, (current) => { + const next = new Map(current); + next.set(childThreadId, { + parentThreadId: scope.threadId, + depth: callerDepth + 1, + lastTurnRequestedAt, + }); + return next; + }); + + return { + threadId: childThreadId, + providerInstanceId: target.instanceId, + model, + title, + status: "running" as const, + }; + }, + ); + + const send: SubAgentCoordinatorShape["send"] = Effect.fn("SubAgentCoordinator.send")( + function* (scope, input) { + const record = yield* requireChildOfCaller(scope, 
input.threadId); + const thread = yield* readThreadDetail(input.threadId); + const lastTurnRequestedAt = yield* startTurn( + input.threadId, + input.prompt, + thread.runtimeMode, + ); + yield* SynchronizedRef.update(children, (current) => { + const next = new Map(current); + next.set(input.threadId, { ...record, lastTurnRequestedAt }); + return next; + }); + return { threadId: input.threadId, status: "running" as const }; + }, + ); + + const wait: SubAgentCoordinatorShape["wait"] = Effect.fn("SubAgentCoordinator.wait")( + function* (scope, input) { + const record = yield* requireChildOfCaller(scope, input.threadId); + const timeoutSeconds = input.timeoutSeconds ?? DEFAULT_WAIT_TIMEOUT_SECONDS; + const deadline = (yield* Clock.currentTimeMillis) + timeoutSeconds * 1_000; + + while (true) { + const thread = yield* readThreadDetail(input.threadId); + const status = turnStatus(thread, record.lastTurnRequestedAt); + if (status !== "running") { + return { + threadId: input.threadId, + status, + finalText: finalAssistantText(thread), + }; + } + if ((yield* Clock.currentTimeMillis) >= deadline) { + return { threadId: input.threadId, status: "running" as const, finalText: null }; + } + yield* Effect.sleep(Duration.millis(WAIT_POLL_INTERVAL_MILLIS)); + } + }, + ); + + return SubAgentCoordinator.of({ list, spawn, send, wait }); +}); + +export const SubAgentCoordinatorLive = Layer.effect(SubAgentCoordinator, makeSubAgentCoordinator); + +/** Exposed for tests. 
*/ +export const __testing = { + make: makeSubAgentCoordinator, +}; diff --git a/apps/server/src/mcp/toolkits/agents/handlers.ts b/apps/server/src/mcp/toolkits/agents/handlers.ts new file mode 100644 index 00000000000..3bd425ded13 --- /dev/null +++ b/apps/server/src/mcp/toolkits/agents/handlers.ts @@ -0,0 +1,49 @@ +import { SubAgentError } from "@t3tools/contracts"; +import * as Effect from "effect/Effect"; +import * as Layer from "effect/Layer"; + +import * as McpInvocationContext from "../../McpInvocationContext.ts"; +import { SubAgentCoordinator, SubAgentCoordinatorLive } from "./SubAgentCoordinator.ts"; +import { SubAgentToolkit } from "./tools.ts"; + +const requireAgentsScope = Effect.fn("SubAgentToolkit.requireAgentsScope")(function* () { + const invocation = yield* McpInvocationContext.McpInvocationContext; + if (!invocation.capabilities.has("agents")) { + return yield* new SubAgentError({ + reason: "capability-unavailable", + description: "This MCP credential does not grant the agents capability.", + }); + } + return invocation; +}); + +const SubAgentToolkitHandlers = SubAgentToolkit.toLayer({ + agent_list: () => + Effect.gen(function* () { + const scope = yield* requireAgentsScope(); + const coordinator = yield* SubAgentCoordinator; + return yield* coordinator.list(scope); + }), + agent_spawn: (input) => + Effect.gen(function* () { + const scope = yield* requireAgentsScope(); + const coordinator = yield* SubAgentCoordinator; + return yield* coordinator.spawn(scope, input); + }), + agent_send: (input) => + Effect.gen(function* () { + const scope = yield* requireAgentsScope(); + const coordinator = yield* SubAgentCoordinator; + return yield* coordinator.send(scope, input); + }), + agent_wait: (input) => + Effect.gen(function* () { + const scope = yield* requireAgentsScope(); + const coordinator = yield* SubAgentCoordinator; + return yield* coordinator.wait(scope, input); + }), +}); + +export const SubAgentToolkitHandlersLive = SubAgentToolkitHandlers.pipe( + 
Layer.provide(SubAgentCoordinatorLive), +); diff --git a/apps/server/src/mcp/toolkits/agents/tools.ts b/apps/server/src/mcp/toolkits/agents/tools.ts new file mode 100644 index 00000000000..55248978ab3 --- /dev/null +++ b/apps/server/src/mcp/toolkits/agents/tools.ts @@ -0,0 +1,73 @@ +import { + SubAgentError, + SubAgentListResult, + SubAgentSendInput, + SubAgentSendResult, + SubAgentSpawnInput, + SubAgentSpawnResult, + SubAgentWaitInput, + SubAgentWaitResult, +} from "@t3tools/contracts"; +import * as Schema from "effect/Schema"; +import { Tool, Toolkit } from "effect/unstable/ai"; + +import * as McpInvocationContext from "../../McpInvocationContext.ts"; +import { SubAgentCoordinator } from "./SubAgentCoordinator.ts"; + +const dependencies = [McpInvocationContext.McpInvocationContext, SubAgentCoordinator]; + +export const AgentListTool = Tool.make("agent_list", { + description: + "List the coding-agent providers configured on this server (e.g. Codex, Claude, Cursor) that can run sub-agent threads via agent_spawn. Shows each instance id, driver, readiness, available model slugs, and which instance is running the current session.", + parameters: Schema.Struct({}), + success: SubAgentListResult, + failure: SubAgentError, + dependencies, +}) + .annotate(Tool.Title, "List sub-agent providers") + .annotate(Tool.Readonly, true) + .annotate(Tool.Destructive, false) + .annotate(Tool.Idempotent, true); + +export const AgentSpawnTool = Tool.make("agent_spawn", { + description: + "Spawn a sub-agent: start a new thread on any configured provider instance (including a different agent than yourself — e.g. delegate from Claude to Codex or vice versa) and send it an initial prompt. The sub-agent works in this thread's project and worktree. Returns immediately with the new threadId; use agent_wait to collect the result and agent_send for follow-up prompts. 
Use agent_list first to pick a spawnable providerInstanceId.", + parameters: SubAgentSpawnInput, + success: SubAgentSpawnResult, + failure: SubAgentError, + dependencies, +}) + .annotate(Tool.Title, "Spawn sub-agent") + .annotate(Tool.OpenWorld, true) + .annotate(Tool.Destructive, true); + +export const AgentSendTool = Tool.make("agent_send", { + description: + "Send a follow-up prompt to a sub-agent thread previously created with agent_spawn. Returns immediately; use agent_wait to collect the response.", + parameters: SubAgentSendInput, + success: SubAgentSendResult, + failure: SubAgentError, + dependencies, +}) + .annotate(Tool.Title, "Prompt sub-agent") + .annotate(Tool.OpenWorld, true) + .annotate(Tool.Destructive, true); + +export const AgentWaitTool = Tool.make("agent_wait", { + description: + 'Wait for a sub-agent thread (created with agent_spawn) to finish its current turn and return the final assistant message. Blocks up to timeoutSeconds (default 60, max 600); a status of "running" means the sub-agent is still working — call agent_wait again to keep waiting.', + parameters: SubAgentWaitInput, + success: SubAgentWaitResult, + failure: SubAgentError, + dependencies, +}) + .annotate(Tool.Title, "Await sub-agent result") + .annotate(Tool.Readonly, true) + .annotate(Tool.Destructive, false); + +export const SubAgentToolkit = Toolkit.make( + AgentListTool, + AgentSpawnTool, + AgentSendTool, + AgentWaitTool, +); diff --git a/packages/contracts/src/index.ts b/packages/contracts/src/index.ts index 43270efdec7..dda17eb5afd 100644 --- a/packages/contracts/src/index.ts +++ b/packages/contracts/src/index.ts @@ -25,4 +25,5 @@ export * from "./assets.ts"; export * from "./review.ts"; export * from "./preview.ts"; export * from "./previewAutomation.ts"; +export * from "./subAgents.ts"; export * from "./rpc.ts"; diff --git a/packages/contracts/src/previewAutomation.ts b/packages/contracts/src/previewAutomation.ts index 6431dd0dcfd..25a37d425b8 100644 --- 
a/packages/contracts/src/previewAutomation.ts +++ b/packages/contracts/src/previewAutomation.ts @@ -608,7 +608,7 @@ export type PreviewAutomationResponse = typeof PreviewAutomationResponse.Type; export class PreviewAutomationUnavailableError extends Schema.TaggedErrorClass()( "PreviewAutomationUnavailableError", { - capability: Schema.Literal("preview"), + capability: Schema.Literals(["preview", "agents"]), environmentId: EnvironmentId, threadId: ThreadId, providerSessionId: TrimmedNonEmptyString, diff --git a/packages/contracts/src/subAgents.ts b/packages/contracts/src/subAgents.ts new file mode 100644 index 00000000000..8b72a77e72f --- /dev/null +++ b/packages/contracts/src/subAgents.ts @@ -0,0 +1,135 @@ +/** + * Sub-agent orchestration contracts. + * + * Schemas for the product-native MCP "agents" toolkit that lets a running + * provider session spawn and drive sibling threads on any configured + * provider instance — including a different driver than its own (e.g. a + * Claude session delegating a task to a Codex instance, or vice versa). + * + * Keep this module schema-only; the runtime lives in + * `apps/server/src/mcp/toolkits/agents/`. + */ +import * as Schema from "effect/Schema"; + +import { ThreadId, TrimmedNonEmptyString } from "./baseSchemas.ts"; +import { ProviderDriverKind, ProviderInstanceId } from "./providerInstance.ts"; +import { ServerProviderAuthStatus, ServerProviderState } from "./server.ts"; + +/** + * Maximum spawn nesting. A user-created thread sits at depth 0; each + * sub-agent thread is one level deeper. Spawning is refused once the + * caller is already at this depth, bounding recursive fan-out. 
+ */ +export const SUB_AGENT_MAX_SPAWN_DEPTH = 2; + +export const SubAgentStatus = Schema.Literals(["running", "completed", "interrupted", "error"]); +export type SubAgentStatus = typeof SubAgentStatus.Type; + +export const SubAgentProviderSummary = Schema.Struct({ + instanceId: ProviderInstanceId, + driver: ProviderDriverKind, + displayName: Schema.optional(TrimmedNonEmptyString), + status: ServerProviderState, + authStatus: ServerProviderAuthStatus, + /** Whether `agent_spawn` currently accepts this instance as a target. */ + spawnable: Schema.Boolean, + /** Model slugs accepted by `agent_spawn`'s `model` input. */ + models: Schema.Array(TrimmedNonEmptyString), + /** True for the instance running the calling agent session. */ + isCaller: Schema.Boolean, +}); +export type SubAgentProviderSummary = typeof SubAgentProviderSummary.Type; + +export const SubAgentListResult = Schema.Struct({ + providers: Schema.Array(SubAgentProviderSummary), +}); +export type SubAgentListResult = typeof SubAgentListResult.Type; + +export const SubAgentSpawnInput = Schema.Struct({ + providerInstanceId: ProviderInstanceId.annotate({ + description: + "Target provider instance id from agent_list (may be a different agent than the caller, e.g. spawn Codex from Claude).", + }), + prompt: TrimmedNonEmptyString.annotate({ + description: + "Initial task prompt for the sub-agent. Include all context it needs; it does not see this conversation.", + }), + model: Schema.optional( + TrimmedNonEmptyString.annotate({ + description: "Model slug from agent_list. Defaults to the target provider's first model.", + }), + ), + title: Schema.optional( + TrimmedNonEmptyString.annotate({ + description: "Thread title shown in the UI. 
Defaults to the first line of the prompt.", + }), + ), +}); +export type SubAgentSpawnInput = typeof SubAgentSpawnInput.Type; + +export const SubAgentSpawnResult = Schema.Struct({ + threadId: ThreadId, + providerInstanceId: ProviderInstanceId, + model: TrimmedNonEmptyString, + title: TrimmedNonEmptyString, + status: SubAgentStatus, +}); +export type SubAgentSpawnResult = typeof SubAgentSpawnResult.Type; + +export const SubAgentSendInput = Schema.Struct({ + threadId: ThreadId.annotate({ + description: "Sub-agent thread id returned by agent_spawn in this session.", + }), + prompt: TrimmedNonEmptyString.annotate({ + description: "Follow-up prompt for the sub-agent's next turn.", + }), +}); +export type SubAgentSendInput = typeof SubAgentSendInput.Type; + +export const SubAgentSendResult = Schema.Struct({ + threadId: ThreadId, + status: SubAgentStatus, +}); +export type SubAgentSendResult = typeof SubAgentSendResult.Type; + +export const SubAgentWaitInput = Schema.Struct({ + threadId: ThreadId.annotate({ + description: "Sub-agent thread id returned by agent_spawn in this session.", + }), + timeoutSeconds: Schema.optional( + Schema.Int.check(Schema.isBetween({ minimum: 1, maximum: 600 })).annotate({ + description: + 'Maximum seconds to block for the turn to finish (default 60). On timeout the result status is "running"; call agent_wait again to keep waiting.', + }), + ), +}); +export type SubAgentWaitInput = typeof SubAgentWaitInput.Type; + +export const SubAgentWaitResult = Schema.Struct({ + threadId: ThreadId, + status: SubAgentStatus, + /** Final assistant message of the awaited turn; null while running or when unavailable. 
*/ + finalText: Schema.NullOr(Schema.String), +}); +export type SubAgentWaitResult = typeof SubAgentWaitResult.Type; + +export const SubAgentErrorReason = Schema.Literals([ + "capability-unavailable", + "provider-not-found", + "provider-not-spawnable", + "model-not-resolved", + "caller-thread-not-found", + "thread-not-found", + "depth-limit-exceeded", + "dispatch-failed", +]); +export type SubAgentErrorReason = typeof SubAgentErrorReason.Type; + +export class SubAgentError extends Schema.TaggedErrorClass()("SubAgentError", { + reason: SubAgentErrorReason, + description: Schema.String, +}) { + override get message(): string { + return this.description; + } +}