name: Codex review gate

# Required check: passes once the Codex bot has reviewed the pull request,
# either as a PR review or as a "Reviewed commit:" issue comment.
on:
  pull_request:
    types: [opened, synchronize, reopened, ready_for_review]
  pull_request_review:
    types: [submitted]

permissions:
  pull-requests: read

# Each push and each submitted review starts a new poll of up to 15 minutes.
# Keep only the newest run per PR so superseded polls do not occupy runners.
concurrency:
  group: codex-review-gate-${{ github.event.pull_request.number }}
  cancel-in-progress: true

jobs:
  codex-review-required:
    name: codex-review-required
    runs-on: ubuntu-latest
    timeout-minutes: 20
    steps:
      - name: Require one Codex review on this PR
        env:
          GH_TOKEN: ${{ github.token }}
          REPO: ${{ github.repository }}
          PR: ${{ github.event.pull_request.number }}
        run: |
          codex_reviewed() {
            local n
            # A failed API call or jq parse leaves no count; treat that as
            # "no review yet" so the poll retries instead of tripping over an
            # empty operand in the numeric test below.
            n=$(gh api "repos/$REPO/pulls/$PR/reviews" --paginate | jq -s '
              [add[] | select(
                (.user.type == "Bot")
                and (.user.login | ascii_downcase | contains("codex"))
                and (.state != "DISMISSED")
              )] | length') || n=0
            [ "${n:-0}" -ge 1 ] && return 0
            # A clean Codex review ("no major issues") arrives as an issue
            # comment carrying a "Reviewed commit:" line, not as a PR review.
            n=$(gh api "repos/$REPO/issues/$PR/comments" --paginate | jq -s '
              [add[] | select(
                (.user.type == "Bot")
                and (.user.login | ascii_downcase | contains("codex"))
                and (.body | test("Reviewed commit:"))
              )] | length') || n=0
            [ "${n:-0}" -ge 1 ]
          }

          # 30 attempts x 30 seconds = 15 minutes, inside the 20-minute job timeout.
          for i in $(seq 1 30); do
            if codex_reviewed; then
              echo "Codex has reviewed this PR."
              exit 0
            fi
            echo "Waiting for Codex review ($i/30)..."
            sleep 30
          done
          echo "::error::No Codex review on this PR after 15 minutes. Comment '@codex review' on the PR, then re-run this check."
          exit 1
Effect.fn("McpSessionRegistry.make")(function* ( threadId: ThreadId.make(request.threadId), providerSessionId, providerInstanceId: ProviderInstanceId.make(request.providerInstanceId), - capabilities: new Set(["preview"]), + capabilities: new Set(["preview", "agents"]), issuedAt, expiresAt, }; diff --git a/apps/server/src/mcp/toolkits/agents/SubAgentCoordinator.test.ts b/apps/server/src/mcp/toolkits/agents/SubAgentCoordinator.test.ts new file mode 100644 index 00000000000..b3b99d8f238 --- /dev/null +++ b/apps/server/src/mcp/toolkits/agents/SubAgentCoordinator.test.ts @@ -0,0 +1,375 @@ +import * as NodeServices from "@effect/platform-node/NodeServices"; +import { expect, it } from "@effect/vitest"; +import { + EnvironmentId, + MessageId, + ProviderDriverKind, + ProviderInstanceId, + ThreadId, + TurnId, + type OrchestrationCommand, + type OrchestrationThread, + type OrchestrationThreadShell, + type ServerProvider, + type SubAgentError, +} from "@t3tools/contracts"; +import { createModelCapabilities } from "@t3tools/shared/model"; +import * as Duration from "effect/Duration"; +import * as Effect from "effect/Effect"; +import * as Fiber from "effect/Fiber"; +import * as Option from "effect/Option"; +import * as Stream from "effect/Stream"; +import * as TestClock from "effect/testing/TestClock"; + +import { OrchestrationEngineService } from "../../../orchestration/Services/OrchestrationEngine.ts"; +import { ProjectionSnapshotQuery } from "../../../orchestration/Services/ProjectionSnapshotQuery.ts"; +import { ProviderRegistry } from "../../../provider/Services/ProviderRegistry.ts"; +import type { McpInvocationScope } from "../../McpInvocationContext.ts"; +import { SubAgentCoordinator, __testing } from "./SubAgentCoordinator.ts"; + +const emptyCapabilities = createModelCapabilities({ optionDescriptors: [] }); + +const makeProvider = ( + instanceId: string, + driver: string, + overrides?: Partial, +): ServerProvider => ({ + instanceId: ProviderInstanceId.make(instanceId), + 
/**
 * Builds the MCP invocation scope used by the coordinator tests.
 *
 * The scope is bound to `threadId` (the shared parent thread unless a test
 * passes a child thread id), carries both the "preview" and "agents"
 * capabilities, and has an expiry far enough out that it never lapses.
 */
const makeScope = (threadId: ThreadId = parentThreadId): McpInvocationScope => {
  const scope: McpInvocationScope = {
    environmentId: EnvironmentId.make("environment-1"),
    threadId,
    providerSessionId: "provider-session-1",
    providerInstanceId: ProviderInstanceId.make("claude"),
    capabilities: new Set(["preview", "agents"]),
    issuedAt: 0,
    expiresAt: Number.MAX_SAFE_INTEGER,
  };
  return scope;
};
proposedPlans: [], + activities: [], + checkpoints: [], + session: null, + ...overrides, + }) as OrchestrationThread; + +interface Harness { + readonly dispatched: Array; + readonly setThreadDetail: ( + lookup: (threadId: ThreadId) => Option.Option, + ) => void; +} + +const makeCoordinator = (options?: { + readonly providers?: ReadonlyArray; +}): Effect.Effect => { + const dispatched: Array = []; + let threadDetailLookup: (threadId: ThreadId) => Option.Option = () => + Option.none(); + + const engine = OrchestrationEngineService.of({ + readEvents: () => Stream.empty, + dispatch: (command) => + Effect.sync(() => { + dispatched.push(command); + return { sequence: dispatched.length }; + }), + streamDomainEvents: Stream.never, + }); + + const unused = () => Effect.die("unused in SubAgentCoordinator tests"); + const snapshotQuery = ProjectionSnapshotQuery.of({ + getCommandReadModel: unused, + getSnapshot: unused, + getShellSnapshot: unused, + getArchivedShellSnapshot: unused, + getSnapshotSequence: unused, + getCounts: unused, + getActiveProjectByWorkspaceRoot: unused, + getProjectShellById: unused, + getFirstActiveThreadIdByProjectId: unused, + getThreadCheckpointContext: unused, + getFullThreadDiffContext: unused, + getThreadShellById: (threadId) => Effect.succeed(Option.some(makeThreadShell(threadId))), + getThreadDetailById: (threadId) => Effect.sync(() => threadDetailLookup(threadId)), + }); + + const providers = options?.providers ?? 
[ + makeProvider("claude", "claudeAgent"), + makeProvider("codex", "codex"), + ]; + const providerRegistry = ProviderRegistry.of({ + getProviders: Effect.succeed(providers), + refresh: unused, + refreshInstance: unused, + getProviderMaintenanceCapabilitiesForInstance: unused, + setProviderMaintenanceActionState: unused, + streamChanges: Stream.never, + }); + + const harness: Harness = { + dispatched, + setThreadDetail: (lookup) => { + threadDetailLookup = lookup; + }, + }; + + return __testing.make.pipe( + Effect.provideService(OrchestrationEngineService, engine), + Effect.provideService(ProjectionSnapshotQuery, snapshotQuery), + Effect.provideService(ProviderRegistry, providerRegistry), + Effect.provide(NodeServices.layer), + Effect.map((coordinator) => [coordinator, harness] as const), + ); +}; + +const expectSubAgentError = (effect: Effect.Effect) => Effect.flip(effect); + +it.effect("spawns a sub-agent thread next to the caller's thread on another provider", () => + Effect.gen(function* () { + const [coordinator, harness] = yield* makeCoordinator(); + + const result = yield* coordinator.spawn(makeScope(), { + providerInstanceId: ProviderInstanceId.make("codex"), + prompt: "Review the auth module for bugs.", + }); + + expect(result.status).toBe("running"); + expect(result.providerInstanceId).toBe("codex"); + expect(result.model).toBe("default-model"); + expect(result.title).toBe("Review the auth module for bugs."); + + expect(harness.dispatched).toHaveLength(2); + const [create, turnStart] = harness.dispatched; + expect(create?.type).toBe("thread.create"); + if (create?.type === "thread.create") { + expect(create.threadId).toBe(result.threadId); + expect(create.projectId).toBe(projectId); + expect(create.worktreePath).toBe("/tmp/worktrees/foo"); + expect(create.branch).toBe("feature/foo"); + expect(create.runtimeMode).toBe("approval-required"); + expect(create.modelSelection).toEqual({ instanceId: "codex", model: "default-model" }); + } + 
// Spawning must fail with a typed reason when the requested provider instance
// is missing from the registry, or is registered but reports an error state.
it.effect("rejects spawn targets that are unknown or not ready", () =>
  Effect.gen(function* () {
    const brokenCodex = makeProvider("codex", "codex", {
      status: "error",
      auth: { status: "unauthenticated" },
    });
    const [coordinator] = yield* makeCoordinator({
      providers: [makeProvider("claude", "claudeAgent"), brokenCodex],
    });

    // Both cases issue the same spawn request and differ only in the target id.
    const spawnFailureFor = (instanceId: string) =>
      expectSubAgentError(
        coordinator.spawn(makeScope(), {
          providerInstanceId: ProviderInstanceId.make(instanceId),
          prompt: "Do something.",
        }),
      );

    const missingTarget = yield* spawnFailureFor("missing");
    expect(missingTarget.reason).toBe("provider-not-found");

    const unreadyTarget = yield* spawnFailureFor("codex");
    expect(unreadyTarget.reason).toBe("provider-not-spawnable");
  }),
);
// A thread id that was never returned by this session's agent_spawn must be
// rejected by both follow-up operations with the same "thread-not-found" reason.
it.effect("refuses to drive threads the calling session did not spawn", () =>
  Effect.gen(function* () {
    const [coordinator] = yield* makeCoordinator();
    const foreignThreadId = ThreadId.make("foreign-thread");

    const sendFailure = yield* expectSubAgentError(
      coordinator.send(makeScope(), { threadId: foreignThreadId, prompt: "hi" }),
    );
    expect(sendFailure.reason).toBe("thread-not-found");

    const waitFailure = yield* expectSubAgentError(
      coordinator.wait(makeScope(), { threadId: foreignThreadId }),
    );
    expect(waitFailure.reason).toBe("thread-not-found");
  }),
);
// Verifies the timeout path of `wait`: the child thread never reports a latest
// turn, so the coordinator keeps polling until the 1-second deadline passes and
// then answers "running" with no final text.
it.effect("reports running when the sub-agent has not finished before the timeout", () =>
  Effect.gen(function* () {
    const [coordinator, harness] = yield* makeCoordinator();

    const spawned = yield* coordinator.spawn(makeScope(), {
      providerInstanceId: ProviderInstanceId.make("codex"),
      prompt: "Long running task.",
    });
    // The projection exposes the child thread but with no turn recorded yet.
    harness.setThreadDetail((threadId) =>
      threadId === spawned.threadId
        ? Option.some(makeThreadDetail(spawned.threadId, { latestTurn: null }))
        : Option.none(),
    );

    // Fork the wait so the test can advance the clock while it is polling.
    const waiting = yield* coordinator
      .wait(makeScope(), {
        threadId: spawned.threadId,
        timeoutSeconds: 1,
      })
      .pipe(Effect.forkChild);
    // Move the test clock past the 1-second timeout requested above.
    yield* TestClock.adjust(Duration.seconds(2));
    const result = yield* Fiber.join(waiting);
    expect(result.status).toBe("running");
    expect(result.finalText).toBeNull();
  }),
);
/** Pause between two reads of the child thread while `wait` polls for turn completion. */
const WAIT_POLL_INTERVAL_MILLIS = 500;
/** Timeout applied by `wait` when the caller does not pass `timeoutSeconds`. */
const DEFAULT_WAIT_TIMEOUT_SECONDS = 60;
/** Upper bound, ellipsis included, on a title derived from the first line of a prompt. */
const DEFAULT_TITLE_MAX_LENGTH = 60;
/**
 * Decides whether a provider instance may host a new sub-agent thread.
 *
 * The provider must pass `isProviderAvailable`, be enabled and installed, and
 * report a status other than "error" or "disabled".
 */
const isSpawnableProvider = (provider: ServerProvider): boolean => {
  if (!isProviderAvailable(provider)) return false;
  if (!provider.enabled || !provider.installed) return false;
  return !(provider.status === "error" || provider.status === "disabled");
};
/**
 * Returns true when `requestedAt` denotes an earlier instant than `sinceIso`.
 *
 * Both values are compared as parsed instants, so two spellings of the same
 * moment (different fractional precision or UTC offset) are never mis-ordered.
 * If either value cannot be parsed, it falls back to plain string ordering.
 */
const isRequestedBefore = (requestedAt: string, sinceIso: string): boolean => {
  const requestedMillis = Date.parse(requestedAt);
  const sinceMillis = Date.parse(sinceIso);
  if (Number.isNaN(requestedMillis) || Number.isNaN(sinceMillis)) {
    return requestedAt < sinceIso;
  }
  return requestedMillis < sinceMillis;
};

/**
 * Picks the assistant text that answers the thread's latest turn.
 *
 * Prefers the message referenced by `latestTurn.assistantMessageId`; otherwise
 * returns the most recent assistant message tagged with the latest turn's id.
 *
 * @returns The message text, or `null` when the thread has no latest turn or
 *   no matching assistant message.
 */
const finalAssistantText = (thread: OrchestrationThread): string | null => {
  const latestTurn = thread.latestTurn;
  // Without a latest turn neither lookup below can match anything.
  if (!latestTurn) return null;

  if (latestTurn.assistantMessageId) {
    const byId = thread.messages.find((message) => message.id === latestTurn.assistantMessageId);
    if (byId) return byId.text;
  }
  // Walk backwards so the newest assistant message of the turn wins.
  for (let index = thread.messages.length - 1; index >= 0; index -= 1) {
    const message = thread.messages[index];
    if (message?.role === "assistant" && message.turnId === latestTurn.turnId) {
      return message.text;
    }
  }
  return null;
};

/**
 * Derives the sub-agent status for the turn requested at `sinceIso`.
 *
 * @param thread - Current projection of the sub-agent thread.
 * @param sinceIso - ISO timestamp of the most recent turn-start command.
 * @returns "running" while the requested turn has not shown up yet, is still
 *   running, or the session still has an active turn; otherwise the latest
 *   turn's own state.
 */
const turnStatus = (thread: OrchestrationThread, sinceIso: string): SubAgentStatus => {
  const latestTurn = thread.latestTurn;
  // The projection lags the dispatched turn-start command; treat a missing
  // or older latest turn as the requested turn still spinning up.
  if (!latestTurn || isRequestedBefore(latestTurn.requestedAt, sinceIso)) return "running";
  if (latestTurn.state === "running") return "running";
  if ((thread.session?.activeTurnId ?? null) !== null) return "running";
  return latestTurn.state;
};
/**
 * Resolves the bookkeeping record for `threadId`, but only when that thread
 * was spawned by the calling session's own thread.
 *
 * Fails with a "thread-not-found" SubAgentError when the id is unknown or the
 * record belongs to a different parent thread.
 */
const requireChildOfCaller = Effect.fn("SubAgentCoordinator.requireChildOfCaller")(function* (
  scope: McpInvocationScope,
  threadId: ThreadId,
) {
  const registry = yield* SynchronizedRef.get(children);
  const entry = registry.get(threadId);
  if (entry !== undefined && entry.parentThreadId === scope.threadId) {
    return entry;
  }
  return yield* new SubAgentError({
    reason: "thread-not-found",
    description: `Thread ${threadId} is not a sub-agent spawned by this session. Use agent_spawn first; sub-agent handles do not survive server restarts.`,
  });
});
/**
 * Creates a sub-agent thread next to the caller's thread and starts its first turn.
 *
 * Validation runs in a fixed order, and the first failing check decides the
 * error reason: nesting depth, provider lookup, provider readiness, model
 * resolution, then existence of the caller's thread. The child inherits the
 * caller thread's project, branch, worktree and runtime mode.
 *
 * The child is recorded in the in-memory registry before its first turn is
 * dispatched. A child that calls `agent_spawn` right away is then already
 * known at its real depth and cannot slip past the nesting limit.
 */
const spawn: SubAgentCoordinatorShape["spawn"] = Effect.fn("SubAgentCoordinator.spawn")(
  function* (scope, input) {
    // Threads without a record were not spawned here and count as depth 0.
    const callerDepth = (yield* SynchronizedRef.get(children)).get(scope.threadId)?.depth ?? 0;
    if (callerDepth >= SUB_AGENT_MAX_SPAWN_DEPTH) {
      return yield* new SubAgentError({
        reason: "depth-limit-exceeded",
        description: `Sub-agents may only nest ${SUB_AGENT_MAX_SPAWN_DEPTH} levels deep; this session is already at depth ${callerDepth}. Do the work in this session instead.`,
      });
    }

    const providers = yield* providerRegistry.getProviders;
    const target = providers.find((provider) => provider.instanceId === input.providerInstanceId);
    if (!target) {
      return yield* new SubAgentError({
        reason: "provider-not-found",
        description: `No provider instance "${input.providerInstanceId}" is configured. Call agent_list for valid instance ids.`,
      });
    }
    if (!isSpawnableProvider(target)) {
      return yield* new SubAgentError({
        reason: "provider-not-spawnable",
        description: `Provider instance "${target.instanceId}" (${target.driver}) is not ready (status: ${target.status}, auth: ${target.auth.status}). Call agent_list to pick a spawnable provider.`,
      });
    }
    // Fall back to the provider's first advertised model when none is requested.
    const model = input.model ?? target.models[0]?.slug;
    if (model === undefined) {
      return yield* new SubAgentError({
        reason: "model-not-resolved",
        description: `Provider instance "${target.instanceId}" reports no models; pass an explicit model slug.`,
      });
    }

    const callerThread = yield* snapshotQuery
      .getThreadShellById(scope.threadId)
      .pipe(Effect.mapError(dispatchFailed("read calling thread")));
    if (Option.isNone(callerThread)) {
      return yield* new SubAgentError({
        reason: "caller-thread-not-found",
        description:
          "The calling session's thread no longer exists; cannot place a sub-agent next to it.",
      });
    }
    const parent = callerThread.value;

    const createdAt = yield* nowIso;
    const commandUuid = yield* randomUuid;
    const threadUuid = yield* randomUuid;
    const childThreadId = ThreadId.make(threadUuid);
    const title = input.title ?? defaultTitleForPrompt(input.prompt);

    // Writes (or refreshes) the child's record with the given turn timestamp.
    const registerChild = (lastTurnRequestedAt: string) =>
      SynchronizedRef.update(children, (current) => {
        const next = new Map(current);
        next.set(childThreadId, {
          parentThreadId: scope.threadId,
          depth: callerDepth + 1,
          lastTurnRequestedAt,
        });
        return next;
      });

    yield* engine
      .dispatch({
        type: "thread.create",
        commandId: CommandId.make(`server:sub-agent-spawn:${commandUuid}`),
        threadId: childThreadId,
        projectId: parent.projectId,
        title,
        modelSelection: { instanceId: target.instanceId, model },
        runtimeMode: parent.runtimeMode,
        interactionMode: "default",
        branch: parent.branch,
        worktreePath: parent.worktreePath,
        createdAt,
      })
      .pipe(Effect.mapError(dispatchFailed("create sub-agent thread")));

    // Record the child before its session can start running, so its depth is
    // known by the time it is able to issue tool calls of its own.
    yield* registerChild(createdAt);

    const lastTurnRequestedAt = yield* startTurn(childThreadId, input.prompt, parent.runtimeMode);

    // Point the record at the turn that was actually dispatched.
    yield* registerChild(lastTurnRequestedAt);

    return {
      threadId: childThreadId,
      providerInstanceId: target.instanceId,
      model,
      title,
      status: "running" as const,
    };
  },
);
/**
 * Polls a sub-agent thread until its most recently requested turn leaves the
 * "running" state or the timeout elapses.
 *
 * The thread is always read at least once, so a turn that has already finished
 * is reported even with a zero timeout. Each pause is capped at the time left
 * before the deadline, so the call does not run past the caller's timeout by a
 * whole poll interval.
 *
 * @returns The terminal status with the final assistant text, or status
 *   "running" with `finalText: null` when the deadline passes first.
 */
const wait: SubAgentCoordinatorShape["wait"] = Effect.fn("SubAgentCoordinator.wait")(
  function* (scope, input) {
    const record = yield* requireChildOfCaller(scope, input.threadId);
    const timeoutSeconds = input.timeoutSeconds ?? DEFAULT_WAIT_TIMEOUT_SECONDS;
    const deadline = (yield* Clock.currentTimeMillis) + timeoutSeconds * 1_000;

    while (true) {
      const thread = yield* readThreadDetail(input.threadId);
      const status = turnStatus(thread, record.lastTurnRequestedAt);
      if (status !== "running") {
        return {
          threadId: input.threadId,
          status,
          finalText: finalAssistantText(thread),
        };
      }
      const remainingMillis = deadline - (yield* Clock.currentTimeMillis);
      if (remainingMillis <= 0) {
        return { threadId: input.threadId, status: "running" as const, finalText: null };
      }
      // Never sleep past the deadline: the last pause shrinks to the time left.
      yield* Effect.sleep(Duration.millis(Math.min(WAIT_POLL_INTERVAL_MILLIS, remainingMillis)));
    }
  },
);
/**
 * Reads the current MCP invocation context and returns it only when the
 * credential carries the "agents" capability; otherwise fails with a
 * "capability-unavailable" SubAgentError.
 */
const requireAgentsScope = Effect.fn("SubAgentToolkit.requireAgentsScope")(function* () {
  const context = yield* McpInvocationContext.McpInvocationContext;
  if (context.capabilities.has("agents")) {
    return context;
  }
  return yield* new SubAgentError({
    reason: "capability-unavailable",
    description: "This MCP credential does not grant the agents capability.",
  });
});
/**
 * MCP tool definition for `agent_list`.
 *
 * Takes no parameters and is annotated read-only, non-destructive and
 * idempotent. It succeeds with `SubAgentListResult` and fails with
 * `SubAgentError`.
 */
export const AgentListTool = Tool.make("agent_list", {
  description:
    "List the coding-agent providers configured on this server (e.g. Codex, Claude, Cursor) that can run sub-agent threads via agent_spawn. Shows each instance id, driver, readiness, available model slugs, and which instance is running the current session.",
  parameters: Schema.Struct({}),
  success: SubAgentListResult,
  failure: SubAgentError,
  dependencies,
})
  .annotate(Tool.Title, "List sub-agent providers")
  .annotate(Tool.Readonly, true)
  .annotate(Tool.Destructive, false)
  .annotate(Tool.Idempotent, true);
Use agent_list first to pick a spawnable providerInstanceId.", + parameters: SubAgentSpawnInput, + success: SubAgentSpawnResult, + failure: SubAgentError, + dependencies, +}) + .annotate(Tool.Title, "Spawn sub-agent") + .annotate(Tool.OpenWorld, true) + .annotate(Tool.Destructive, true); + +export const AgentSendTool = Tool.make("agent_send", { + description: + "Send a follow-up prompt to a sub-agent thread previously created with agent_spawn. Returns immediately; use agent_wait to collect the response.", + parameters: SubAgentSendInput, + success: SubAgentSendResult, + failure: SubAgentError, + dependencies, +}) + .annotate(Tool.Title, "Prompt sub-agent") + .annotate(Tool.OpenWorld, true) + .annotate(Tool.Destructive, true); + +export const AgentWaitTool = Tool.make("agent_wait", { + description: + 'Wait for a sub-agent thread (created with agent_spawn) to finish its current turn and return the final assistant message. Blocks up to timeoutSeconds (default 60, max 600); a status of "running" means the sub-agent is still working — call agent_wait again to keep waiting.', + parameters: SubAgentWaitInput, + success: SubAgentWaitResult, + failure: SubAgentError, + dependencies, +}) + .annotate(Tool.Title, "Await sub-agent result") + .annotate(Tool.Readonly, true) + .annotate(Tool.Destructive, false); + +export const SubAgentToolkit = Toolkit.make( + AgentListTool, + AgentSpawnTool, + AgentSendTool, + AgentWaitTool, +); diff --git a/packages/contracts/src/index.ts b/packages/contracts/src/index.ts index 43270efdec7..dda17eb5afd 100644 --- a/packages/contracts/src/index.ts +++ b/packages/contracts/src/index.ts @@ -25,4 +25,5 @@ export * from "./assets.ts"; export * from "./review.ts"; export * from "./preview.ts"; export * from "./previewAutomation.ts"; +export * from "./subAgents.ts"; export * from "./rpc.ts"; diff --git a/packages/contracts/src/previewAutomation.ts b/packages/contracts/src/previewAutomation.ts index 6431dd0dcfd..25a37d425b8 100644 --- 
a/packages/contracts/src/previewAutomation.ts +++ b/packages/contracts/src/previewAutomation.ts @@ -608,7 +608,7 @@ export type PreviewAutomationResponse = typeof PreviewAutomationResponse.Type; export class PreviewAutomationUnavailableError extends Schema.TaggedErrorClass()( "PreviewAutomationUnavailableError", { - capability: Schema.Literal("preview"), + capability: Schema.Literals(["preview", "agents"]), environmentId: EnvironmentId, threadId: ThreadId, providerSessionId: TrimmedNonEmptyString, diff --git a/packages/contracts/src/subAgents.ts b/packages/contracts/src/subAgents.ts new file mode 100644 index 00000000000..8b72a77e72f --- /dev/null +++ b/packages/contracts/src/subAgents.ts @@ -0,0 +1,135 @@ +/** + * Sub-agent orchestration contracts. + * + * Schemas for the product-native MCP "agents" toolkit that lets a running + * provider session spawn and drive sibling threads on any configured + * provider instance — including a different driver than its own (e.g. a + * Claude session delegating a task to a Codex instance, or vice versa). + * + * Keep this module schema-only; the runtime lives in + * `apps/server/src/mcp/toolkits/agents/`. + */ +import * as Schema from "effect/Schema"; + +import { ThreadId, TrimmedNonEmptyString } from "./baseSchemas.ts"; +import { ProviderDriverKind, ProviderInstanceId } from "./providerInstance.ts"; +import { ServerProviderAuthStatus, ServerProviderState } from "./server.ts"; + +/** + * Maximum spawn nesting. A user-created thread sits at depth 0; each + * sub-agent thread is one level deeper. Spawning is refused once the + * caller is already at this depth, bounding recursive fan-out. 
+ */ +export const SUB_AGENT_MAX_SPAWN_DEPTH = 2; + +/** Status literal carried by the spawn, send, and wait results. */ export const SubAgentStatus = Schema.Literals(["running", "completed", "interrupted", "error"]); +export type SubAgentStatus = typeof SubAgentStatus.Type; + +/** One provider instance as listed in SubAgentListResult.providers. */ export const SubAgentProviderSummary = Schema.Struct({ + instanceId: ProviderInstanceId, + driver: ProviderDriverKind, + displayName: Schema.optional(TrimmedNonEmptyString), + status: ServerProviderState, + authStatus: ServerProviderAuthStatus, + /** Whether `agent_spawn` currently accepts this instance as a target. */ + spawnable: Schema.Boolean, + /** Model slugs accepted by `agent_spawn`'s `model` input. */ + models: Schema.Array(TrimmedNonEmptyString), + /** True for the instance running the calling agent session. */ + isCaller: Schema.Boolean, +}); +export type SubAgentProviderSummary = typeof SubAgentProviderSummary.Type; + +/** Success payload of the agent_list tool: the array of provider summaries. */ export const SubAgentListResult = Schema.Struct({ + providers: Schema.Array(SubAgentProviderSummary), +}); +export type SubAgentListResult = typeof SubAgentListResult.Type; + +/** Parameters of the agent_spawn tool; providerInstanceId and prompt are required, model and title are optional. */ export const SubAgentSpawnInput = Schema.Struct({ + providerInstanceId: ProviderInstanceId.annotate({ + description: + "Target provider instance id from agent_list (may be a different agent than the caller, e.g. spawn Codex from Claude).", + }), + prompt: TrimmedNonEmptyString.annotate({ + description: + "Initial task prompt for the sub-agent. Include all context it needs; it does not see this conversation.", + }), + model: Schema.optional( + TrimmedNonEmptyString.annotate({ + description: "Model slug from agent_list. Defaults to the target provider's first model.", + }), + ), + title: Schema.optional( + TrimmedNonEmptyString.annotate({ + description: "Thread title shown in the UI. 
Defaults to the first line of the prompt.", + }), + ), +}); +export type SubAgentSpawnInput = typeof SubAgentSpawnInput.Type; + +/** Success payload of the agent_spawn tool. */ export const SubAgentSpawnResult = Schema.Struct({ + threadId: ThreadId, + providerInstanceId: ProviderInstanceId, + model: TrimmedNonEmptyString, + title: TrimmedNonEmptyString, + status: SubAgentStatus, +}); +export type SubAgentSpawnResult = typeof SubAgentSpawnResult.Type; + +/** Parameters of the agent_send tool: the target thread id and the follow-up prompt. */ export const SubAgentSendInput = Schema.Struct({ + threadId: ThreadId.annotate({ + description: "Sub-agent thread id returned by agent_spawn in this session.", + }), + prompt: TrimmedNonEmptyString.annotate({ + description: "Follow-up prompt for the sub-agent's next turn.", + }), +}); +export type SubAgentSendInput = typeof SubAgentSendInput.Type; + +/** Success payload of the agent_send tool. */ export const SubAgentSendResult = Schema.Struct({ + threadId: ThreadId, + status: SubAgentStatus, +}); +export type SubAgentSendResult = typeof SubAgentSendResult.Type; + +/** Parameters of the agent_wait tool; timeoutSeconds is an optional integer checked against minimum 1 and maximum 600. */ export const SubAgentWaitInput = Schema.Struct({ + threadId: ThreadId.annotate({ + description: "Sub-agent thread id returned by agent_spawn in this session.", + }), + timeoutSeconds: Schema.optional( + Schema.Int.check(Schema.isBetween({ minimum: 1, maximum: 600 })).annotate({ + description: + 'Maximum seconds to block for the turn to finish (default 60). On timeout the result status is "running"; call agent_wait again to keep waiting.', + }), + ), +}); +export type SubAgentWaitInput = typeof SubAgentWaitInput.Type; + +/** Success payload of the agent_wait tool. */ export const SubAgentWaitResult = Schema.Struct({ + threadId: ThreadId, + status: SubAgentStatus, + /** Final assistant message of the awaited turn; null while running or when unavailable. 
*/ + finalText: Schema.NullOr(Schema.String), +}); +export type SubAgentWaitResult = typeof SubAgentWaitResult.Type; + +/** Closed set of reason codes carried by SubAgentError.reason. */ export const SubAgentErrorReason = Schema.Literals([ + "capability-unavailable", + "provider-not-found", + "provider-not-spawnable", + "model-not-resolved", + "caller-thread-not-found", + "thread-not-found", + "depth-limit-exceeded", + "dispatch-failed", +]); +export type SubAgentErrorReason = typeof SubAgentErrorReason.Type; + +/** Failure type declared by every agents tool; carries a reason code and a description, and its message getter returns the description. NOTE(review): TaggedErrorClass is called here without an explicit type argument — confirm whether a Self argument such as SubAgentError is required by the Effect Schema API. */ export class SubAgentError extends Schema.TaggedErrorClass()("SubAgentError", { + reason: SubAgentErrorReason, + description: Schema.String, +}) { + override get message(): string { + return this.description; + } +}