From a11e15b0cee17ac8006e50439312aee95fda5b8c Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Mon, 8 Jun 2026 15:49:35 +0000 Subject: [PATCH 1/5] =?UTF-8?q?=F0=9F=A4=96=20perf:=20bulk=20reserve=20wor?= =?UTF-8?q?kflow=20agent=20tasks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement two-phase bulk task reservation and launch for parallel workflow agents, including starting-state persistence, scheduler coalescing, workflow bulk creation, task tool schema updates, and regression coverage.\n\n---\n\n_Generated with `mux` • Model: `openai:gpt-5.5` • Thinking: `xhigh` • Cost: `719969{MUX_COSTS_USD:-unknown}`_\n\n --- docs/hooks/tools.mdx | 10 +- .../utils/messages/modelMessageTransform.ts | 7 +- src/common/orpc/schemas/api.ts | 2 +- src/common/orpc/schemas/workspace.ts | 7 +- src/common/schemas/project.ts | 7 +- src/common/utils/agentTaskCompletion.ts | 9 +- src/common/utils/tools/toolDefinitions.ts | 24 +- src/node/config.ts | 5 + .../builtInSkillContent.generated.ts | 10 +- src/node/services/taskService.test.ts | 35 + src/node/services/taskService.ts | 1185 ++++++++++------- src/node/services/tools/task.ts | 10 +- src/node/services/tools/task_await.ts | 9 +- src/node/services/tools/task_list.test.ts | 2 +- src/node/services/tools/task_list.ts | 2 +- .../services/workflows/WorkflowRunner.test.ts | 53 +- src/node/services/workflows/WorkflowRunner.ts | 74 +- .../WorkflowTaskServiceAdapter.test.ts | 62 + .../workflows/WorkflowTaskServiceAdapter.ts | 84 ++ src/node/services/workspaceService.ts | 18 +- 20 files changed, 1079 insertions(+), 536 deletions(-) diff --git a/docs/hooks/tools.mdx b/docs/hooks/tools.mdx index 4898a63c46..9335da8e74 100644 --- a/docs/hooks/tools.mdx +++ b/docs/hooks/tools.mdx @@ -641,17 +641,17 @@ If a value is too large for the environment, it may be omitted (not set). Mux al | `MUX_TOOL_INPUT_MIN_COMPLETED` | `min_completed` | number | Number of awaited tasks that must complete before this call returns. Defaults to 1, so by default task_await returns as soon as the FIRST awaited task completes, letting you act on it while the rest keep running. The result still includes every task complete at that moment plus current status (running/queued) for the rest. Tasks that have not yet completed keep running and remain re-awaitable on a later task_await call. Raise this (e.g. set it to the total number of awaited tasks) when you genuinely need more before proceeding — for example best-of-N synthesis that must compare every candidate. Clamped to the number of awaited tasks; values above that behave like 'wait for all'. | | `MUX_TOOL_INPUT_TASK_IDS_` | `task_ids[]` | string | List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and workflow runs of the current workspace, excluding workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs. | | `MUX_TOOL_INPUT_TASK_IDS_COUNT` | `task_ids.length` | number | Number of elements in task_ids (List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and workflow runs of the current workspace, excluding workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs.) | -| `MUX_TOOL_INPUT_TIMEOUT_SECS` | `timeout_secs` | number | Maximum time to wait in seconds for each task. For bash tasks, this waits for NEW output (or process exit). If exceeded, the result returns status=queued\|running\|awaiting_report (task is still active). Defaults to 600 seconds (10 minutes) if not specified. Set to 0 for a non-blocking status check. | +| `MUX_TOOL_INPUT_TIMEOUT_SECS` | `timeout_secs` | number | Maximum time to wait in seconds for each task. For bash tasks, this waits for NEW output (or process exit). If exceeded, the result returns status=queued\|starting\|running\|awaiting_report (task is still active). Defaults to 600 seconds (10 minutes) if not specified. Set to 0 for a non-blocking status check. |
task_list (2) -| Env var | JSON path | Type | Description | -| --------------------------------- | ------------------- | ------ | ---------------------------------------------------------------------------------------------------------------------- | -| `MUX_TOOL_INPUT_STATUSES_` | `statuses[]` | enum | Task statuses to include. Defaults to active tasks: queued, running, awaiting_report. | -| `MUX_TOOL_INPUT_STATUSES_COUNT` | `statuses.length` | number | Number of elements in statuses (Task statuses to include. Defaults to active tasks: queued, running, awaiting_report.) | +| Env var | JSON path | Type | Description | +| --------------------------------- | ------------------- | ------ | -------------------------------------------------------------------------------------------------------------------------------- | +| `MUX_TOOL_INPUT_STATUSES_` | `statuses[]` | enum | Task statuses to include. Defaults to active tasks: queued, starting, running, awaiting_report. | +| `MUX_TOOL_INPUT_STATUSES_COUNT` | `statuses.length` | number | Number of elements in statuses (Task statuses to include. Defaults to active tasks: queued, starting, running, awaiting_report.) |
diff --git a/src/browser/utils/messages/modelMessageTransform.ts b/src/browser/utils/messages/modelMessageTransform.ts index bff91fa19e..dc8171ac20 100644 --- a/src/browser/utils/messages/modelMessageTransform.ts +++ b/src/browser/utils/messages/modelMessageTransform.ts @@ -711,7 +711,12 @@ function coalesceConsecutiveNoProgressTaskAwaitPairs(messages: ModelMessage[]): } const status = (entry as { status?: unknown }).status; - if (status !== "queued" && status !== "running" && status !== "awaiting_report") { + if ( + status !== "queued" && + status !== "starting" && + status !== "running" && + status !== "awaiting_report" + ) { return false; } diff --git a/src/common/orpc/schemas/api.ts b/src/common/orpc/schemas/api.ts index f0864fb670..18a57fbbae 100644 --- a/src/common/orpc/schemas/api.ts +++ b/src/common/orpc/schemas/api.ts @@ -1657,7 +1657,7 @@ export const tasks = { z.object({ taskId: z.string(), kind: z.literal("agent"), - status: z.enum(["queued", "running"]), + status: z.enum(["queued", "starting", "running"]), }), z.string() ), diff --git a/src/common/orpc/schemas/workspace.ts b/src/common/orpc/schemas/workspace.ts index 6951eea678..9ab6080c92 100644 --- a/src/common/orpc/schemas/workspace.ts +++ b/src/common/orpc/schemas/workspace.ts @@ -159,12 +159,15 @@ export const WorkspaceMetadataSchema = z.object({ description: "Grouping metadata for child tasks spawned from the same parent tool call.", }), taskStatus: z - .enum(["queued", "running", "awaiting_report", "interrupted", "reported"]) + .enum(["queued", "starting", "running", "awaiting_report", "interrupted", "reported"]) .optional() .meta({ description: - "Agent task lifecycle status for child workspaces (queued|running|awaiting_report|interrupted|reported).", + "Agent task lifecycle status for child workspaces (queued|starting|running|awaiting_report|interrupted|reported).", }), + taskLaunchError: z.string().optional().meta({ + description: "Startup failure recorded before an agent task could begin streaming.", + }), reportedAt: z.string().optional().meta({ description: "ISO 8601 timestamp for when an agent task reported completion (optional).", }), diff --git a/src/common/schemas/project.ts b/src/common/schemas/project.ts index a9437bd92e..507d014c6e 100644 --- a/src/common/schemas/project.ts +++ b/src/common/schemas/project.ts @@ -122,12 +122,15 @@ export const WorkspaceConfigSchema = z.object({ description: "Grouping metadata for child tasks spawned from the same parent tool call.", }), taskStatus: z - .enum(["queued", "running", "awaiting_report", "interrupted", "reported"]) + .enum(["queued", "starting", "running", "awaiting_report", "interrupted", "reported"]) .optional() .meta({ description: - "Agent task lifecycle status for child workspaces (queued|running|awaiting_report|interrupted|reported).", + "Agent task lifecycle status for child workspaces (queued|starting|running|awaiting_report|interrupted|reported).", }), + taskLaunchError: z.string().optional().meta({ + description: "Startup failure recorded before an agent task could begin streaming.", + }), reportedAt: z.string().optional().meta({ description: "ISO 8601 timestamp for when an agent task reported completion (optional).", }), diff --git a/src/common/utils/agentTaskCompletion.ts b/src/common/utils/agentTaskCompletion.ts index 0d3cfdaa23..0f76651866 100644 --- a/src/common/utils/agentTaskCompletion.ts +++ b/src/common/utils/agentTaskCompletion.ts @@ -1,5 +1,12 @@ export interface AgentTaskCompletionCandidate { - taskStatus?: "queued" | "running" | "awaiting_report" | "interrupted" | "reported" | null; + taskStatus?: + | "queued" + | "starting" + | "running" + | "awaiting_report" + | "interrupted" + | "reported" + | null; reportedAt?: string | null; } diff --git a/src/common/utils/tools/toolDefinitions.ts b/src/common/utils/tools/toolDefinitions.ts index 70f204f5ed..aa19c70255 100644 --- a/src/common/utils/tools/toolDefinitions.ts +++ b/src/common/utils/tools/toolDefinitions.ts @@ -247,8 +247,8 @@ export function buildTaskToolDescription(runtimeMode: RuntimeMode | undefined): "\n\nWhen delegating, include a compact task brief (Task / Background / Scope / Starting points / Acceptance / Deliverables / Constraints). " + "Avoid telling the sub-agent to read your plan file; child workspaces do not automatically have access to it. " + "\n\nIf run_in_background is false, waits for the sub-agent to finish and returns the completed report. When grouped sibling tasks are requested via n or variants, the completed result includes one report per spawned task. " + - "If the foreground wait times out, returns queued/running task metadata with a note (the task continues running); use task_await to monitor progress. " + - "If run_in_background is true, returns immediately with queued/running task metadata; use task_await to wait for completion, task_list to rediscover active tasks, and task_terminate to stop it. " + + "If the foreground wait times out, returns queued/starting/running task metadata with a note (the task continues running); use task_await to monitor progress. " + + "If run_in_background is true, returns immediately with queued/starting/running task metadata; use task_await to wait for completion, task_list to rediscover active tasks, and task_terminate to stop it. " + "Prefer run_in_background: false when spawning a single task — it is equivalent to spawning background + immediately awaiting, but saves a round-trip. " + "Use run_in_background: true when launching multiple tasks in parallel so you can act on each as it completes via task_await (which returns on the first completion by default); a foreground grouped spawn (run_in_background: false) instead blocks until every sibling finishes and returns all reports at once. " + "Do not call task_await in the same parallel tool-call batch; wait for the returned task metadata first. " + @@ -337,7 +337,7 @@ export const TaskToolArgsSchema = TaskToolAgentArgsSchema; const TaskToolSpawnedTaskSchema = z .object({ taskId: z.string(), - status: z.enum(["queued", "running", "completed", "interrupted"]), + status: z.enum(["queued", "starting", "running", "completed", "interrupted"]), groupKind: z.enum(TASK_GROUP_KIND_VALUES).optional(), label: z.string().optional(), }) @@ -358,7 +358,7 @@ const TaskToolCompletedReportSchema = z export const TaskToolQueuedResultSchema = z .object({ - status: z.enum(["queued", "running"]), + status: z.enum(["queued", "starting", "running"]), taskId: z.string().optional(), taskIds: z.array(z.string()).min(1).optional(), tasks: z.array(TaskToolSpawnedTaskSchema).min(1).optional(), @@ -473,7 +473,7 @@ export const TaskAwaitToolArgsSchema = z .describe( "Maximum time to wait in seconds for each task. " + "For bash tasks, this waits for NEW output (or process exit). " + - "If exceeded, the result returns status=queued|running|awaiting_report (task is still active). " + + "If exceeded, the result returns status=queued|starting|running|awaiting_report (task is still active). " + "Defaults to 600 seconds (10 minutes) if not specified. " + "Set to 0 for a non-blocking status check." ), @@ -568,7 +568,14 @@ export const TaskAwaitToolCompletedResultSchema = z export const TaskAwaitToolActiveResultSchema = z .object({ - status: z.enum(["queued", "running", "backgrounded", "awaiting_report", "interrupted"]), + status: z.enum([ + "queued", + "starting", + "running", + "backgrounded", + "awaiting_report", + "interrupted", + ]), taskId: z.string(), output: z.string().optional(), elapsed_ms: z.number().optional(), @@ -770,6 +777,7 @@ export const TaskTerminateToolResultSchema = z const TaskListStatusSchema = z.enum([ "queued", + "starting", "running", "awaiting_report", "interrupted", @@ -783,7 +791,7 @@ export const TaskListToolArgsSchema = z .array(TaskListStatusSchema) .nullish() .describe( - "Task statuses to include. Defaults to active tasks: queued, running, awaiting_report." + "Task statuses to include. Defaults to active tasks: queued, starting, running, awaiting_report." ), }) .strict(); @@ -1568,7 +1576,7 @@ export const TOOL_DEFINITIONS = { "Set min_completed higher (up to the number of awaited tasks) when you genuinely need more before proceeding — e.g. best-of-N synthesis that must compare every candidate should pass min_completed equal to the batch size. " + "The result always includes every task complete at the moment it returns, plus current status for the rest; not-yet-completed tasks keep running and stay re-awaitable on a later call. " + "You always get per-task results (like Promise.allSettled), just possibly before every task has finished. " + - "Possible statuses: completed, queued, running, backgrounded, awaiting_report, interrupted, not_found, invalid_scope, error. " + + "Possible statuses: completed, queued, starting, running, backgrounded, awaiting_report, interrupted, not_found, invalid_scope, error. " + "Bash task outputs may be automatically filtered; when this happens, check each result's note for details and (if available) where the full output was saved.", schema: TaskAwaitToolArgsSchema, }, diff --git a/src/node/config.ts b/src/node/config.ts index c05fa53ea2..f6f6825b0b 100644 --- a/src/node/config.ts +++ b/src/node/config.ts @@ -1577,6 +1577,7 @@ export class Config { workflowTask: workspace.workflowTask, bestOf: workspace.bestOf, taskStatus: workspace.taskStatus, + taskLaunchError: workspace.taskLaunchError, reportedAt: workspace.reportedAt, taskModelString: workspace.taskModelString, taskThinkingLevel: workspace.taskThinkingLevel, @@ -1676,6 +1677,7 @@ export class Config { metadata.workflowTask ??= workspace.workflowTask; metadata.bestOf ??= workspace.bestOf; metadata.taskStatus ??= workspace.taskStatus; + metadata.taskLaunchError ??= workspace.taskLaunchError; metadata.reportedAt ??= workspace.reportedAt; metadata.taskModelString ??= workspace.taskModelString; metadata.taskThinkingLevel ??= workspace.taskThinkingLevel; @@ -1746,6 +1748,7 @@ export class Config { workflowTask: workspace.workflowTask, bestOf: workspace.bestOf, taskStatus: workspace.taskStatus, + taskLaunchError: workspace.taskLaunchError, reportedAt: workspace.reportedAt, taskModelString: workspace.taskModelString, taskThinkingLevel: workspace.taskThinkingLevel, @@ -1798,6 +1801,7 @@ export class Config { workflowTask: workspace.workflowTask, bestOf: workspace.bestOf, taskStatus: workspace.taskStatus, + taskLaunchError: workspace.taskLaunchError, reportedAt: workspace.reportedAt, taskModelString: workspace.taskModelString, taskThinkingLevel: workspace.taskThinkingLevel, @@ -1868,6 +1872,7 @@ export class Config { workflowTask: metadata.workflowTask, bestOf: metadata.bestOf, taskStatus: metadata.taskStatus, + taskLaunchError: metadata.taskLaunchError, reportedAt: metadata.reportedAt, taskModelString: metadata.taskModelString, taskThinkingLevel: metadata.taskThinkingLevel, diff --git a/src/node/services/agentSkills/builtInSkillContent.generated.ts b/src/node/services/agentSkills/builtInSkillContent.generated.ts index 4b0ecc2ca1..796f3f4b20 100644 --- a/src/node/services/agentSkills/builtInSkillContent.generated.ts +++ b/src/node/services/agentSkills/builtInSkillContent.generated.ts @@ -4443,17 +4443,17 @@ export const BUILTIN_SKILL_FILES: Record> = { "| `MUX_TOOL_INPUT_MIN_COMPLETED` | `min_completed` | number | Number of awaited tasks that must complete before this call returns. Defaults to 1, so by default task_await returns as soon as the FIRST awaited task completes, letting you act on it while the rest keep running. The result still includes every task complete at that moment plus current status (running/queued) for the rest. Tasks that have not yet completed keep running and remain re-awaitable on a later task_await call. Raise this (e.g. set it to the total number of awaited tasks) when you genuinely need more before proceeding — for example best-of-N synthesis that must compare every candidate. Clamped to the number of awaited tasks; values above that behave like 'wait for all'. |", "| `MUX_TOOL_INPUT_TASK_IDS_` | `task_ids[]` | string | List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and workflow runs of the current workspace, excluding workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs. |", "| `MUX_TOOL_INPUT_TASK_IDS_COUNT` | `task_ids.length` | number | Number of elements in task_ids (List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and workflow runs of the current workspace, excluding workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs.) |", - "| `MUX_TOOL_INPUT_TIMEOUT_SECS` | `timeout_secs` | number | Maximum time to wait in seconds for each task. For bash tasks, this waits for NEW output (or process exit). If exceeded, the result returns status=queued\\|running\\|awaiting_report (task is still active). Defaults to 600 seconds (10 minutes) if not specified. Set to 0 for a non-blocking status check. |", + "| `MUX_TOOL_INPUT_TIMEOUT_SECS` | `timeout_secs` | number | Maximum time to wait in seconds for each task. For bash tasks, this waits for NEW output (or process exit). If exceeded, the result returns status=queued\\|starting\\|running\\|awaiting_report (task is still active). Defaults to 600 seconds (10 minutes) if not specified. Set to 0 for a non-blocking status check. |", "", "", "", "
", "task_list (2)", "", - "| Env var | JSON path | Type | Description |", - "| --------------------------------- | ------------------- | ------ | ---------------------------------------------------------------------------------------------------------------------- |", - "| `MUX_TOOL_INPUT_STATUSES_` | `statuses[]` | enum | Task statuses to include. Defaults to active tasks: queued, running, awaiting_report. |", - "| `MUX_TOOL_INPUT_STATUSES_COUNT` | `statuses.length` | number | Number of elements in statuses (Task statuses to include. Defaults to active tasks: queued, running, awaiting_report.) |", + "| Env var | JSON path | Type | Description |", + "| --------------------------------- | ------------------- | ------ | -------------------------------------------------------------------------------------------------------------------------------- |", + "| `MUX_TOOL_INPUT_STATUSES_` | `statuses[]` | enum | Task statuses to include. Defaults to active tasks: queued, starting, running, awaiting_report. |", + "| `MUX_TOOL_INPUT_STATUSES_COUNT` | `statuses.length` | number | Number of elements in statuses (Task statuses to include. Defaults to active tasks: queued, starting, running, awaiting_report.) |", "", "
", "", diff --git a/src/node/services/taskService.test.ts b/src/node/services/taskService.test.ts index f3805a818d..efc078e021 100644 --- a/src/node/services/taskService.test.ts +++ b/src/node/services/taskService.test.ts @@ -512,6 +512,41 @@ describe("TaskService", () => { } }, 20_000); + test("createMany reserves admitted tasks as starting and over-capacity tasks as queued", async () => { + const config = await createTestConfig(rootDir); + stubStableIds(config, ["aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc"], "dddddddddd"); + + const { parentId } = await saveLocalParentWorkspace(config, rootDir); + await config.editConfig((cfg) => { + cfg.taskSettings = { maxParallelAgentTasks: 2, maxTaskNestingDepth: 3 }; + return cfg; + }); + + const sendMessage = mock(() => new Promise>(() => undefined)); + const { workspaceService } = createWorkspaceServiceMocks({ sendMessage }); + const { taskService } = createTaskServiceHarness(config, { workspaceService }); + + const result = await taskService.createMany( + ["one", "two", "three"].map((prompt, index) => ({ + parentWorkspaceId: parentId, + kind: "agent" as const, + agentId: "explore", + prompt, + title: `Task ${index + 1}`, + })) + ); + + expect(result.success).toBe(true); + if (!result.success) return; + expect(result.data.map((task) => task.status)).toEqual(["starting", "starting", "queued"]); + + const tasks = Array.from(config.loadConfigOrDefault().projects.values()) + .flatMap((project) => project.workspaces) + .filter((workspace) => workspace.parentWorkspaceId === parentId); + expect(tasks.map((task) => task.taskStatus)).toEqual(["starting", "starting", "queued"]); + expect(sendMessage).not.toHaveBeenCalled(); + }); + test("queues tasks when maxParallelAgentTasks is reached and starts them when a slot frees", async () => { const config = await createTestConfig(rootDir); stubStableIds(config, ["aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc", "dddddddddd"], "eeeeeeeeee"); diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index a71614fdfc..1b12d7a34c 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -275,7 +275,29 @@ function getTaskCompletionInstruction(params: { export interface TaskCreateResult { taskId: string; kind: TaskKind; - status: "queued" | "running"; + status: "queued" | "starting" | "running"; +} + +interface TaskLaunchPlan { + taskId: string; + parentWorkspaceId: string; + parentMeta: WorkspaceMetadata; + agentId: string; + agentType: string; + prompt: string; + title: string; + workspaceName: string; + createdAt: string; + taskRuntimeConfig: RuntimeConfig; + parentRuntimeConfig: RuntimeConfig; + taskModelString: string; + canonicalModel: string; + effectiveThinkingLevel?: ThinkingLevel; + skipInitHook: boolean; + preferredTrunkBranch?: string; + workflowTask?: TaskCreateArgs["workflowTask"]; + bestOf?: TaskCreateArgs["bestOf"]; + experiments?: TaskCreateArgs["experiments"]; } export interface TerminateAgentTaskResult { @@ -438,9 +460,9 @@ function buildBackgroundAwaitPrompt(params: { return ( `You have active background ${targetLabels.join(" and ")}. ` + - "You MUST NOT end your turn while any listed sub-agent tasks are queued/running/awaiting_report or workflow runs are pending/running/backgrounded. " + + "You MUST NOT end your turn while any listed sub-agent tasks are queued/starting/running/awaiting_report or workflow runs are pending/running/backgrounded. " + `Call task_await now with task_ids: ${JSON.stringify(taskIds)} to wait for them. ` + - "If any are still queued/running/awaiting_report/backgrounded after that, call task_await again. " + + "If any are still queued/starting/running/awaiting_report/backgrounded after that, call task_await again. " + "Only once all listed work is terminal should you write your final response, integrating any reports or workflow results." ); } @@ -694,6 +716,8 @@ export class TaskService { // safe even when the parent stream-end already holds workspaceEventLocks for the parent itself. private readonly deferredBestOfLocks = new MutexMap(); private readonly mutex = new AsyncMutex(); + private maybeStartQueuedTasksInFlight: Promise | undefined; + private readonly reservedTaskLaunchByProjectPath = new Map>(); private readonly pendingWaitersByTaskId = new Map(); private readonly pendingStartWaitersByTaskId = new Map(); // Tracks workspaces currently blocked in a foreground wait (e.g. a task tool call awaiting @@ -1233,6 +1257,24 @@ export class TaskService { queuedTaskCountAtStartup, }); + const staleStartingTasks = this.listAgentTaskWorkspaces(startupConfig).filter( + (task) => task.taskStatus === "starting" && typeof task.id === "string" + ); + if (staleStartingTasks.length > 0) { + await this.config.editConfig((config) => { + for (const task of staleStartingTasks) { + assert(task.id != null && task.id.length > 0, "stale starting task id is required"); + const entry = findWorkspaceEntry(config, task.id); + if (!entry) continue; + entry.workspace.taskStatus = this.aiService.isStreaming(task.id) ? "running" : "queued"; + } + return config; + }); + log.info("[startup] Recovered stale starting agent tasks", { + count: staleStartingTasks.length, + }); + } + const maybeStartQueuedTasksStartedAt = Date.now(); await this.maybeStartQueuedTasks(); const maybeStartQueuedTasksMs = Date.now() - maybeStartQueuedTasksStartedAt; @@ -1434,6 +1476,450 @@ export class TaskService { }; } + async createMany(argsList: TaskCreateArgs[]): Promise> { + if (argsList.length === 0) { + return Ok([]); + } + + const plans: Array = []; + const results: TaskCreateResult[] = []; + + await using _lock = await this.mutex.acquire(); + + const cfg = this.config.loadConfigOrDefault(); + const taskSettings = cfg.taskSettings ?? DEFAULT_TASK_SETTINGS; + let reservedActiveCount = this.countActiveAgentTasks(cfg); + + for (const args of argsList) { + const parentWorkspaceId = coerceNonEmptyString(args.parentWorkspaceId); + if (!parentWorkspaceId) return Err("Task.createMany: parentWorkspaceId is required"); + if (args.kind !== "agent") return Err("Task.createMany: unsupported kind"); + + const basePrompt = coerceNonEmptyString(args.prompt); + if (!basePrompt) return Err("Task.createMany: prompt is required"); + const prompt = + args.experiments?.subagentFileReports === true + ? appendSubagentFileReportInstructions(basePrompt, args.workflowTask) + : basePrompt; + + const normalizedAgentId = normalizeAgentId(args.agentId ?? args.agentType, ""); + if (!normalizedAgentId) return Err("Task.createMany: agentId is required"); + const parsedAgentId = AgentIdSchema.safeParse(normalizedAgentId); + if (!parsedAgentId.success) { + return Err(`Task.createMany: invalid agentId (${normalizedAgentId})`); + } + const agentId = parsedAgentId.data; + const agentType = agentId; + + let normalizedBestOf: TaskCreateArgs["bestOf"]; + const bestOf = args.bestOf; + if (bestOf) { + const groupId = coerceNonEmptyString(bestOf.groupId); + if (!groupId) + return Err("Task.createMany: bestOf.groupId is required when bestOf is provided"); + if (!Number.isInteger(bestOf.index) || bestOf.index < 0) { + return Err("Task.createMany: bestOf.index must be a non-negative integer"); + } + if (!Number.isInteger(bestOf.total) || bestOf.total < 2) { + return Err("Task.createMany: bestOf.total must be an integer >= 2"); + } + if (bestOf.index >= bestOf.total) { + return Err("Task.createMany: bestOf.index must be less than bestOf.total"); + } + const kind = normalizeTaskGroupKind(bestOf.kind); + const label = normalizeTaskGroupLabel(bestOf.label); + if (kind === TASK_GROUP_KIND.VARIANTS && !label) { + return Err("Task.createMany: bestOf.label is required when bestOf.kind is variants"); + } + if (kind !== TASK_GROUP_KIND.VARIANTS && label) { + return Err("Task.createMany: bestOf.label is only allowed when bestOf.kind is variants"); + } + normalizedBestOf = { + groupId, + index: bestOf.index, + total: bestOf.total, + kind, + ...(label ? { label } : {}), + }; + } + + const parentMetaResult = await this.aiService.getWorkspaceMetadata(parentWorkspaceId); + if (!parentMetaResult.success) { + return Err(`Task.createMany: parent workspace not found (${parentMetaResult.error})`); + } + const parentMeta = parentMetaResult.data; + + const taskProjectConfig = cfg.projects.get(stripTrailingSlashes(parentMeta.projectPath)); + if (!taskProjectConfig?.trusted) { + return Err( + "This project must be trusted before creating workspaces. Trust the project in Settings → Security, or create a workspace from the project page." + ); + } + + const parentEntry = findWorkspaceEntry(cfg, parentWorkspaceId); + if (parentEntry?.workspace.taskStatus === "reported") { + return Err("Task.createMany: cannot spawn new tasks after agent_report"); + } + + const requestedDepth = this.getTaskDepth(cfg, parentWorkspaceId) + 1; + if (requestedDepth > taskSettings.maxTaskNestingDepth) { + return Err( + `Task.createMany: maxTaskNestingDepth exceeded (requestedDepth=${requestedDepth}, max=${taskSettings.maxTaskNestingDepth})` + ); + } + + const taskId = this.config.generateStableId(); + const workspaceName = buildAgentWorkspaceName(agentId, taskId); + const nameValidation = validateWorkspaceName(workspaceName); + if (!nameValidation.valid) { + return Err( + `Task.createMany: generated workspace name invalid (${nameValidation.error ?? "unknown error"})` + ); + } + + const { taskModelString, canonicalModel, effectiveThinkingLevel } = + this.resolveTaskAISettings({ + cfg, + parentMeta, + agentId, + modelString: args.modelString, + thinkingLevel: args.thinkingLevel, + parentRuntimeAiSettings: args.parentRuntimeAiSettings, + }); + + const parentRuntimeConfig = parentMeta.runtimeConfig; + const taskRuntimeConfig: RuntimeConfig = parentRuntimeConfig; + const runtime = createRuntimeForWorkspace({ + runtimeConfig: taskRuntimeConfig, + projectPath: parentMeta.projectPath, + name: parentMeta.name, + }); + const isInPlace = parentMeta.projectPath === parentMeta.name; + const parentWorkspacePath = isInPlace + ? parentMeta.projectPath + : runtime.getWorkspacePath(parentMeta.projectPath, parentMeta.name); + + const getRunnableHint = async (): Promise => { + try { + const allAgents = await discoverAgentDefinitions(runtime, parentWorkspacePath); + const runnableIds = ( + await Promise.all( + allAgents.map(async (agent) => { + try { + const frontmatter = await resolveAgentFrontmatter( + runtime, + parentWorkspacePath, + agent.id, + { skipScopesAbove: getSkipScopesAboveForKnownScope(agent.scope) } + ); + if (frontmatter.subagent?.runnable !== true) return null; + return isAgentEffectivelyDisabled({ + cfg, + agentId: agent.id, + resolvedFrontmatter: frontmatter, + }) + ? null + : agent.id; + } catch { + return null; + } + }) + ) + ).filter((id): id is string => typeof id === "string"); + return runnableIds.length > 0 + ? `Runnable agentIds: ${runnableIds.join(", ")}` + : "No runnable agents available"; + } catch { + return "Could not discover available agents"; + } + }; + + let skipInitHook = false; + try { + const frontmatter = await resolveAgentFrontmatter(runtime, parentWorkspacePath, agentId); + if (frontmatter.subagent?.runnable !== true) { + const hint = await getRunnableHint(); + return Err( + `Task.createMany: agentId is not runnable as a sub-agent (${agentId}). ${hint}` + ); + } + if (isAgentEffectivelyDisabled({ cfg, agentId, resolvedFrontmatter: frontmatter })) { + const hint = await getRunnableHint(); + return Err(`Task.createMany: agentId is disabled (${agentId}). ${hint}`); + } + skipInitHook = frontmatter.subagent?.skip_init_hook === true; + } catch { + const hint = await getRunnableHint(); + return Err(`Task.createMany: unknown agentId (${agentId}). ${hint}`); + } + + const status: "queued" | "starting" = + reservedActiveCount >= taskSettings.maxParallelAgentTasks ? "queued" : "starting"; + if (status === "starting") reservedActiveCount += 1; + + const createdAt = getIsoNow(); + plans.push({ + taskId, + parentWorkspaceId, + parentMeta, + agentId, + agentType, + prompt, + title: args.title, + workspaceName, + createdAt, + taskRuntimeConfig, + parentRuntimeConfig, + taskModelString, + canonicalModel, + effectiveThinkingLevel, + skipInitHook, + workflowTask: args.workflowTask, + bestOf: normalizedBestOf, + experiments: args.experiments, + status, + }); + results.push({ taskId, kind: "agent", status }); + } + + await this.config.editConfig((config) => { + for (const plan of plans) { + const runtime = createRuntimeForWorkspace({ + runtimeConfig: plan.taskRuntimeConfig, + projectPath: plan.parentMeta.projectPath, + name: plan.parentMeta.name, + }); + const workspacePath = runtime.getWorkspacePath( + plan.parentMeta.projectPath, + plan.workspaceName + ); + const trunkBranch = coerceNonEmptyString(plan.parentMeta.name); + if (!trunkBranch) { + throw new Error("Task.createMany: parent workspace name missing"); + } + let projectConfig = config.projects.get(plan.parentMeta.projectPath); + if (!projectConfig) { + projectConfig = { workspaces: [] }; + config.projects.set(plan.parentMeta.projectPath, projectConfig); + } + projectConfig.workspaces.push({ + path: workspacePath, + id: plan.taskId, + name: plan.workspaceName, + title: plan.title, + createdAt: plan.createdAt, + runtimeConfig: plan.taskRuntimeConfig, + aiSettings: + plan.effectiveThinkingLevel !== undefined + ? { model: plan.canonicalModel, thinkingLevel: plan.effectiveThinkingLevel } + : undefined, + parentWorkspaceId: plan.parentWorkspaceId, + agentId: plan.agentId, + agentType: plan.agentType, + workflowTask: plan.workflowTask, + bestOf: plan.bestOf, + taskStatus: plan.status, + taskPrompt: plan.prompt, + taskTrunkBranch: trunkBranch, + taskModelString: plan.taskModelString, + taskThinkingLevel: plan.effectiveThinkingLevel, + taskExperiments: plan.experiments, + projects: plan.parentMeta.projects, + }); + } + return config; + }); + + for (const result of results) { + await this.emitWorkspaceMetadata(result.taskId); + } + for (const plan of plans) { + if (plan.status === "starting") { + this.scheduleReservedTaskLaunch(plan); + } + } + if (plans.some((plan) => plan.status === "queued")) { + this.scheduleMaybeStartQueuedTasks(); + } + + return Ok(results); + } + + private scheduleReservedTaskLaunch(plan: TaskLaunchPlan): void { + assert(plan.taskId.length > 0, "scheduleReservedTaskLaunch requires taskId"); + void this.enqueueReservedTaskLaunch(plan).catch((error: unknown) => { + log.error("Failed to launch reserved task", { taskId: plan.taskId, error }); + void this.markTaskLaunchFailed(plan.taskId, getErrorMessage(error)); + }); + } + + private scheduleMaybeStartQueuedTasks(): void { + void this.maybeStartQueuedTasks().catch((error: unknown) => { + log.error("TaskService.maybeStartQueuedTasks failed", { error }); + }); + } + + private async markTaskLaunchFailed(taskId: string, message: string): Promise { + assert(taskId.length > 0, "markTaskLaunchFailed requires taskId"); + await this.editWorkspaceEntry( + taskId, + (ws) => { + ws.taskStatus = "interrupted"; + ws.taskLaunchError = message; + }, + { allowMissing: true } + ); + await this.emitWorkspaceMetadata(taskId); + this.rejectWaiters(taskId, new Error(message)); + this.scheduleMaybeStartQueuedTasks(); + } + + private async startReservedAgentTask(plan: TaskLaunchPlan): Promise { + assert(plan.taskId.length > 0, "startReservedAgentTask requires taskId"); + assert(plan.parentWorkspaceId.length > 0, "startReservedAgentTask requires parentWorkspaceId"); + assert(plan.prompt.length > 0, "startReservedAgentTask requires prompt"); + + const entryAtStart = findWorkspaceEntry(this.config.loadConfigOrDefault(), plan.taskId); + if (entryAtStart?.workspace.taskStatus !== "starting") { + return; + } + + const initLogger = this.startWorkspaceInit(plan.taskId, plan.parentMeta.projectPath); + const runtime = createRuntimeForWorkspace({ + runtimeConfig: plan.taskRuntimeConfig, + projectPath: plan.parentMeta.projectPath, + name: plan.parentMeta.name, + }); + + const forkResult = await orchestrateFork({ + sourceRuntime: runtime, + projectPath: plan.parentMeta.projectPath, + sourceWorkspaceName: plan.parentMeta.name, + newWorkspaceName: plan.workspaceName, + initLogger, + config: this.config, + sourceWorkspaceId: plan.parentWorkspaceId, + sourceRuntimeConfig: plan.parentRuntimeConfig, + parentMetadata: plan.parentMeta, + allowCreateFallback: true, + ...(plan.preferredTrunkBranch != null + ? { preferredTrunkBranch: plan.preferredTrunkBranch } + : {}), + trusted: + this.config + .loadConfigOrDefault() + .projects.get(stripTrailingSlashes(plan.parentMeta.projectPath))?.trusted ?? false, + multiProjectExperimentEnabled: this.workspaceService.isExperimentEnabled( + EXPERIMENT_IDS.MULTI_PROJECT_WORKSPACES + ), + }); + + if (forkResult.success && forkResult.data.sourceRuntimeConfigUpdate) { + await this.config.updateWorkspaceMetadata(plan.parentWorkspaceId, { + runtimeConfig: forkResult.data.sourceRuntimeConfigUpdate, + }); + await this.emitWorkspaceMetadata(plan.parentWorkspaceId); + } + + if (!forkResult.success) { + initLogger.logComplete(-1); + throw new Error(`Task fork failed: ${forkResult.error}`); + } + + const { + workspacePath, + trunkBranch, + forkedRuntimeConfig, + targetRuntime: runtimeForTaskWorkspace, + projects: inheritedProjects, + } = forkResult.data; + + this.configureMultiProjectRuntimeEnvResolver(runtimeForTaskWorkspace); + const taskBaseCommitShaByProjectPath = await readTaskBaseCommitShaByProjectPath({ + workspaceId: plan.taskId, + workspaceName: plan.workspaceName, + workspacePath, + runtimeConfig: forkedRuntimeConfig, + projectPath: plan.parentMeta.projectPath, + projectName: plan.parentMeta.projectName, + projects: inheritedProjects, + runtime: runtimeForTaskWorkspace, + }); + const taskBaseCommitSha = taskBaseCommitShaByProjectPath[plan.parentMeta.projectPath]; + + await this.editWorkspaceEntry( + plan.taskId, + (ws) => { + if (ws.taskStatus !== "starting") { + return; + } + ws.path = workspacePath; + ws.runtimeConfig = forkedRuntimeConfig; + ws.taskTrunkBranch = trunkBranch; + ws.taskBaseCommitSha = taskBaseCommitSha ?? undefined; + ws.taskBaseCommitShaByProjectPath = taskBaseCommitShaByProjectPath; + ws.projects = inheritedProjects; + }, + { allowMissing: true } + ); + await this.emitWorkspaceMetadata(plan.taskId); + + const entryBeforeSend = findWorkspaceEntry(this.config.loadConfigOrDefault(), plan.taskId); + if (entryBeforeSend?.workspace.taskStatus !== "starting") { + return; + } + + const secrets = await secretsToRecord( + this.config.getEffectiveSecrets(plan.parentMeta.projectPath), + this.opResolver + ); + runBackgroundInit( + runtimeForTaskWorkspace, + { + projectPath: plan.parentMeta.projectPath, + branchName: plan.workspaceName, + trunkBranch, + workspacePath, + initLogger, + env: secrets, + skipInitHook: plan.skipInitHook, + trusted: + this.config + .loadConfigOrDefault() + .projects.get(stripTrailingSlashes(plan.parentMeta.projectPath))?.trusted ?? false, + }, + plan.taskId + ); + + const sendResult = await this.workspaceService.sendMessage( + plan.taskId, + plan.prompt, + { + model: plan.taskModelString, + agentId: plan.agentId, + thinkingLevel: plan.effectiveThinkingLevel, + experiments: plan.experiments, + }, + { allowQueuedAgentTask: true, agentInitiated: true } + ); + if (!sendResult.success) { + const message = + typeof sendResult.error === "string" + ? sendResult.error + : formatSendMessageError(sendResult.error).message; + await this.rollbackFailedTaskCreate( + runtimeForTaskWorkspace, + plan.parentMeta.projectPath, + plan.workspaceName, + plan.taskId + ); + throw new Error(message); + } + + await this.setTaskStatus(plan.taskId, "running"); + this.scheduleMaybeStartQueuedTasks(); + } + async create(args: TaskCreateArgs): Promise> { const parentWorkspaceId = coerceNonEmptyString(args.parentWorkspaceId); if (!parentWorkspaceId) { @@ -2342,7 +2828,7 @@ export class TaskService { // Report monotonicity: interrupted tasks can still be streaming while stream-end // finalization persists agent_report. Waiters should keep waiting in that window. if (!this.aiService.isStreaming(taskId)) { - throw new Error("Task interrupted"); + throw new Error(taskWorkspaceEntry.workspace.taskLaunchError ?? "Task interrupted"); } } @@ -2387,7 +2873,7 @@ export class TaskService { // Report monotonicity: an interrupted task may still be in stream-end teardown, // so keep the waiter alive while the stream is active. if (!this.aiService.isStreaming(taskId)) { - reject(new Error("Task interrupted")); + reject(new Error(taskWorkspaceEntry.workspace.taskLaunchError ?? "Task interrupted")); return; } } @@ -2476,10 +2962,10 @@ export class TaskService { this.registerBackgroundableForegroundWaiter(requestingWorkspaceId, entry); } - // Don't start the execution timeout while the task is still queued. - // The timer starts once the child actually begins running (queued -> running). + // Don't start the execution timeout while the task is still queued/starting. + // The timer starts once the child actually begins running (queued/starting -> running). const initialStatus = taskWorkspaceEntry.workspace.taskStatus; - if (initialStatus === "queued") { + if (initialStatus === "queued" || initialStatus === "starting") { const startWaiterEntry: PendingTaskStartWaiter = { start: startReportTimeout, cleanup: () => { @@ -2503,7 +2989,10 @@ export class TaskService { // Close the race where the task starts between the initial config read and registering the waiter. const cfgAfterRegister = this.config.loadConfigOrDefault(); const afterEntry = findWorkspaceEntry(cfgAfterRegister, taskId); - if (afterEntry?.workspace.taskStatus !== "queued") { + if ( + afterEntry?.workspace.taskStatus !== "queued" && + afterEntry?.workspace.taskStatus !== "starting" + ) { cleanupStartWaiter(); startReportTimeout(); } @@ -2512,7 +3001,7 @@ export class TaskService { // scheduler runs after the waiter is registered. This avoids deadlocks when // maxParallelAgentTasks is low. if (requestingWorkspaceId) { - void this.maybeStartQueuedTasks(); + this.scheduleMaybeStartQueuedTasks(); } } else { startReportTimeout(); @@ -2649,7 +3138,12 @@ export class TaskService { const cfg = this.config.loadConfigOrDefault(); const index = this.buildAgentTaskIndex(cfg); - const activeStatuses = new Set(["queued", "running", "awaiting_report"]); + const activeStatuses = new Set([ + "queued", + "starting", + "running", + "awaiting_report", + ]); const result: string[] = []; const stack: Array<{ taskId: string; workflowOwned: boolean }> = [ ...(index.childrenByParent.get(workspaceId) ?? []).map((taskId) => ({ @@ -3029,7 +3523,7 @@ export class TaskService { if (status === "running" && task.id && this.isForegroundAwaiting(task.id)) { continue; } - if (status === "running" || status === "awaiting_report") { + if (status === "starting" || status === "running" || status === "awaiting_report") { activeCount += 1; continue; } @@ -3053,7 +3547,12 @@ export class TaskService { const index = this.buildAgentTaskIndex(config); - const activeStatuses = new Set(["queued", "running", "awaiting_report"]); + const activeStatuses = new Set([ + "queued", + "starting", + "running", + "awaiting_report", + ]); const stack: string[] = [...(index.childrenByParent.get(workspaceId) ?? [])]; while (stack.length > 0) { const next = stack.pop()!; @@ -3114,514 +3613,225 @@ export class TaskService { } async maybeStartQueuedTasks(): Promise { - await using _lock = await this.mutex.acquire(); - - const configAtStart = this.config.loadConfigOrDefault(); - const taskSettingsAtStart: TaskSettings = configAtStart.taskSettings ?? DEFAULT_TASK_SETTINGS; + // A foreground task waiter registers itself in waitForAgentReport's async setup. Yield once so + // immediate scheduler calls from the same turn see that foreground-awaiting state and avoid a + // nested-task deadlock at maxParallelAgentTasks=1. + await Promise.resolve(); + const existingRun = this.maybeStartQueuedTasksInFlight; + if (existingRun != null) { + await existingRun; + return; + } - const activeCount = this.countActiveAgentTasks(configAtStart); - const availableSlots = Math.max(0, taskSettingsAtStart.maxParallelAgentTasks - activeCount); - taskQueueDebug("TaskService.maybeStartQueuedTasks summary", { - activeCount, - maxParallelAgentTasks: taskSettingsAtStart.maxParallelAgentTasks, - availableSlots, - }); - if (availableSlots === 0) return; - - const queuedTaskIds = this.listAgentTaskWorkspaces(configAtStart) - .filter((t) => t.taskStatus === "queued" && typeof t.id === "string") - .sort((a, b) => { - const aTime = a.createdAt ? Date.parse(a.createdAt) : 0; - const bTime = b.createdAt ? Date.parse(b.createdAt) : 0; - return aTime - bTime; - }) - .map((t) => t.id!); - - taskQueueDebug("TaskService.maybeStartQueuedTasks candidates", { - queuedCount: queuedTaskIds.length, - queuedIds: queuedTaskIds, + const run = this.maybeStartQueuedTasksFromReservations().finally(() => { + if (this.maybeStartQueuedTasksInFlight === run) { + this.maybeStartQueuedTasksInFlight = undefined; + } }); + this.maybeStartQueuedTasksInFlight = run; + await run; + } + + private async maybeStartQueuedTasksFromReservations(): Promise { + const plans: TaskLaunchPlan[] = []; + + { + await using _lock = await this.mutex.acquire(); - for (const taskId of queuedTaskIds) { const config = this.config.loadConfigOrDefault(); const taskSettings: TaskSettings = config.taskSettings ?? DEFAULT_TASK_SETTINGS; - assert( - Number.isFinite(taskSettings.maxParallelAgentTasks) && - taskSettings.maxParallelAgentTasks > 0, - "TaskService.maybeStartQueuedTasks: maxParallelAgentTasks must be a positive number" + const availableSlots = Math.max( + 0, + taskSettings.maxParallelAgentTasks - this.countActiveAgentTasks(config) ); - - const activeCount = this.countActiveAgentTasks(config); - if (activeCount >= taskSettings.maxParallelAgentTasks) { - break; - } - - const taskEntry = findWorkspaceEntry(config, taskId); - if (!taskEntry?.workspace.parentWorkspaceId) continue; - const task = taskEntry.workspace; - if (task.taskStatus !== "queued") continue; - - // Defensive: tasks can begin streaming before taskStatus flips to "running". - if (this.aiService.isStreaming(taskId)) { - taskQueueDebug("TaskService.maybeStartQueuedTasks queued-but-streaming; marking running", { - taskId, - }); - await this.setTaskStatus(taskId, "running"); - continue; - } - - assert(typeof task.name === "string" && task.name.trim().length > 0, "Task name missing"); - - const parentId = coerceNonEmptyString(task.parentWorkspaceId); - if (!parentId) { - log.error("Queued task missing parentWorkspaceId; cannot start", { taskId }); - continue; - } - - const parentEntry = findWorkspaceEntry(config, parentId); - if (!parentEntry) { - log.error("Queued task parent not found; cannot start", { taskId, parentId }); - continue; - } - - const parentWorkspaceName = coerceNonEmptyString(parentEntry.workspace.name); - if (!parentWorkspaceName) { - log.error("Queued task parent missing workspace name; cannot start", { - taskId, - parentId, - }); - continue; - } - - const taskRuntimeConfig = task.runtimeConfig ?? parentEntry.workspace.runtimeConfig; - if (!taskRuntimeConfig) { - log.error("Queued task missing runtimeConfig; cannot start", { taskId }); - continue; - } - - const parentRuntimeConfig = parentEntry.workspace.runtimeConfig ?? taskRuntimeConfig; - const workspaceName = task.name.trim(); - const runtime = createRuntimeForWorkspace({ - runtimeConfig: taskRuntimeConfig, - projectPath: taskEntry.projectPath, - name: workspaceName, - }); - let runtimeForTaskWorkspace = runtime; - let forkedRuntimeConfig = taskRuntimeConfig; - - let workspacePath = - coerceNonEmptyString(task.path) ?? - runtime.getWorkspacePath(taskEntry.projectPath, workspaceName); - - let workspaceExists = false; - try { - await runtime.stat(workspacePath); - workspaceExists = true; - } catch { - workspaceExists = false; - } - - const inMemoryInit = this.initStateManager.getInitState(taskId); - const persistedInit = inMemoryInit - ? null - : await this.initStateManager.readInitStatus(taskId); - - // Re-check capacity after awaiting IO to avoid dequeuing work (worktree creation/init) when - // another task became active in the meantime. - const latestConfig = this.config.loadConfigOrDefault(); - const latestTaskSettings: TaskSettings = latestConfig.taskSettings ?? DEFAULT_TASK_SETTINGS; - const latestActiveCount = this.countActiveAgentTasks(latestConfig); - if (latestActiveCount >= latestTaskSettings.maxParallelAgentTasks) { - taskQueueDebug("TaskService.maybeStartQueuedTasks became full mid-loop", { - taskId, - activeCount: latestActiveCount, - maxParallelAgentTasks: latestTaskSettings.maxParallelAgentTasks, - }); - break; - } - - // Ensure the workspace exists before starting. Queued tasks should not create worktrees/directories - // until they are actually dequeued. - let trunkBranch = - typeof task.taskTrunkBranch === "string" && task.taskTrunkBranch.trim().length > 0 - ? task.taskTrunkBranch.trim() - : parentWorkspaceName; - if (trunkBranch.length === 0) { - trunkBranch = "main"; - } - - let shouldRunInit = !inMemoryInit && !persistedInit; - let initLogger: InitLogger | null = null; - const getInitLogger = (): InitLogger => { - if (initLogger) return initLogger; - initLogger = this.startWorkspaceInit(taskId, taskEntry.projectPath); - return initLogger; - }; - - taskQueueDebug("TaskService.maybeStartQueuedTasks start attempt", { - taskId, - workspaceName, - parentId, - parentWorkspaceName, - runtimeType: taskRuntimeConfig.type, - workspacePath, - workspaceExists, - trunkBranch, - shouldRunInit, - inMemoryInit: Boolean(inMemoryInit), - persistedInit: Boolean(persistedInit), + taskQueueDebug("TaskService.maybeStartQueuedTasks reservation summary", { + maxParallelAgentTasks: taskSettings.maxParallelAgentTasks, + availableSlots, }); + if (availableSlots === 0) return; + + const queuedTasks = this.listAgentTaskWorkspaces(config) + .filter((task) => task.taskStatus === "queued" && typeof task.id === "string") + .sort((a, b) => { + const aTime = a.createdAt ? Date.parse(a.createdAt) : 0; + const bTime = b.createdAt ? Date.parse(b.createdAt) : 0; + return aTime - bTime; + }) + .slice(0, availableSlots); - // Trust gate: skip dequeued tasks if the project lost trust since queuing. - const dequeueCfg = this.config.loadConfigOrDefault(); - const normalizedTaskProjectPath = stripTrailingSlashes(taskEntry.projectPath); - const dequeueProjectConfig = dequeueCfg.projects.get(normalizedTaskProjectPath); - if (!dequeueProjectConfig?.trusted) { - log.warn("Skipping queued task for untrusted project", { - taskId, - projectPath: taskEntry.projectPath, - }); - taskQueueDebug("TaskService.maybeStartQueuedTasks skipped (untrusted)", { taskId }); - await this.setTaskStatus(taskId, "interrupted"); - this.rejectWaiters(taskId, new Error("Task skipped: project is not trusted")); - continue; - } - - // Multi-project queued tasks persist every constituent project. Re-check those secondary - // refs here so trust revocations terminate the task instead of retrying the same fork forever. - const untrustedSecondaryProject = - Array.isArray(task.projects) && task.projects.length > 1 - ? task.projects.find((project) => { - const normalizedProjectPath = stripTrailingSlashes(project.projectPath); - if (normalizedProjectPath === normalizedTaskProjectPath) { - return false; - } - return !(dequeueCfg.projects.get(normalizedProjectPath)?.trusted ?? false); - }) - : undefined; - if (untrustedSecondaryProject) { - log.warn("Skipping queued multi-project task for untrusted project", { - taskId, - projectPath: untrustedSecondaryProject.projectPath, - projects: task.projects, - }); - taskQueueDebug("TaskService.maybeStartQueuedTasks skipped (secondary untrusted)", { - taskId, - projectPath: untrustedSecondaryProject.projectPath, - }); - await this.setTaskStatus(taskId, "interrupted"); - this.rejectWaiters( - taskId, - new Error(`Task skipped: project ${untrustedSecondaryProject.projectPath} is not trusted`) - ); - continue; - } - - // If the workspace doesn't exist yet, create it now (fork preferred, else createWorkspace). - if (!workspaceExists) { - shouldRunInit = true; - const initLogger = getInitLogger(); - - const parentMetadataResult = await this.aiService.getWorkspaceMetadata(parentId); - let parentMetadataForFork = parentMetadataResult.success - ? parentMetadataResult.data - : undefined; - - if (!parentMetadataForFork && Array.isArray(task.projects) && task.projects.length > 1) { - // Queued tasks persist the parent's project refs at queue time. If the parent metadata lookup - // fails later (for example, parent workspace deleted while queued), synthesize enough metadata - // for orchestrateFork to preserve multi-project behavior instead of falling back to single-project. - const primaryProjectRef = - task.projects.find( - (project) => - stripTrailingSlashes(project.projectPath) === - stripTrailingSlashes(taskEntry.projectPath) - ) ?? task.projects[0]; - const projectName = - coerceNonEmptyString(primaryProjectRef?.projectName) ?? - coerceNonEmptyString(taskEntry.projectPath.split("/").filter(Boolean).at(-1)) ?? - taskEntry.projectPath; - - parentMetadataForFork = { - id: parentId, - name: parentWorkspaceName, - projectPath: taskEntry.projectPath, - projectName, - runtimeConfig: parentRuntimeConfig, - projects: task.projects, - } satisfies WorkspaceMetadata; + for (const task of queuedTasks) { + const taskId = task.id; + assert(taskId != null && taskId.length > 0, "queued task id is required"); + if (this.aiService.isStreaming(taskId)) { + await this.setTaskStatus(taskId, "running"); + continue; } - const forkOrchestratorResult = await orchestrateFork({ - sourceRuntime: runtime, - projectPath: taskEntry.projectPath, - sourceWorkspaceName: parentWorkspaceName, - newWorkspaceName: workspaceName, - initLogger, - config: this.config, - sourceWorkspaceId: parentId, - sourceRuntimeConfig: parentRuntimeConfig, - parentMetadata: parentMetadataForFork, - allowCreateFallback: true, - preferredTrunkBranch: trunkBranch, - trusted: - this.config - .loadConfigOrDefault() - .projects.get(stripTrailingSlashes(taskEntry.projectPath))?.trusted ?? false, - multiProjectExperimentEnabled: this.workspaceService.isExperimentEnabled( - EXPERIMENT_IDS.MULTI_PROJECT_WORKSPACES - ), - }); - - if ( - forkOrchestratorResult.success && - forkOrchestratorResult.data.sourceRuntimeConfigUpdate - ) { - await this.config.updateWorkspaceMetadata(parentId, { - runtimeConfig: forkOrchestratorResult.data.sourceRuntimeConfigUpdate, + const queuedPrompt = coerceNonEmptyString(task.taskPrompt); + if (!queuedPrompt) { + taskQueueDebug("TaskService.maybeStartQueuedTasks skipping legacy queued task", { + taskId, }); - // Ensure UI gets the updated runtimeConfig for the parent workspace. - await this.emitWorkspaceMetadata(parentId); + continue; } - if (!forkOrchestratorResult.success) { - initLogger.logComplete(-1); - log.error("Task fork failed", { taskId, error: forkOrchestratorResult.error }); - taskQueueDebug("TaskService.maybeStartQueuedTasks fork failed", { - taskId, - error: forkOrchestratorResult.error, - }); + const parentWorkspaceId = coerceNonEmptyString(task.parentWorkspaceId); + if (!parentWorkspaceId) { + await this.markTaskLaunchFailed(taskId, "Queued task missing parentWorkspaceId"); continue; } - const { - forkedRuntimeConfig: resolvedForkedRuntimeConfig, - targetRuntime, - workspacePath: resolvedWorkspacePath, - trunkBranch: resolvedTrunkBranch, - forkedFromSource, - projects: inheritedProjects, - } = forkOrchestratorResult.data; - - forkedRuntimeConfig = resolvedForkedRuntimeConfig; - runtimeForTaskWorkspace = targetRuntime; - workspacePath = resolvedWorkspacePath; - trunkBranch = resolvedTrunkBranch; - workspaceExists = true; - - taskQueueDebug("TaskService.maybeStartQueuedTasks workspace created", { - taskId, - workspacePath, - forkSuccess: forkedFromSource, - trunkBranch, - }); + const parentEntry = findWorkspaceEntry(config, parentWorkspaceId); + if (!parentEntry) { + await this.markTaskLaunchFailed(taskId, "Queued task parent not found"); + continue; + } + const parentWorkspaceName = coerceNonEmptyString(parentEntry.workspace.name); + if (!parentWorkspaceName) { + await this.markTaskLaunchFailed(taskId, "Queued task parent missing workspace name"); + continue; + } - // Persist any corrected path/trunkBranch for restart-safe init. - await this.editWorkspaceEntry( - taskId, - (ws) => { - ws.path = workspacePath; - ws.taskTrunkBranch = trunkBranch; - ws.runtimeConfig = forkedRuntimeConfig; - ws.projects = inheritedProjects; - }, - { allowMissing: true } - ); - } + const taskRuntimeConfig = task.runtimeConfig ?? parentEntry.workspace.runtimeConfig; + const parentRuntimeConfig = parentEntry.workspace.runtimeConfig ?? taskRuntimeConfig; + if (!taskRuntimeConfig || !parentRuntimeConfig) { + await this.markTaskLaunchFailed(taskId, "Queued task missing runtimeConfig"); + continue; + } - // If init has not yet run for this workspace, start it now (best-effort, async). - // This is intentionally coupled to task start so queued tasks don't run init hooks - // Capture base commit for git-format-patch generation before the agent starts. - // This must reflect the *actual* workspace HEAD after creation/fork, not the parent's current HEAD - // (queued tasks can start much later). - if ( - !coerceNonEmptyString(task.taskBaseCommitSha) || - Object.keys(task.taskBaseCommitShaByProjectPath ?? {}).length === 0 - ) { - const taskBaseCommitShaByProjectPath = await readTaskBaseCommitShaByProjectPath({ - workspaceId: taskId, - workspaceName: task.name, - workspacePath, - runtimeConfig: forkedRuntimeConfig, - projectPath: taskEntry.projectPath, - projectName: - task.projects?.find((project) => project.projectPath === taskEntry.projectPath) - ?.projectName ?? - taskEntry.projectPath.split("/").filter(Boolean).at(-1) ?? - taskEntry.projectPath, - projects: task.projects, - runtime: runtimeForTaskWorkspace, - }); - const taskBaseCommitSha = taskBaseCommitShaByProjectPath[taskEntry.projectPath]; - if (taskBaseCommitSha || Object.keys(taskBaseCommitShaByProjectPath).length > 0) { - await this.editWorkspaceEntry( + const normalizedTaskProjectPath = stripTrailingSlashes(task.projectPath); + const taskProjectConfig = config.projects.get(normalizedTaskProjectPath); + if (!taskProjectConfig?.trusted) { + await this.markTaskLaunchFailed(taskId, "Task skipped: project is not trusted"); + continue; + } + const untrustedSecondaryProject = + Array.isArray(task.projects) && task.projects.length > 1 + ? task.projects.find((project) => { + const normalizedProjectPath = stripTrailingSlashes(project.projectPath); + if (normalizedProjectPath === normalizedTaskProjectPath) { + return false; + } + return !(config.projects.get(normalizedProjectPath)?.trusted ?? false); + }) + : undefined; + if (untrustedSecondaryProject) { + await this.markTaskLaunchFailed( taskId, - (ws) => { - ws.taskBaseCommitSha = taskBaseCommitSha; - ws.taskBaseCommitShaByProjectPath = taskBaseCommitShaByProjectPath; - }, - { allowMissing: true } + `Task skipped: project ${untrustedSecondaryProject.projectPath} is not trusted` ); + continue; } - } - // (SSH sync, .mux/init scripts, etc.) until they actually begin execution. - if (shouldRunInit) { - const initLogger = getInitLogger(); - taskQueueDebug("TaskService.maybeStartQueuedTasks initWorkspace starting", { - taskId, - workspacePath, - trunkBranch, - }); - // Multi-project forks need per-project secrets for each runtime's init hook. - this.configureMultiProjectRuntimeEnvResolver(runtimeForTaskWorkspace); - const secrets = await secretsToRecord( - this.config.getEffectiveSecrets(taskEntry.projectPath), - this.opResolver - ); + const parentMetaResult = await this.aiService.getWorkspaceMetadata(parentWorkspaceId); + const parentMeta = parentMetaResult.success + ? parentMetaResult.data + : ({ + id: parentWorkspaceId, + name: parentWorkspaceName, + projectPath: parentEntry.projectPath, + projectName: + parentEntry.workspace.projects?.find( + (project) => + stripTrailingSlashes(project.projectPath) === + stripTrailingSlashes(parentEntry.projectPath) + )?.projectName ?? + parentEntry.projectPath.split("/").filter(Boolean).at(-1) ?? + parentEntry.projectPath, + runtimeConfig: parentRuntimeConfig, + projects: parentEntry.workspace.projects, + } satisfies WorkspaceMetadata); + + const agentId = resolveTaskAgentIdForResume(task); + assert(agentId.length > 0, "queued task agentId is required"); let skipInitHook = false; - const agentIdCandidates = resolvePersistedAgentIdCandidates(task); - if (agentIdCandidates.length > 0) { - const discoveryContexts: Array<{ runtime: Runtime; workspacePath: string }> = [ - { runtime: runtimeForTaskWorkspace, workspacePath }, - ]; - try { - discoveryContexts.push( - createRuntimeContextForWorkspace({ - runtimeConfig: parentEntry.workspace.runtimeConfig ?? parentRuntimeConfig, - projectPath: parentEntry.projectPath, - name: parentWorkspaceName, - namedWorkspacePath: coerceNonEmptyString(parentEntry.workspace.path), - }) - ); - } catch (error: unknown) { - log.debug("Queued task: failed to build parent agent-discovery runtime", { - taskId, - parentWorkspaceId: parentId, - error: getErrorMessage(error), - }); - } - - let resolvedSkipInitHook: boolean | undefined; - for (const agentId of agentIdCandidates) { - let fallbackSkipInitHook: boolean | undefined; - for (const discovery of discoveryContexts) { - try { - const definition = await readAgentDefinition( - discovery.runtime, - discovery.workspacePath, - agentId - ); - const frontmatter = await resolveAgentFrontmatter( - discovery.runtime, - discovery.workspacePath, - definition.id, - { - skipScopesAbove: getSkipScopesAboveForKnownScope(definition.scope), - } - ); - const candidateSkipInitHook = frontmatter.subagent?.skip_init_hook === true; - if (definition.scope === "project") { - resolvedSkipInitHook = candidateSkipInitHook; - break; - } - fallbackSkipInitHook ??= candidateSkipInitHook; - } catch (error: unknown) { - log.debug("Queued task: failed to read agent definition for skip_init_hook", { - taskId, - agentId, - agentDiscoveryPath: discovery.workspacePath, - error: getErrorMessage(error), - }); - } - } - - resolvedSkipInitHook ??= fallbackSkipInitHook; - if (resolvedSkipInitHook != null) { - break; - } - } - - skipInitHook = resolvedSkipInitHook ?? false; - } - - runBackgroundInit( - runtimeForTaskWorkspace, - { - projectPath: taskEntry.projectPath, - branchName: workspaceName, - trunkBranch, - workspacePath, - initLogger, - env: secrets, - skipInitHook, - trusted: - this.config - .loadConfigOrDefault() - .projects.get(stripTrailingSlashes(taskEntry.projectPath))?.trusted ?? false, - }, - taskId - ); - } - - const model = task.taskModelString ?? defaultModel; - const queuedPrompt = coerceNonEmptyString(task.taskPrompt); - if (queuedPrompt) { - taskQueueDebug("TaskService.maybeStartQueuedTasks sendMessage starting (dequeue)", { - taskId, - model, - promptLength: queuedPrompt.length, - }); - const sendResult = await this.workspaceService.sendMessage( - taskId, - queuedPrompt, - { - model, - agentId: resolveTaskAgentIdForResume(task), - thinkingLevel: task.taskThinkingLevel, - experiments: task.taskExperiments, - }, - { allowQueuedAgentTask: true, agentInitiated: true } - ); - if (!sendResult.success) { - log.error("Failed to start queued task via sendMessage", { + try { + const parentRuntime = createRuntimeForWorkspace({ + runtimeConfig: parentRuntimeConfig, + projectPath: parentEntry.projectPath, + name: parentWorkspaceName, + }); + const parentWorkspacePath = + coerceNonEmptyString(parentEntry.workspace.path) ?? + parentRuntime.getWorkspacePath(parentEntry.projectPath, parentWorkspaceName); + const frontmatter = await resolveAgentFrontmatter( + parentRuntime, + parentWorkspacePath, + agentId + ); + skipInitHook = frontmatter.subagent?.skip_init_hook === true; + } catch (error: unknown) { + log.debug("Queued task: failed to resolve skip_init_hook during reservation", { taskId, - error: sendResult.error, + agentId, + error: getErrorMessage(error), }); - continue; } - } else { - // Backward compatibility: older queued tasks persisted their prompt in chat history. - taskQueueDebug("TaskService.maybeStartQueuedTasks resumeStream starting (legacy dequeue)", { - taskId, - model, - }); - const resumeResult = await this.workspaceService.resumeStream( - taskId, - { - model, - agentId: resolveTaskAgentIdForResume(task), - thinkingLevel: task.taskThinkingLevel, - experiments: task.taskExperiments, - }, - { allowQueuedAgentTask: true, agentInitiated: true } - ); - if (!resumeResult.success) { - log.error("Failed to start queued task", { taskId, error: resumeResult.error }); - taskQueueDebug("TaskService.maybeStartQueuedTasks resumeStream failed", { - taskId, - error: resumeResult.error, - }); + const workspaceName = coerceNonEmptyString(task.name); + if (!workspaceName) { + await this.markTaskLaunchFailed(taskId, "Queued task missing workspace name"); continue; } + + const canonicalModel = + coerceNonEmptyString(task.aiSettings?.model) ?? + normalizeToCanonical(task.taskModelString ?? defaultModel); + const createdAt = task.createdAt ?? getIsoNow(); + await this.editWorkspaceEntry(taskId, (workspace) => { + workspace.taskStatus = "starting"; + }); + + plans.push({ + taskId, + parentWorkspaceId, + parentMeta, + agentId, + agentType: task.agentType ?? agentId, + prompt: queuedPrompt, + title: task.title ?? workspaceName, + workspaceName, + createdAt, + taskRuntimeConfig, + parentRuntimeConfig, + taskModelString: task.taskModelString ?? defaultModel, + canonicalModel, + effectiveThinkingLevel: task.taskThinkingLevel, + skipInitHook, + preferredTrunkBranch: task.taskTrunkBranch, + workflowTask: task.workflowTask, + bestOf: task.bestOf, + experiments: task.taskExperiments, + }); } + } - await this.setTaskStatus(taskId, "running"); - taskQueueDebug("TaskService.maybeStartQueuedTasks started", { taskId }); + for (const plan of plans) { + await this.enqueueReservedTaskLaunch(plan); } } + private async enqueueReservedTaskLaunch(plan: TaskLaunchPlan): Promise { + assert(plan.taskId.length > 0, "enqueueReservedTaskLaunch requires taskId"); + const projectPath = stripTrailingSlashes(plan.parentMeta.projectPath); + assert(projectPath.length > 0, "enqueueReservedTaskLaunch requires projectPath"); + + const previousLaunch = + this.reservedTaskLaunchByProjectPath.get(projectPath) ?? Promise.resolve(); + const launch = previousLaunch + .catch(() => undefined) + .then(async () => { + await this.startReservedAgentTask(plan); + }); + const trackedLaunch = launch.finally(() => { + if (this.reservedTaskLaunchByProjectPath.get(projectPath) === trackedLaunch) { + this.reservedTaskLaunchByProjectPath.delete(projectPath); + } + }); + this.reservedTaskLaunchByProjectPath.set(projectPath, trackedLaunch); + await launch; + } + private async setTaskStatus(workspaceId: string, status: AgentTaskStatus): Promise { assert(workspaceId.length > 0, "setTaskStatus: workspaceId must be non-empty"); @@ -5331,6 +5541,7 @@ export class TaskService { const hasRecoverableSibling = siblings.some((sibling) => { return ( sibling.taskStatus === "queued" || + sibling.taskStatus === "starting" || sibling.taskStatus === "running" || sibling.taskStatus === "awaiting_report" ); diff --git a/src/node/services/tools/task.ts b/src/node/services/tools/task.ts index 4676ac6b31..5b786c9a0c 100644 --- a/src/node/services/tools/task.ts +++ b/src/node/services/tools/task.ts @@ -106,14 +106,14 @@ function parseTaskAiOverrides(args: { model?: string | null; thinking?: string | interface SpawnedTaskInfo { taskId: string; - status: "queued" | "running"; + status: "queued" | "starting" | "running"; groupKind?: TaskGroupKind; label?: string; } interface PendingTaskInfo { taskId: string; - status: "queued" | "running" | "completed" | "interrupted"; + status: "queued" | "starting" | "running" | "completed" | "interrupted"; groupKind?: TaskGroupKind; label?: string; } @@ -162,8 +162,10 @@ function emitTaskCreatedEvent(params: { function toAggregatePendingStatus( statuses: ReadonlyArray -): "queued" | "running" { - return statuses.every((status) => status === "queued") ? "queued" : "running"; +): "queued" | "starting" | "running" { + if (statuses.every((status) => status === "queued")) return "queued"; + if (statuses.every((status) => status === "starting")) return "starting"; + return "running"; } function serializeCompletedReport(report: CompletedTaskInfo) { diff --git a/src/node/services/tools/task_await.ts b/src/node/services/tools/task_await.ts index 3406d9f256..41a575f813 100644 --- a/src/node/services/tools/task_await.ts +++ b/src/node/services/tools/task_await.ts @@ -30,10 +30,15 @@ const WORKFLOW_AWAIT_POLL_INTERVAL_MS = 250; // awaiting a report. Centralised here so the timeout=0 and "timed out" error // branches below stay in lockstep when shared fields are added — see #3234, // which extended both branches symmetrically with `getAgentTaskElapsedField`. -type AgentTaskActiveStatus = "queued" | "running" | "awaiting_report"; +type AgentTaskActiveStatus = "queued" | "starting" | "running" | "awaiting_report"; function isAgentTaskActiveStatus(status: AgentTaskStatus | null): status is AgentTaskActiveStatus { - return status === "queued" || status === "running" || status === "awaiting_report"; + return ( + status === "queued" || + status === "starting" || + status === "running" || + status === "awaiting_report" + ); } function coerceTimeoutMs(timeoutSecs: unknown): number | undefined { diff --git a/src/node/services/tools/task_list.test.ts b/src/node/services/tools/task_list.test.ts index 366552f1a4..c2b3261e82 100644 --- a/src/node/services/tools/task_list.test.ts +++ b/src/node/services/tools/task_list.test.ts @@ -24,7 +24,7 @@ describe("task_list tool", () => { expect(result).toEqual({ tasks: [] }); expect(listDescendantAgentTasks).toHaveBeenCalledWith("root-workspace", { - statuses: ["queued", "running", "awaiting_report"], + statuses: ["queued", "starting", "running", "awaiting_report"], excludeWorkflowTasks: true, }); }); diff --git a/src/node/services/tools/task_list.ts b/src/node/services/tools/task_list.ts index cdec899208..c2df4b148f 100644 --- a/src/node/services/tools/task_list.ts +++ b/src/node/services/tools/task_list.ts @@ -6,7 +6,7 @@ import { TaskListToolResultSchema, TOOL_DEFINITIONS } from "@/common/utils/tools import { toBashTaskId } from "./taskId"; import { parseToolResult, requireTaskService, requireWorkspaceId } from "./toolUtils"; -const DEFAULT_STATUSES = ["queued", "running", "awaiting_report"] as const; +const DEFAULT_STATUSES = ["queued", "starting", "running", "awaiting_report"] as const; export const createTaskListTool: ToolFactory = (config: ToolConfiguration) => { return tool({ diff --git a/src/node/services/workflows/WorkflowRunner.test.ts b/src/node/services/workflows/WorkflowRunner.test.ts index de965e5d85..0abd59ce58 100644 --- a/src/node/services/workflows/WorkflowRunner.test.ts +++ b/src/node/services/workflows/WorkflowRunner.test.ts @@ -3,7 +3,7 @@ import { execFile } from "node:child_process"; import * as fs from "node:fs/promises"; import * as path from "node:path"; import { promisify } from "node:util"; -import { describe, expect, spyOn, test } from "bun:test"; +import { describe, expect, mock, spyOn, test } from "bun:test"; import { QuickJSRuntimeFactory } from "@/node/services/ptc/quickjsRuntime"; import { ForegroundWaitBackgroundedError } from "@/node/services/taskService"; import { DisposableTempDir } from "@/node/services/tempDir"; @@ -998,6 +998,57 @@ describe("WorkflowRunner", () => { expect(run.steps.map((step) => step.stepId).sort()).toEqual(["source-a", "source-b"]); }); + test("bulk creates new parallelAgents tasks when adapter supports createAgentTasks", async () => { + using tmp = new DisposableTempDir("workflow-runner-parallel-bulk"); + const store = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: WORKFLOW_RUNNER_TEST_STALE_LEASE_MS, + }); + await store.createRun({ + id: "wfr_parallel_bulk", + workspaceId: "workspace-1", + definition, + definitionSource: `export default function workflow({ parallelAgents }) { + const results = parallelAgents([ + { id: "source-a", prompt: "Read source A" }, + { id: "source-b", prompt: "Read source B" }, + ]); + return { reportMarkdown: results.map((result) => result.reportMarkdown).join(" + ") }; + }`, + args: {}, + now: "2026-05-29T00:00:00.000Z", + }); + const createAgentTasks = mock( + async ( + specs: Array<{ id: string }>, + lifecycle?: { onTaskCreated?: (index: number, taskId: string) => Promise | void } + ) => { + for (const [index, spec] of specs.entries()) { + await lifecycle?.onTaskCreated?.(index, `task_${spec.id}`); + } + return specs.map((spec) => ({ taskId: `task_${spec.id}`, status: "starting" as const })); + } + ); + const runAgent = mock(async () => { + throw new Error("parallelAgents should use bulk creation"); + }); + const waitForAgentTask = mock(async (taskId: string) => ({ + taskId, + reportMarkdown: taskId.replace("task_", ""), + })); + const runner = createRunner(store, { runAgent, createAgentTasks, waitForAgentTask }); + + await expect(runner.run("wfr_parallel_bulk")).resolves.toEqual({ + reportMarkdown: "source-a + source-b", + }); + + expect(createAgentTasks).toHaveBeenCalledTimes(1); + expect(runAgent).not.toHaveBeenCalled(); + expect(waitForAgentTask).toHaveBeenCalledTimes(2); + const run = await store.getRun("wfr_parallel_bulk"); + expect(run.steps.map((step) => step.taskId).sort()).toEqual(["task_source-a", "task_source-b"]); + }); + test("records completed parallelAgents results before slower siblings finish", async () => { using tmp = new DisposableTempDir("workflow-runner-parallel-incremental"); const store = new WorkflowRunStore({ diff --git a/src/node/services/workflows/WorkflowRunner.ts b/src/node/services/workflows/WorkflowRunner.ts index cbc0433d76..c1a460ebe0 100644 --- a/src/node/services/workflows/WorkflowRunner.ts +++ b/src/node/services/workflows/WorkflowRunner.ts @@ -107,6 +107,10 @@ export interface WorkflowTaskAdapter { lifecycle?: { onTaskCreated?: (taskId: string) => Promise | void }, waitOptions?: WorkflowAgentWaitOptions ): Promise; + createAgentTasks?( + specs: WorkflowAgentSpec[], + lifecycle?: { onTaskCreated?: (index: number, taskId: string) => Promise | void } + ): Promise>; waitForAgentTask?( taskId: string, spec: WorkflowAgentSpec, @@ -1289,17 +1293,69 @@ export class WorkflowRunner { ...options.waitOptions, abortSignal: batchAbortController.signal, }; - const guardedRuns = currentPending.map(async (step, pendingIndex) => { + const effectivePending = currentPending.map((step) => ({ + ...step, + runSpec: + step.attempt === 1 + ? step.spec + : buildRetryAgentSpec( + step.spec, + step.attempt - 1, + step.retryMessage ?? "previous attempt failed" + ), + })); + const bulkCreatableSteps = effectivePending.filter((step) => step.taskId == null); + if ( + bulkCreatableSteps.length > 0 && + this.taskAdapter.createAgentTasks != null && + this.taskAdapter.waitForAgentTask != null + ) { + try { + const createdTasks = await this.taskAdapter.createAgentTasks( + bulkCreatableSteps.map((step) => step.runSpec), + { + onTaskCreated: async (index, taskId) => { + const step = bulkCreatableSteps[index]; + assert(step != null, "WorkflowRunner.parallelAgents bulk lifecycle index mismatch"); + assert(taskId.length > 0, "WorkflowRunner.parallelAgents bulk taskId is required"); + options.leaseGuard.throwIfLost(); + await this.recordStepStarted(runId, { + stepId: step.spec.id, + inputHash: step.inputHash, + taskId, + startedAt: step.startedAt, + }); + await this.recordTaskStartedEventIfMissing(runId, sequence, { + stepId: step.spec.id, + taskId, + }); + }, + } + ); + if (createdTasks.length !== bulkCreatableSteps.length) { + throw new Error("parallelAgents bulk task creation returned the wrong number of tasks"); + } + for (const [index, createdTask] of createdTasks.entries()) { + assert( + createdTask.taskId.length > 0, + "WorkflowRunner.parallelAgents created taskId is required" + ); + bulkCreatableSteps[index].taskId = createdTask.taskId; + } + } catch (error) { + if (isForegroundWaitBackgroundedError(error)) { + foregroundBackgrounded = true; + abortBatch(); + } else if (!foregroundBackgrounded) { + await interruptRemainingTasks(); + } + throw error; + } + } + const guardedRuns = effectivePending.map(async (step, pendingIndex) => { try { const rawResult = await this.runOrResumeAgentStep(runId, sequence, { - spec: - step.attempt === 1 - ? step.spec - : buildRetryAgentSpec( - step.spec, - step.attempt - 1, - step.retryMessage ?? "previous attempt failed" - ), + spec: step.runSpec, inputHash: step.inputHash, startedAt: step.startedAt, taskId: step.taskId, diff --git a/src/node/services/workflows/WorkflowTaskServiceAdapter.test.ts b/src/node/services/workflows/WorkflowTaskServiceAdapter.test.ts index 4931a3204a..24f3c12e79 100644 --- a/src/node/services/workflows/WorkflowTaskServiceAdapter.test.ts +++ b/src/node/services/workflows/WorkflowTaskServiceAdapter.test.ts @@ -127,6 +127,68 @@ describe("WorkflowTaskServiceAdapter", () => { }); }); + test("bulk creates workflow child tasks with workflow metadata", async () => { + const createMany = mock(async (_args: unknown[]) => + Ok([ + { taskId: "task_1", kind: "agent" as const, status: "starting" as const }, + { taskId: "task_2", kind: "agent" as const, status: "queued" as const }, + ]) + ); + const create = mock(async () => + Ok({ taskId: "unused", kind: "agent" as const, status: "running" as const }) + ); + const waitForAgentReport = mock(async () => ({ reportMarkdown: "unused" })); + const adapter = new WorkflowTaskServiceAdapter({ + taskService: { create, createMany, waitForAgentReport }, + parentWorkspaceId: "parent_1", + workflowRunId: "wfr_123", + defaultAgentId: "explore", + experiments: { dynamicWorkflows: true }, + }); + + const created: Array<[number, string]> = []; + const result = await adapter.createAgentTasks( + [ + { id: "first", prompt: "Do first", title: "First" }, + { id: "second", prompt: "Do second", agentId: "exec", outputSchema: { type: "object" } }, + ], + { + onTaskCreated: (index, taskId) => { + created.push([index, taskId]); + }, + } + ); + + expect(result).toEqual([ + { taskId: "task_1", status: "starting" }, + { taskId: "task_2", status: "queued" }, + ]); + expect(created).toEqual([ + [0, "task_1"], + [1, "task_2"], + ]); + expect(createMany).toHaveBeenCalledWith([ + { + parentWorkspaceId: "parent_1", + kind: "agent", + agentId: "explore", + prompt: "Do first", + title: "First", + workflowTask: { runId: "wfr_123", stepId: "first" }, + experiments: { dynamicWorkflows: true }, + }, + { + parentWorkspaceId: "parent_1", + kind: "agent", + agentId: "exec", + prompt: "Do second", + title: "second", + workflowTask: { runId: "wfr_123", stepId: "second", outputSchema: { type: "object" } }, + experiments: { dynamicWorkflows: true }, + }, + ]); + }); + test("passes workflow wait options into report waits", async () => { const abortController = new AbortController(); const create = mock(async () => diff --git a/src/node/services/workflows/WorkflowTaskServiceAdapter.ts b/src/node/services/workflows/WorkflowTaskServiceAdapter.ts index 4083472d67..c74a0a3692 100644 --- a/src/node/services/workflows/WorkflowTaskServiceAdapter.ts +++ b/src/node/services/workflows/WorkflowTaskServiceAdapter.ts @@ -41,6 +41,23 @@ interface WorkflowTaskServiceLike { modelString?: string; thinkingLevel?: ParsedThinkingInput; }): Promise<{ success: true; data: TaskCreateResult } | { success: false; error: string }>; + createMany?( + args: Array<{ + parentWorkspaceId: string; + kind: "agent"; + agentId: string; + prompt: string; + title: string; + workflowTask: { + runId: string; + stepId: string; + outputSchema?: unknown; + }; + experiments?: WorkflowTaskExperiments; + modelString?: string; + thinkingLevel?: ParsedThinkingInput; + }> + ): Promise<{ success: true; data: TaskCreateResult[] } | { success: false; error: string }>; waitForAgentReport( taskId: string, options: WorkflowAgentWaitOptions & { @@ -180,6 +197,73 @@ export class WorkflowTaskServiceAdapter implements WorkflowTaskAdapter { }); } + async createAgentTasks( + specs: WorkflowAgentSpec[], + lifecycle?: { onTaskCreated?: (index: number, taskId: string) => Promise | void } + ): Promise> { + assert(specs.length > 0, "WorkflowTaskServiceAdapter.createAgentTasks: specs are required"); + if (this.taskService.createMany == null) { + const created: Array<{ taskId: string; status: "queued" | "starting" | "running" }> = []; + for (const [index, spec] of specs.entries()) { + const createResult = await this.taskService.create(this.buildCreateArgs(spec)); + if (!createResult.success) { + throw new Error(createResult.error); + } + assert(createResult.data.taskId.length > 0, "createAgentTasks: taskId is required"); + await lifecycle?.onTaskCreated?.(index, createResult.data.taskId); + created.push({ taskId: createResult.data.taskId, status: createResult.data.status }); + } + return created; + } + + const createResult = await this.taskService.createMany( + specs.map((spec) => this.buildCreateArgs(spec)) + ); + if (!createResult.success) { + throw new Error(createResult.error); + } + if (createResult.data.length !== specs.length) { + throw new Error("WorkflowTaskServiceAdapter.createAgentTasks: result length mismatch"); + } + + const created: Array<{ taskId: string; status: "queued" | "starting" | "running" }> = []; + for (const [index, result] of createResult.data.entries()) { + assert(result.taskId.length > 0, "createAgentTasks: taskId is required"); + await lifecycle?.onTaskCreated?.(index, result.taskId); + created.push({ taskId: result.taskId, status: result.status }); + } + return created; + } + + private buildCreateArgs( + spec: WorkflowAgentSpec + ): Parameters[0] { + assert(spec.id.length > 0, "WorkflowTaskServiceAdapter: spec.id is required"); + assert(spec.prompt.length > 0, "WorkflowTaskServiceAdapter: spec.prompt is required"); + + const workflowTask: { runId: string; stepId: string; outputSchema?: unknown } = { + runId: this.workflowRunId, + stepId: spec.id, + }; + if (spec.outputSchema !== undefined) { + workflowTask.outputSchema = spec.outputSchema; + } + + const agentId = spec.agentId ?? this.defaultAgentId; + const experiments = this.getExperimentsForAgent(agentId); + return { + parentWorkspaceId: this.parentWorkspaceId, + kind: "agent", + agentId, + prompt: spec.prompt, + title: spec.title ?? spec.id, + workflowTask, + ...(experiments !== undefined ? { experiments } : {}), + ...(this.modelString !== undefined ? { modelString: this.modelString } : {}), + ...(this.thinkingLevel !== undefined ? { thinkingLevel: this.thinkingLevel } : {}), + }; + } + async runAgent( spec: WorkflowAgentSpec, lifecycle?: { onTaskCreated?: (taskId: string) => Promise | void }, diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index 8c8176a4c2..24220cf6c1 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -6172,14 +6172,17 @@ export class WorkspaceService extends EventEmitter { for (const [_projectPath, project] of config.projects) { const ws = project.workspaces.find((w) => w.id === workspaceId); if (!ws) continue; - if (ws.parentWorkspaceId && ws.taskStatus === "queued") { - taskQueueDebug("WorkspaceService.sendMessage blocked (queued task)", { + if ( + ws.parentWorkspaceId && + (ws.taskStatus === "queued" || ws.taskStatus === "starting") + ) { + taskQueueDebug("WorkspaceService.sendMessage blocked (queued/starting task)", { workspaceId, stack: new Error("sendMessage blocked").stack, }); return Err({ type: "unknown", - raw: "This agent task is queued and cannot start yet. Wait for a slot to free.", + raw: "This agent task is queued or starting and cannot accept generic messages yet.", }); } break; @@ -6457,14 +6460,17 @@ export class WorkspaceService extends EventEmitter { for (const [_projectPath, project] of config.projects) { const ws = project.workspaces.find((w) => w.id === workspaceId); if (!ws) continue; - if (ws.parentWorkspaceId && ws.taskStatus === "queued") { - taskQueueDebug("WorkspaceService.resumeStream blocked (queued task)", { + if ( + ws.parentWorkspaceId && + (ws.taskStatus === "queued" || ws.taskStatus === "starting") + ) { + taskQueueDebug("WorkspaceService.resumeStream blocked (queued/starting task)", { workspaceId, stack: new Error("resumeStream blocked").stack, }); return Err({ type: "unknown", - raw: "This agent task is queued and cannot start yet. Wait for a slot to free.", + raw: "This agent task is queued or starting and cannot resume through generic calls yet.", }); } break; From f786373ca5271dfc14462ded40b6e344d560afca Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Mon, 8 Jun 2026 16:49:01 +0000 Subject: [PATCH 2/5] =?UTF-8?q?=F0=9F=A4=96=20fix:=20harden=20bulk=20task?= =?UTF-8?q?=20launch=20lifecycle?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AgentListItem/AgentListItem.tsx | 9 +- src/browser/components/ChatPane/ChatPane.tsx | 35 +- .../ProjectSidebar/ProjectSidebar.tsx | 5 +- src/browser/utils/ui/workspaceFiltering.ts | 22 +- src/node/config.ts | 3 +- src/node/services/taskService.test.ts | 43 +++ src/node/services/taskService.ts | 344 ++++++++++++++---- src/node/services/tools/task.ts | 8 +- .../WorkflowTaskServiceAdapter.test.ts | 30 +- .../workflows/WorkflowTaskServiceAdapter.ts | 16 +- 10 files changed, 419 insertions(+), 96 deletions(-) diff --git a/src/browser/components/AgentListItem/AgentListItem.tsx b/src/browser/components/AgentListItem/AgentListItem.tsx index 187e7771f5..a36cd5c9a2 100644 --- a/src/browser/components/AgentListItem/AgentListItem.tsx +++ b/src/browser/components/AgentListItem/AgentListItem.tsx @@ -11,9 +11,10 @@ import { useWorkspaceUnread } from "@/browser/hooks/useWorkspaceUnread"; import { useRuntimeStatus } from "@/browser/stores/RuntimeStatusStore"; import { useWorkspaceSidebarState } from "@/browser/stores/WorkspaceStore"; import { stopKeyboardPropagation } from "@/browser/utils/events"; -import type { - AgentRowRenderMeta, - WorkspaceDelegatedActivity, +import { + isRunningOrStartingTaskStatus, + type AgentRowRenderMeta, + type WorkspaceDelegatedActivity, } from "@/browser/utils/ui/workspaceFiltering"; import { cn } from "@/common/lib/utils"; import { @@ -1192,7 +1193,7 @@ function AgentListItemInner(props: UnifiedAgentListItemProps) { if (rowMeta?.rowKind === "subagent") { // Connector geometry is driven by render metadata so visible siblings keep // consistent single/middle/last shapes as parents expand/collapse children. - const isElbowActive = props.metadata.taskStatus === "running"; + const isElbowActive = isRunningOrStartingTaskStatus(props.metadata.taskStatus); const connectorLayout = props.subAgentConnectorLayout ?? "default"; const connectorDepth = props.depth ?? rowMeta.depth; const connectorRailX = getSubAgentParentRailX(connectorDepth, connectorLayout); diff --git a/src/browser/components/ChatPane/ChatPane.tsx b/src/browser/components/ChatPane/ChatPane.tsx index 6e16a70c3c..0fe05f7f6e 100644 --- a/src/browser/components/ChatPane/ChatPane.tsx +++ b/src/browser/components/ChatPane/ChatPane.tsx @@ -106,6 +106,7 @@ import { computeOperationalBundleInfos, computeWorkBundleInfos, } from "@/browser/utils/messages/transcriptRenderProjection"; +import { isBlockedPreStreamTaskStatus } from "@/browser/utils/ui/workspaceFiltering"; import { recordSyntheticReactRenderSample } from "@/browser/utils/perf/reactProfileCollector"; // Perf e2e runs load the production bundle where React's onRender profiler callbacks may not @@ -321,9 +322,13 @@ const ChatPaneContent: React.FC = (props) => { // so the transcript stays readable while new sends remain disabled. const meta = workspaceMetadata.get(workspaceId); const transcriptOnly = meta?.transcriptOnly ?? false; - const isQueuedAgentTask = Boolean(meta?.parentWorkspaceId) && meta?.taskStatus === "queued"; + const isPreStreamAgentTask = + Boolean(meta?.parentWorkspaceId) && isBlockedPreStreamTaskStatus(meta?.taskStatus); + const preStreamAgentTaskLabel = meta?.taskStatus === "starting" ? "Starting" : "Queued"; const queuedAgentTaskPrompt = - isQueuedAgentTask && typeof meta?.taskPrompt === "string" && meta.taskPrompt.trim().length > 0 + isPreStreamAgentTask && + typeof meta?.taskPrompt === "string" && + meta.taskPrompt.trim().length > 0 ? meta.taskPrompt : null; const shouldShowQueuedAgentTaskPrompt = @@ -1048,7 +1053,9 @@ const ChatPaneContent: React.FC = (props) => { node: (
-
Queued
+
+ {preStreamAgentTaskLabel} +
= (props) => { isTranscriptCaughtUp={isTranscriptCaughtUp} isHydratingTranscript={isHydratingTranscript} runtimeConfig={runtimeConfig} - isQueuedAgentTask={isQueuedAgentTask} + isPreStreamAgentTask={isPreStreamAgentTask} + preStreamAgentTaskStatus={meta?.taskStatus === "starting" ? "starting" : "queued"} isCompacting={isCompacting} shouldShowPinnedTodoList={shouldShowPinnedTodoList} shouldShowReviewsBanner={shouldShowReviewsBanner} @@ -1580,7 +1588,8 @@ interface ChatInputPaneProps { projectName: string; workspaceName: string; runtimeConfig?: RuntimeConfig; - isQueuedAgentTask: boolean; + isPreStreamAgentTask: boolean; + preStreamAgentTaskStatus: "queued" | "starting"; isCompacting: boolean; isStreamStarting: boolean; isTranscriptCaughtUp: boolean; @@ -1695,12 +1704,14 @@ const ChatInputPane: React.FC = (props) => { ), }); } - if (props.isQueuedAgentTask) { + if (props.isPreStreamAgentTask) { addDecorationEntry({ - key: "queued-agent-task", + key: "pre-stream-agent-task", node: (
- This agent task is queued and will start automatically when a parallel slot is available. + {props.preStreamAgentTaskStatus === "starting" + ? "This agent task is starting and will become editable after launch accepts the initial prompt." + : "This agent task is queued and will start automatically when a parallel slot is available."}
), }); @@ -1726,10 +1737,12 @@ const ChatInputPane: React.FC = (props) => { onResetContext={props.onResetContext} onTruncateHistory={props.onTruncateHistory} onModelChange={props.onModelChange} - disabled={!props.projectName || !props.workspaceName || props.isQueuedAgentTask} + disabled={!props.projectName || !props.workspaceName || props.isPreStreamAgentTask} disabledReason={ - props.isQueuedAgentTask - ? "Queued - waiting for an available parallel task slot. This will start automatically." + props.isPreStreamAgentTask + ? props.preStreamAgentTaskStatus === "starting" + ? "Starting - waiting for launch to accept the initial prompt." + : "Queued - waiting for an available parallel task slot. This will start automatically." : undefined } isTranscriptCaughtUp={props.isTranscriptCaughtUp} diff --git a/src/browser/components/ProjectSidebar/ProjectSidebar.tsx b/src/browser/components/ProjectSidebar/ProjectSidebar.tsx index 97abcb3eda..a27731a59f 100644 --- a/src/browser/components/ProjectSidebar/ProjectSidebar.tsx +++ b/src/browser/components/ProjectSidebar/ProjectSidebar.tsx @@ -60,6 +60,7 @@ import { getSectionExpandedKey, getSectionTierKey, resolveEffectiveSectionId, + isRunningOrStartingTaskStatus, type AgentRowRenderMeta, } from "@/browser/utils/ui/workspaceFiltering"; import { Tooltip, TooltipTrigger, TooltipContent } from "../Tooltip/Tooltip"; @@ -2587,7 +2588,9 @@ const ProjectSidebarInner: React.FC = ({ let lastRunningSiblingIndex = -1; for (let index = siblings.length - 1; index >= 0; index -= 1) { - if (siblings[index]?.taskStatus === "running") { + if ( + isRunningOrStartingTaskStatus(siblings[index]?.taskStatus) + ) { lastRunningSiblingIndex = index; break; } diff --git a/src/browser/utils/ui/workspaceFiltering.ts b/src/browser/utils/ui/workspaceFiltering.ts index d57f4d81e5..7c6fe8ae01 100644 --- a/src/browser/utils/ui/workspaceFiltering.ts +++ b/src/browser/utils/ui/workspaceFiltering.ts @@ -148,8 +148,22 @@ function hasDelegatedActivity(activity: WorkspaceDelegatedActivity): boolean { return activity.activeCount > 0 || activity.queuedCount > 0; } -function isActiveDelegatedStatus(status: FrontendWorkspaceMetadata["taskStatus"]): boolean { - return status === "running" || status === "awaiting_report"; +export function isActiveOrStartingTaskStatus( + status: FrontendWorkspaceMetadata["taskStatus"] +): boolean { + return status === "starting" || status === "running" || status === "awaiting_report"; +} + +export function isRunningOrStartingTaskStatus( + status: FrontendWorkspaceMetadata["taskStatus"] +): boolean { + return status === "starting" || status === "running"; +} + +export function isBlockedPreStreamTaskStatus( + status: FrontendWorkspaceMetadata["taskStatus"] +): boolean { + return status === "queued" || status === "starting"; } function getIsWorkspaceLiveActive(workspaceId: string, options: DelegatedActivityOptions): boolean { @@ -166,7 +180,7 @@ export function isWorkspaceDelegatedActivityActive( workspace: FrontendWorkspaceMetadata, options: DelegatedActivityOptions = {} ): boolean { - if (isActiveDelegatedStatus(workspace.taskStatus)) { + if (isActiveOrStartingTaskStatus(workspace.taskStatus)) { return true; } if (hasCompletedAgentReport(workspace)) { @@ -410,7 +424,7 @@ export function computeAgentRowRenderMeta( let lastRunningSiblingIndex = -1; for (let index = siblings.length - 1; index >= 0; index -= 1) { - if (siblings[index]?.taskStatus === "running") { + if (isRunningOrStartingTaskStatus(siblings[index]?.taskStatus)) { lastRunningSiblingIndex = index; break; } diff --git a/src/node/config.ts b/src/node/config.ts index f6f6825b0b..721d519778 100644 --- a/src/node/config.ts +++ b/src/node/config.ts @@ -1382,7 +1382,7 @@ export class Config { } // Mark worktree workspaces with missing checkout directories as transcript-only. - // Queued agent tasks can briefly exist without a provisioned checkout, so keep + // Queued/starting agent tasks can briefly exist without a provisioned checkout, so keep // those workspaces interactive until the checkout is created. const workspacePathExists = await fs.promises .access(workspacePath) @@ -1391,6 +1391,7 @@ export class Config { if ( isWorktreeRuntime(metadata.runtimeConfig) && metadata.taskStatus !== "queued" && + metadata.taskStatus !== "starting" && !workspacePathExists ) { result.transcriptOnly = true; diff --git a/src/node/services/taskService.test.ts b/src/node/services/taskService.test.ts index efc078e021..e6f5aab99d 100644 --- a/src/node/services/taskService.test.ts +++ b/src/node/services/taskService.test.ts @@ -547,6 +547,49 @@ describe("TaskService", () => { expect(sendMessage).not.toHaveBeenCalled(); }); + test("createMany launch failure preserves returned task metadata and launch error", async () => { + const config = await createTestConfig(rootDir); + stubStableIds(config, ["aaaaaaaaaa"], "bbbbbbbbbb"); + + const { parentId } = await saveLocalParentWorkspace(config, rootDir); + const sendMessage = mock((): Promise> => Promise.resolve(Err("Forbidden"))); + const { workspaceService } = createWorkspaceServiceMocks({ sendMessage }); + const { taskService } = createTaskServiceHarness(config, { workspaceService }); + + const result = await taskService.createMany([ + { + parentWorkspaceId: parentId, + kind: "agent", + agentId: "explore", + prompt: "launch should fail", + title: "Failing task", + }, + ]); + + expect(result.success).toBe(true); + if (!result.success) return; + const taskId = result.data[0]?.taskId; + assert(typeof taskId === "string" && taskId.length > 0, "created task id is required"); + + let launchError: unknown; + try { + await taskService.waitForAgentReport(taskId, { + timeoutMs: 10_000, + requestingWorkspaceId: parentId, + }); + } catch (error: unknown) { + launchError = error; + } + assert(launchError instanceof Error, "waitForAgentReport should reject with launch error"); + expect(launchError.message).toContain("Forbidden"); + + const taskEntry = Array.from(config.loadConfigOrDefault().projects.values()) + .flatMap((project) => project.workspaces) + .find((workspace) => workspace.id === taskId); + expect(taskEntry?.taskStatus).toBe("interrupted"); + expect(taskEntry?.taskLaunchError).toBe("Forbidden"); + }); + test("queues tasks when maxParallelAgentTasks is reached and starts them when a slot frees", async () => { const config = await createTestConfig(rootDir); stubStableIds(config, ["aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc", "dddddddddd"], "eeeeeeeeee"); diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index 1b12d7a34c..9f18be23b9 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -300,6 +300,19 @@ interface TaskLaunchPlan { experiments?: TaskCreateArgs["experiments"]; } +interface TaskCreateManyOptions { + onTaskReserved?: (index: number, result: TaskCreateResult) => Promise | void; +} + +interface MaterializedTaskLaunch { + workspacePath: string; + trunkBranch: string; + forkedRuntimeConfig: RuntimeConfig; + runtimeForTaskWorkspace: Runtime; + inheritedProjects: WorkspaceMetadata["projects"]; + sourceRuntimeConfigUpdate?: RuntimeConfig; +} + export interface TerminateAgentTaskResult { /** Task IDs terminated (includes descendants). */ terminatedTaskIds: string[]; @@ -669,6 +682,16 @@ function getIsoNow(): string { return new Date().toISOString(); } +async function runtimePathExists(runtime: Runtime, path: string): Promise { + assert(path.length > 0, "runtimePathExists: path must be non-empty"); + try { + await runtime.stat(path); + return true; + } catch { + return false; + } +} + async function readTaskBaseCommitShaByProjectPath(params: { workspaceId: string; workspaceName: string; @@ -717,6 +740,9 @@ export class TaskService { private readonly deferredBestOfLocks = new MutexMap(); private readonly mutex = new AsyncMutex(); private maybeStartQueuedTasksInFlight: Promise | undefined; + private maybeStartQueuedTasksRerunRequested = false; + // Git worktree creation touches per-repository metadata; serialize that narrow phase per project + // while allowing post-fork init/send startup work for sibling tasks to overlap. private readonly reservedTaskLaunchByProjectPath = new Map>(); private readonly pendingWaitersByTaskId = new Map(); private readonly pendingStartWaitersByTaskId = new Map(); @@ -1476,7 +1502,10 @@ export class TaskService { }; } - async createMany(argsList: TaskCreateArgs[]): Promise> { + async createMany( + argsList: TaskCreateArgs[], + options: TaskCreateManyOptions = {} + ): Promise> { if (argsList.length === 0) { return Ok([]); } @@ -1682,6 +1711,13 @@ export class TaskService { results.push({ taskId, kind: "agent", status }); } + for (const [index, result] of results.entries()) { + // Workflow callers durably checkpoint returned task IDs before task records are persisted. + // If config persistence fails afterward, replay sees a started step whose task is not found + // and restarts it instead of duplicating an already-launched child after a crash. + await options.onTaskReserved?.(index, result); + } + await this.config.editConfig((config) => { for (const plan of plans) { const runtime = createRuntimeForWorkspace({ @@ -1745,6 +1781,166 @@ export class TaskService { return Ok(results); } + private async cleanupMaterializedTaskWorkspace( + runtime: Runtime, + projectPath: string, + workspaceName: string, + taskId: string + ): Promise { + assert(projectPath.length > 0, "cleanupMaterializedTaskWorkspace requires projectPath"); + assert(workspaceName.length > 0, "cleanupMaterializedTaskWorkspace requires workspaceName"); + assert(taskId.length > 0, "cleanupMaterializedTaskWorkspace requires taskId"); + + try { + const deleteResult = await runtime.deleteWorkspace(projectPath, workspaceName, true); + if (!deleteResult.success) { + log.error("Task launch cleanup: failed to delete materialized workspace", { + taskId, + error: deleteResult.error, + }); + } + } catch (error: unknown) { + log.error("Task launch cleanup: runtime.deleteWorkspace threw", { + taskId, + error: getErrorMessage(error), + }); + } + + try { + const sessionDir = this.config.getSessionDir(taskId); + await fsPromises.rm(sessionDir, { recursive: true, force: true }); + } catch (error: unknown) { + log.error("Task launch cleanup: failed to remove session directory", { + taskId, + error: getErrorMessage(error), + }); + } + } + + private async getExistingMaterializedTaskLaunch( + plan: TaskLaunchPlan, + sourceRuntime: Runtime, + workspace: WorkspaceConfigEntry + ): Promise { + const workspacePath = + coerceNonEmptyString(workspace.path) ?? + sourceRuntime.getWorkspacePath(plan.parentMeta.projectPath, plan.workspaceName); + if (!(await runtimePathExists(sourceRuntime, workspacePath))) { + return null; + } + + const forkedRuntimeConfig = workspace.runtimeConfig ?? plan.taskRuntimeConfig; + const runtimeForTaskWorkspace = createRuntimeForWorkspace({ + runtimeConfig: forkedRuntimeConfig, + projectPath: plan.parentMeta.projectPath, + name: plan.workspaceName, + namedWorkspacePath: workspacePath, + }); + const trunkBranch = + coerceNonEmptyString(workspace.taskTrunkBranch) ?? + coerceNonEmptyString(plan.preferredTrunkBranch) ?? + coerceNonEmptyString(plan.parentMeta.name) ?? + plan.workspaceName; + + return { + workspacePath, + trunkBranch, + forkedRuntimeConfig, + runtimeForTaskWorkspace, + inheritedProjects: workspace.projects ?? plan.parentMeta.projects, + }; + } + + private async runProjectForkExclusive(projectPath: string, fn: () => Promise): Promise { + assert(projectPath.length > 0, "runProjectForkExclusive requires projectPath"); + + const previousLaunch = + this.reservedTaskLaunchByProjectPath.get(projectPath) ?? Promise.resolve(); + const run = previousLaunch.catch(() => undefined).then(fn); + const trackedLaunch = run + .then( + () => undefined, + () => undefined + ) + .finally(() => { + if (this.reservedTaskLaunchByProjectPath.get(projectPath) === trackedLaunch) { + this.reservedTaskLaunchByProjectPath.delete(projectPath); + } + }); + this.reservedTaskLaunchByProjectPath.set(projectPath, trackedLaunch); + return await run; + } + + private async materializeReservedTaskWorkspace( + plan: TaskLaunchPlan, + sourceRuntime: Runtime, + initLogger: InitLogger + ): Promise { + const entry = findWorkspaceEntry(this.config.loadConfigOrDefault(), plan.taskId); + if (entry?.workspace.taskStatus !== "starting") { + return null; + } + + const existing = await this.getExistingMaterializedTaskLaunch( + plan, + sourceRuntime, + entry.workspace + ); + if (existing) { + taskQueueDebug("TaskService.startReservedAgentTask reusing materialized workspace", { + taskId: plan.taskId, + workspacePath: existing.workspacePath, + }); + return existing; + } + + const projectPath = stripTrailingSlashes(plan.parentMeta.projectPath); + return await this.runProjectForkExclusive(projectPath, async () => { + const entryBeforeFork = findWorkspaceEntry(this.config.loadConfigOrDefault(), plan.taskId); + if (entryBeforeFork?.workspace.taskStatus !== "starting") { + return null; + } + + const forkResult = await orchestrateFork({ + sourceRuntime, + projectPath: plan.parentMeta.projectPath, + sourceWorkspaceName: plan.parentMeta.name, + newWorkspaceName: plan.workspaceName, + initLogger, + config: this.config, + sourceWorkspaceId: plan.parentWorkspaceId, + sourceRuntimeConfig: plan.parentRuntimeConfig, + parentMetadata: plan.parentMeta, + allowCreateFallback: true, + ...(plan.preferredTrunkBranch != null + ? { preferredTrunkBranch: plan.preferredTrunkBranch } + : {}), + trusted: + this.config + .loadConfigOrDefault() + .projects.get(stripTrailingSlashes(plan.parentMeta.projectPath))?.trusted ?? false, + multiProjectExperimentEnabled: this.workspaceService.isExperimentEnabled( + EXPERIMENT_IDS.MULTI_PROJECT_WORKSPACES + ), + }); + + if (!forkResult.success) { + throw new Error(`Task fork failed: ${forkResult.error}`); + } + + return { + workspacePath: forkResult.data.workspacePath, + trunkBranch: forkResult.data.trunkBranch, + forkedRuntimeConfig: forkResult.data.forkedRuntimeConfig, + runtimeForTaskWorkspace: forkResult.data.targetRuntime, + inheritedProjects: forkResult.data.projects, + ...(forkResult.data.sourceRuntimeConfigUpdate != null + ? { sourceRuntimeConfigUpdate: forkResult.data.sourceRuntimeConfigUpdate } + : {}), + }; + }); + } + private scheduleReservedTaskLaunch(plan: TaskLaunchPlan): void { assert(plan.taskId.length > 0, "scheduleReservedTaskLaunch requires taskId"); void this.enqueueReservedTaskLaunch(plan).catch((error: unknown) => { @@ -1791,48 +1987,51 @@ export class TaskService { name: plan.parentMeta.name, }); - const forkResult = await orchestrateFork({ - sourceRuntime: runtime, - projectPath: plan.parentMeta.projectPath, - sourceWorkspaceName: plan.parentMeta.name, - newWorkspaceName: plan.workspaceName, - initLogger, - config: this.config, - sourceWorkspaceId: plan.parentWorkspaceId, - sourceRuntimeConfig: plan.parentRuntimeConfig, - parentMetadata: plan.parentMeta, - allowCreateFallback: true, - ...(plan.preferredTrunkBranch != null - ? { preferredTrunkBranch: plan.preferredTrunkBranch } - : {}), - trusted: - this.config - .loadConfigOrDefault() - .projects.get(stripTrailingSlashes(plan.parentMeta.projectPath))?.trusted ?? false, - multiProjectExperimentEnabled: this.workspaceService.isExperimentEnabled( - EXPERIMENT_IDS.MULTI_PROJECT_WORKSPACES - ), - }); + let materialized: MaterializedTaskLaunch | null; + try { + materialized = await this.materializeReservedTaskWorkspace(plan, runtime, initLogger); + } catch (error: unknown) { + initLogger.logComplete(-1); + throw error; + } + if (!materialized) { + initLogger.logComplete(-1); + return; + } - if (forkResult.success && forkResult.data.sourceRuntimeConfigUpdate) { + const entryAfterMaterialize = findWorkspaceEntry( + this.config.loadConfigOrDefault(), + plan.taskId + ); + if (!entryAfterMaterialize) { + initLogger.logComplete(-1); + await this.cleanupMaterializedTaskWorkspace( + materialized.runtimeForTaskWorkspace, + plan.parentMeta.projectPath, + plan.workspaceName, + plan.taskId + ); + return; + } + if (entryAfterMaterialize.workspace.taskStatus !== "starting") { + initLogger.logComplete(-1); + return; + } + + if (materialized.sourceRuntimeConfigUpdate) { await this.config.updateWorkspaceMetadata(plan.parentWorkspaceId, { - runtimeConfig: forkResult.data.sourceRuntimeConfigUpdate, + runtimeConfig: materialized.sourceRuntimeConfigUpdate, }); await this.emitWorkspaceMetadata(plan.parentWorkspaceId); } - if (!forkResult.success) { - initLogger.logComplete(-1); - throw new Error(`Task fork failed: ${forkResult.error}`); - } - const { workspacePath, trunkBranch, forkedRuntimeConfig, - targetRuntime: runtimeForTaskWorkspace, - projects: inheritedProjects, - } = forkResult.data; + runtimeForTaskWorkspace, + inheritedProjects, + } = materialized; this.configureMultiProjectRuntimeEnvResolver(runtimeForTaskWorkspace); const taskBaseCommitShaByProjectPath = await readTaskBaseCommitShaByProjectPath({ @@ -1865,7 +2064,18 @@ export class TaskService { await this.emitWorkspaceMetadata(plan.taskId); const entryBeforeSend = findWorkspaceEntry(this.config.loadConfigOrDefault(), plan.taskId); - if (entryBeforeSend?.workspace.taskStatus !== "starting") { + if (!entryBeforeSend) { + initLogger.logComplete(-1); + await this.cleanupMaterializedTaskWorkspace( + runtimeForTaskWorkspace, + plan.parentMeta.projectPath, + plan.workspaceName, + plan.taskId + ); + return; + } + if (entryBeforeSend.workspace.taskStatus !== "starting") { + initLogger.logComplete(-1); return; } @@ -1907,7 +2117,7 @@ export class TaskService { typeof sendResult.error === "string" ? sendResult.error : formatSendMessageError(sendResult.error).message; - await this.rollbackFailedTaskCreate( + await this.cleanupMaterializedTaskWorkspace( runtimeForTaskWorkspace, plan.parentMeta.projectPath, plan.workspaceName, @@ -3613,17 +3823,30 @@ export class TaskService { } async maybeStartQueuedTasks(): Promise { + const existingRun = this.maybeStartQueuedTasksInFlight; + if (existingRun != null) { + this.maybeStartQueuedTasksRerunRequested = true; + await existingRun; + return; + } + // A foreground task waiter registers itself in waitForAgentReport's async setup. Yield once so // immediate scheduler calls from the same turn see that foreground-awaiting state and avoid a // nested-task deadlock at maxParallelAgentTasks=1. await Promise.resolve(); - const existingRun = this.maybeStartQueuedTasksInFlight; - if (existingRun != null) { - await existingRun; + const existingRunAfterYield = this.maybeStartQueuedTasksInFlight; + if (existingRunAfterYield != null) { + this.maybeStartQueuedTasksRerunRequested = true; + await existingRunAfterYield; return; } - const run = this.maybeStartQueuedTasksFromReservations().finally(() => { + const run = (async () => { + do { + this.maybeStartQueuedTasksRerunRequested = false; + await this.maybeStartQueuedTasksFromReservations(); + } while (this.maybeStartQueuedTasksRerunRequested); + })().finally(() => { if (this.maybeStartQueuedTasksInFlight === run) { this.maybeStartQueuedTasksInFlight = undefined; } @@ -3656,22 +3879,27 @@ export class TaskService { const aTime = a.createdAt ? Date.parse(a.createdAt) : 0; const bTime = b.createdAt ? Date.parse(b.createdAt) : 0; return aTime - bTime; - }) - .slice(0, availableSlots); + }); + let reservedSlots = 0; for (const task of queuedTasks) { + if (reservedSlots >= availableSlots) { + break; + } const taskId = task.id; assert(taskId != null && taskId.length > 0, "queued task id is required"); if (this.aiService.isStreaming(taskId)) { await this.setTaskStatus(taskId, "running"); + reservedSlots += 1; continue; } const queuedPrompt = coerceNonEmptyString(task.taskPrompt); if (!queuedPrompt) { - taskQueueDebug("TaskService.maybeStartQueuedTasks skipping legacy queued task", { + taskQueueDebug("TaskService.maybeStartQueuedTasks failing legacy queued task", { taskId, }); + await this.markTaskLaunchFailed(taskId, "Queued task missing taskPrompt"); continue; } @@ -3781,6 +4009,7 @@ export class TaskService { await this.editWorkspaceEntry(taskId, (workspace) => { workspace.taskStatus = "starting"; }); + reservedSlots += 1; plans.push({ taskId, @@ -3806,30 +4035,21 @@ export class TaskService { } } - for (const plan of plans) { - await this.enqueueReservedTaskLaunch(plan); - } + await Promise.allSettled( + plans.map(async (plan) => { + try { + await this.enqueueReservedTaskLaunch(plan); + } catch (error: unknown) { + log.error("Failed to launch dequeued task", { taskId: plan.taskId, error }); + await this.markTaskLaunchFailed(plan.taskId, getErrorMessage(error)); + } + }) + ); } private async enqueueReservedTaskLaunch(plan: TaskLaunchPlan): Promise { assert(plan.taskId.length > 0, "enqueueReservedTaskLaunch requires taskId"); - const projectPath = stripTrailingSlashes(plan.parentMeta.projectPath); - assert(projectPath.length > 0, "enqueueReservedTaskLaunch requires projectPath"); - - const previousLaunch = - this.reservedTaskLaunchByProjectPath.get(projectPath) ?? Promise.resolve(); - const launch = previousLaunch - .catch(() => undefined) - .then(async () => { - await this.startReservedAgentTask(plan); - }); - const trackedLaunch = launch.finally(() => { - if (this.reservedTaskLaunchByProjectPath.get(projectPath) === trackedLaunch) { - this.reservedTaskLaunchByProjectPath.delete(projectPath); - } - }); - this.reservedTaskLaunchByProjectPath.set(projectPath, trackedLaunch); - await launch; + await this.startReservedAgentTask(plan); } private async setTaskStatus(workspaceId: string, status: AgentTaskStatus): Promise { diff --git a/src/node/services/tools/task.ts b/src/node/services/tools/task.ts index 5b786c9a0c..353c37f83d 100644 --- a/src/node/services/tools/task.ts +++ b/src/node/services/tools/task.ts @@ -294,9 +294,11 @@ function normalizePendingTaskStatuses(params: { status: currentStatus === "queued" ? "queued" - : currentStatus === "interrupted" - ? "interrupted" - : "running", + : currentStatus === "starting" + ? "starting" + : currentStatus === "interrupted" + ? "interrupted" + : "running", groupKind: createdTask.groupKind, label: createdTask.label, }; diff --git a/src/node/services/workflows/WorkflowTaskServiceAdapter.test.ts b/src/node/services/workflows/WorkflowTaskServiceAdapter.test.ts index 24f3c12e79..9c6b0bde79 100644 --- a/src/node/services/workflows/WorkflowTaskServiceAdapter.test.ts +++ b/src/node/services/workflows/WorkflowTaskServiceAdapter.test.ts @@ -1,6 +1,8 @@ /* eslint-disable @typescript-eslint/await-thenable, @typescript-eslint/require-await */ +import assert from "node:assert/strict"; import { describe, expect, mock, test } from "bun:test"; import { Ok } from "@/common/types/result"; +import type { TaskCreateResult } from "@/node/services/taskService"; import { WorkflowTaskServiceAdapter } from "./WorkflowTaskServiceAdapter"; describe("WorkflowTaskServiceAdapter", () => { @@ -128,11 +130,22 @@ describe("WorkflowTaskServiceAdapter", () => { }); test("bulk creates workflow child tasks with workflow metadata", async () => { - const createMany = mock(async (_args: unknown[]) => - Ok([ - { taskId: "task_1", kind: "agent" as const, status: "starting" as const }, - { taskId: "task_2", kind: "agent" as const, status: "queued" as const }, - ]) + const createMany = mock( + async ( + _args: unknown[], + options?: { + onTaskReserved?: (index: number, result: TaskCreateResult) => Promise | void; + } + ) => { + const results = [ + { taskId: "task_1", kind: "agent" as const, status: "starting" as const }, + { taskId: "task_2", kind: "agent" as const, status: "queued" as const }, + ]; + for (const [index, result] of results.entries()) { + await options?.onTaskReserved?.(index, result); + } + return Ok(results); + } ); const create = mock(async () => Ok({ taskId: "unused", kind: "agent" as const, status: "running" as const }) @@ -167,7 +180,7 @@ describe("WorkflowTaskServiceAdapter", () => { [0, "task_1"], [1, "task_2"], ]); - expect(createMany).toHaveBeenCalledWith([ + expect(createMany.mock.calls[0]?.[0]).toEqual([ { parentWorkspaceId: "parent_1", kind: "agent", @@ -187,6 +200,11 @@ describe("WorkflowTaskServiceAdapter", () => { experiments: { dynamicWorkflows: true }, }, ]); + const createManyOptions: unknown = createMany.mock.calls[0]?.[1]; + assert(createManyOptions != null && typeof createManyOptions === "object"); + expect(typeof (createManyOptions as { onTaskReserved?: unknown }).onTaskReserved).toBe( + "function" + ); }); test("passes workflow wait options into report waits", async () => { diff --git a/src/node/services/workflows/WorkflowTaskServiceAdapter.ts b/src/node/services/workflows/WorkflowTaskServiceAdapter.ts index c74a0a3692..b709dc5c8a 100644 --- a/src/node/services/workflows/WorkflowTaskServiceAdapter.ts +++ b/src/node/services/workflows/WorkflowTaskServiceAdapter.ts @@ -56,7 +56,10 @@ interface WorkflowTaskServiceLike { experiments?: WorkflowTaskExperiments; modelString?: string; thinkingLevel?: ParsedThinkingInput; - }> + }>, + options?: { + onTaskReserved?: (index: number, result: TaskCreateResult) => Promise | void; + } ): Promise<{ success: true; data: TaskCreateResult[] } | { success: false; error: string }>; waitForAgentReport( taskId: string, @@ -217,7 +220,13 @@ export class WorkflowTaskServiceAdapter implements WorkflowTaskAdapter { } const createResult = await this.taskService.createMany( - specs.map((spec) => this.buildCreateArgs(spec)) + specs.map((spec) => this.buildCreateArgs(spec)), + { + onTaskReserved: async (index, result) => { + assert(result.taskId.length > 0, "createAgentTasks: taskId is required"); + await lifecycle?.onTaskCreated?.(index, result.taskId); + }, + } ); if (!createResult.success) { throw new Error(createResult.error); @@ -227,9 +236,8 @@ export class WorkflowTaskServiceAdapter implements WorkflowTaskAdapter { } const created: Array<{ taskId: string; status: "queued" | "starting" | "running" }> = []; - for (const [index, result] of createResult.data.entries()) { + for (const result of createResult.data) { assert(result.taskId.length > 0, "createAgentTasks: taskId is required"); - await lifecycle?.onTaskCreated?.(index, result.taskId); created.push({ taskId: result.taskId, status: result.status }); } return created; From 2d1dbeb03ff7346ae8e31caae83e70be29682bf9 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Mon, 8 Jun 2026 20:44:28 +0000 Subject: [PATCH 3/5] tests: harden workflow unit flakes --- .../services/workflows/WorkflowRunner.test.ts | 3 +- .../builtInWorkflowDefinitions.test.ts | 142 ++++++++++++++---- 2 files changed, 116 insertions(+), 29 deletions(-) diff --git a/src/node/services/workflows/WorkflowRunner.test.ts b/src/node/services/workflows/WorkflowRunner.test.ts index 0abd59ce58..b23b521c7f 100644 --- a/src/node/services/workflows/WorkflowRunner.test.ts +++ b/src/node/services/workflows/WorkflowRunner.test.ts @@ -785,6 +785,7 @@ describe("WorkflowRunner", () => { waitedFor.push(taskId); waitTimeoutMs = waitOptions?.timeoutMs; waitAbortSignal = waitOptions?.abortSignal; + expect(waitAbortSignal?.aborted).toBe(false); return { taskId, reportMarkdown: "summary" }; }, }); @@ -799,7 +800,7 @@ describe("WorkflowRunner", () => { ); expect(startedEvents).toHaveLength(1); expect(waitTimeoutMs).toBeGreaterThan(5 * 60 * 1000); - expect(waitAbortSignal?.aborted).toBe(false); + expect(waitAbortSignal).toBeDefined(); }); test("adds one started task event when resuming legacy started steps", async () => { diff --git a/src/node/services/workflows/builtInWorkflowDefinitions.test.ts b/src/node/services/workflows/builtInWorkflowDefinitions.test.ts index 0e43c992b8..8eab7ac7b7 100644 --- a/src/node/services/workflows/builtInWorkflowDefinitions.test.ts +++ b/src/node/services/workflows/builtInWorkflowDefinitions.test.ts @@ -11,6 +11,8 @@ import { WorkflowActionRegistry } from "./WorkflowActionRegistry"; import { WorkflowRunStore } from "./WorkflowRunStore"; import { WorkflowRunner, type WorkflowAgentResult, type WorkflowAgentSpec } from "./WorkflowRunner"; +// Keep this above tiny lock timeouts: these fixtures exercise parallel workflow callbacks +// that legitimately contend for the run-store mutation lock under CI coverage load. const BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS = 100; const deepResearch = BUILT_IN_WORKFLOW_DEFINITIONS.find( @@ -1549,7 +1551,10 @@ describe("built-in deep-review-workflow", () => { await runGit(repoRoot, ["branch", "-M", "main"]); await fs.writeFile(path.join(repoRoot, "tracked.txt"), "base\ndirty\n", "utf-8"); - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_git_context", workspaceId: "workspace-1", @@ -1674,7 +1679,10 @@ describe("built-in deep-review-workflow", () => { await runGit(repoRoot, ["branch", "-M", "main"]); await fs.writeFile(path.join(repoRoot, "unrelated.txt"), "base\nunrelated dirty\n", "utf-8"); - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_explicit_context", workspaceId: "workspace-1", @@ -1746,7 +1754,10 @@ describe("built-in deep-review-workflow", () => { ); } - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_status_fallback", workspaceId: "workspace-1", @@ -1824,7 +1835,10 @@ describe("built-in deep-review-workflow", () => { title: "Docs are unclear", confidence: "medium", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix", workspaceId: "workspace-1", @@ -2041,7 +2055,10 @@ describe("built-in deep-review-workflow", () => { validation: "Run targeted tests.", confidence: "high", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_loop", workspaceId: "workspace-1", @@ -2310,7 +2327,10 @@ describe("built-in deep-review-workflow", () => { validation: "Run targeted tests.", confidence: "high", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_loop_budget", workspaceId: "workspace-1", @@ -2509,7 +2529,10 @@ describe("built-in deep-review-workflow", () => { validation: "Run targeted tests.", confidence: "high", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_loop_budget_exhausted", workspaceId: "workspace-1", @@ -2694,7 +2717,10 @@ describe("built-in deep-review-workflow", () => { validation: "Run targeted tests.", confidence: "high", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_loop_no_progress", workspaceId: "workspace-1", @@ -2850,7 +2876,10 @@ describe("built-in deep-review-workflow", () => { validation: "Run targeted tests.", confidence: "high", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_loop_validation_failed", workspaceId: "workspace-1", @@ -3020,7 +3049,10 @@ describe("built-in deep-review-workflow", () => { validation: "Run targeted tests.", confidence: "high", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_loop_validation_not_run", workspaceId: "workspace-1", @@ -3190,7 +3222,10 @@ describe("built-in deep-review-workflow", () => { validation: "Run targeted tests.", confidence: "high", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_loop_max_iterations", workspaceId: "workspace-1", @@ -3373,7 +3408,10 @@ describe("built-in deep-review-workflow", () => { title: "Final review drops this issue", rationale: "The final synthesis omits this candidate.", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_final_gate", workspaceId: "workspace-1", @@ -3554,7 +3592,10 @@ describe("built-in deep-review-workflow", () => { validation: "Add a failing persistence regression test.", confidence: "high", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_verifier_mismatch", workspaceId: "workspace-1", @@ -3691,7 +3732,10 @@ describe("built-in deep-review-workflow", () => { title: "Selected but cannot be fixed automatically", suggestedFix: "Needs product clarification.", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_filter", workspaceId: "workspace-1", @@ -3858,7 +3902,10 @@ describe("built-in deep-review-workflow", () => { await runGit(repoRoot, ["commit", "-m", "base commit"]); await fs.writeFile(path.join(repoRoot, "service.ts"), "export const value = 2;\n", "utf-8"); - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_dirty", workspaceId: "workspace-1", @@ -3920,7 +3967,10 @@ describe("built-in deep-review-workflow", () => { await runGit(repoRoot, ["commit", "-m", "base commit"]); await runGit(repoRoot, ["branch", "-M", "main"]); - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_noncurrent_head", workspaceId: "workspace-1", @@ -3991,7 +4041,10 @@ describe("built-in deep-review-workflow", () => { await runGit(repoRoot, ["commit", "-m", "base commit"]); await runGit(repoRoot, ["branch", "-M", "main"]); - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_checkout_drift", workspaceId: "workspace-1", @@ -4075,7 +4128,10 @@ describe("built-in deep-review-workflow", () => { validation: "Run targeted workflow tests.", confidence: "high", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_detached_head", workspaceId: "workspace-1", @@ -4215,7 +4271,10 @@ describe("built-in deep-review-workflow", () => { validation: "Run targeted workflow tests.", confidence: "high", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_non_current_ref", workspaceId: "workspace-1", @@ -4356,7 +4415,10 @@ describe("built-in deep-review-workflow", () => { validation: "Run targeted tests.", confidence: "high", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_head_drift", workspaceId: "workspace-1", @@ -4518,7 +4580,10 @@ describe("built-in deep-review-workflow", () => { validation: "Run targeted tests.", confidence: "high", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_replay_head_advance", workspaceId: "workspace-1", @@ -4712,7 +4777,10 @@ describe("built-in deep-review-workflow", () => { validation: "Add a branch-specific regression test.", confidence: "high", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_conflict", workspaceId: "workspace-1", @@ -4927,7 +4995,10 @@ describe("built-in deep-review-workflow", () => { validation: "Add a branch-specific regression test.", confidence: "high", }; - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_resolver_mismatch", workspaceId: "workspace-1", @@ -5105,7 +5176,10 @@ describe("built-in deep-review-workflow", () => { throw new Error("Expected built-in deep-review-workflow workflow"); } using tmp = new DisposableTempDir("deep-review-workflow-prose-fix-mention"); - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_prose_fix_mention", workspaceId: "workspace-1", @@ -5156,7 +5230,10 @@ describe("built-in deep-review-workflow", () => { throw new Error("Expected built-in deep-review-workflow workflow"); } using tmp = new DisposableTempDir("deep-review-workflow-no-fix-flag"); - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_no_fix_flag", workspaceId: "workspace-1", @@ -5225,7 +5302,10 @@ describe("built-in deep-review-workflow", () => { throw new Error("Expected built-in deep-review-workflow workflow"); } using tmp = new DisposableTempDir("deep-review-workflow-loop-without-fix"); - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_loop_without_fix", workspaceId: "workspace-1", @@ -5266,7 +5346,10 @@ describe("built-in deep-review-workflow", () => { throw new Error("Expected built-in deep-review-workflow workflow"); } using tmp = new DisposableTempDir("deep-review-workflow-fix-explicit-diff"); - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_fix_explicit_diff", workspaceId: "workspace-1", @@ -5331,7 +5414,10 @@ describe("built-in deep-review-workflow", () => { await runGit(repoRoot, ["commit", "-m", "base commit"]); await runGit(repoRoot, ["branch", "-M", "main"]); - const runStore = new WorkflowRunStore({ sessionDir: tmp.path, staleLeaseMs: 10 }); + const runStore = new WorkflowRunStore({ + sessionDir: tmp.path, + staleLeaseMs: BUILT_IN_WORKFLOW_TEST_STALE_LEASE_MS, + }); await runStore.createRun({ id: "wfr_deep_review_invalid_ref", workspaceId: "workspace-1", From 9d659ba48c530d555c5c9d4f3f1cc87d03389c8b Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Mon, 8 Jun 2026 21:00:04 +0000 Subject: [PATCH 4/5] fix: preserve legacy queued task resume --- src/node/services/taskService.test.ts | 72 +++++++++++++++++++++++++++ src/node/services/taskService.ts | 52 +++++++++++-------- 2 files changed, 104 insertions(+), 20 deletions(-) diff --git a/src/node/services/taskService.test.ts b/src/node/services/taskService.test.ts index e6f5aab99d..b8ba6beaf9 100644 --- a/src/node/services/taskService.test.ts +++ b/src/node/services/taskService.test.ts @@ -698,6 +698,78 @@ describe("TaskService", () => { expect(started?.taskStatus).toBe("running"); }, 20_000); + test("resumes legacy queued tasks that do not persist taskPrompt", async () => { + const config = await createTestConfig(rootDir); + const projectPath = await createTestProject(rootDir); + + const runtimeConfig = { type: "worktree" as const, srcBaseDir: config.srcDir }; + const runtime = createRuntime(runtimeConfig, { projectPath }); + const initLogger = createNullInitLogger(); + + const parentName = "parent"; + await runtime.createWorkspace({ + projectPath, + branchName: parentName, + trunkBranch: "main", + directoryName: parentName, + initLogger, + }); + + const parentId = "1111111111"; + const queuedTaskId = "task-queued"; + const queuedWorkspaceName = "agent_explore_task-queued"; + await saveWorkspaces( + config, + projectPath, + [ + { + path: runtime.getWorkspacePath(projectPath, parentName), + id: parentId, + name: parentName, + createdAt: new Date().toISOString(), + runtimeConfig, + }, + { + path: runtime.getWorkspacePath(projectPath, queuedWorkspaceName), + id: queuedTaskId, + name: queuedWorkspaceName, + title: "Legacy queued task", + createdAt: new Date().toISOString(), + runtimeConfig, + parentWorkspaceId: parentId, + agentId: "explore", + agentType: "explore", + taskStatus: "queued", + taskModelString: defaultModel, + taskTrunkBranch: parentName, + }, + ], + testTaskSettings(1, 3) + ); + + expect(findWorkspaceInConfig(config, queuedTaskId)?.taskPrompt).toBeUndefined(); + + const { workspaceService, resumeStream } = createWorkspaceServiceMocks(); + const { taskService } = createTaskServiceHarness(config, { workspaceService }); + const runBackgroundInitSpy = spyOn(runtimeFactory, "runBackgroundInit").mockImplementation( + () => undefined + ); + try { + await taskService.initialize(); + + expect(resumeStream).toHaveBeenCalledWith( + queuedTaskId, + expect.objectContaining({ model: defaultModel, agentId: "explore" }), + expect.objectContaining({ allowQueuedAgentTask: true, agentInitiated: true }) + ); + } finally { + runBackgroundInitSpy.mockRestore(); + } + + const started = findWorkspaceInConfig(config, queuedTaskId); + expect(started?.taskStatus).toBe("running"); + }, 20_000); + test("does not count foreground-awaiting tasks towards maxParallelAgentTasks", async () => { const config = await createTestConfig(rootDir); stubStableIds(config, ["aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc"], "dddddddddd"); diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index 9f18be23b9..49e6ef854e 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -278,13 +278,15 @@ export interface TaskCreateResult { status: "queued" | "starting" | "running"; } +type TaskLaunchStart = { kind: "sendMessage"; prompt: string } | { kind: "resumeStream" }; + interface TaskLaunchPlan { taskId: string; parentWorkspaceId: string; parentMeta: WorkspaceMetadata; agentId: string; agentType: string; - prompt: string; + start: TaskLaunchStart; title: string; workspaceName: string; createdAt: string; @@ -1693,7 +1695,7 @@ export class TaskService { parentMeta, agentId, agentType, - prompt, + start: { kind: "sendMessage", prompt }, title: args.title, workspaceName, createdAt, @@ -1755,7 +1757,7 @@ export class TaskService { workflowTask: plan.workflowTask, bestOf: plan.bestOf, taskStatus: plan.status, - taskPrompt: plan.prompt, + taskPrompt: plan.start.kind === "sendMessage" ? plan.start.prompt : undefined, taskTrunkBranch: trunkBranch, taskModelString: plan.taskModelString, taskThinkingLevel: plan.effectiveThinkingLevel, @@ -1973,7 +1975,9 @@ export class TaskService { private async startReservedAgentTask(plan: TaskLaunchPlan): Promise { assert(plan.taskId.length > 0, "startReservedAgentTask requires taskId"); assert(plan.parentWorkspaceId.length > 0, "startReservedAgentTask requires parentWorkspaceId"); - assert(plan.prompt.length > 0, "startReservedAgentTask requires prompt"); + if (plan.start.kind === "sendMessage") { + assert(plan.start.prompt.length > 0, "startReservedAgentTask requires prompt"); + } const entryAtStart = findWorkspaceEntry(this.config.loadConfigOrDefault(), plan.taskId); if (entryAtStart?.workspace.taskStatus !== "starting") { @@ -2101,17 +2105,22 @@ export class TaskService { plan.taskId ); - const sendResult = await this.workspaceService.sendMessage( - plan.taskId, - plan.prompt, - { - model: plan.taskModelString, - agentId: plan.agentId, - thinkingLevel: plan.effectiveThinkingLevel, - experiments: plan.experiments, - }, - { allowQueuedAgentTask: true, agentInitiated: true } - ); + const startOptions = { + model: plan.taskModelString, + agentId: plan.agentId, + thinkingLevel: plan.effectiveThinkingLevel, + experiments: plan.experiments, + }; + const sendResult = + plan.start.kind === "sendMessage" + ? await this.workspaceService.sendMessage(plan.taskId, plan.start.prompt, startOptions, { + allowQueuedAgentTask: true, + agentInitiated: true, + }) + : await this.workspaceService.resumeStream(plan.taskId, startOptions, { + allowQueuedAgentTask: true, + agentInitiated: true, + }); if (!sendResult.success) { const message = typeof sendResult.error === "string" @@ -3895,12 +3904,15 @@ export class TaskService { } const queuedPrompt = coerceNonEmptyString(task.taskPrompt); - if (!queuedPrompt) { - taskQueueDebug("TaskService.maybeStartQueuedTasks failing legacy queued task", { + const start: TaskLaunchStart = queuedPrompt + ? { kind: "sendMessage", prompt: queuedPrompt } + : { kind: "resumeStream" }; + if (start.kind === "resumeStream") { + // Older queued task records stored the initial prompt only in chat history. + // Keep those upgrade-safe by resuming the existing pending stream instead of failing launch. + taskQueueDebug("TaskService.maybeStartQueuedTasks legacy resumeStream reservation", { taskId, }); - await this.markTaskLaunchFailed(taskId, "Queued task missing taskPrompt"); - continue; } const parentWorkspaceId = coerceNonEmptyString(task.parentWorkspaceId); @@ -4017,7 +4029,7 @@ export class TaskService { parentMeta, agentId, agentType: task.agentType ?? agentId, - prompt: queuedPrompt, + start, title: task.title ?? workspaceName, workspaceName, createdAt, From 1ed702427573935f3cee3f692000a8a7db8ca22a Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Mon, 8 Jun 2026 21:12:07 +0000 Subject: [PATCH 5/5] fix: avoid replaying accepted task prompts --- src/node/services/taskService.test.ts | 55 +++++++++++++++++++++------ src/node/services/taskService.ts | 39 ++++++++++++++++++- 2 files changed, 82 insertions(+), 12 deletions(-) diff --git a/src/node/services/taskService.test.ts b/src/node/services/taskService.test.ts index b8ba6beaf9..35878ed2e8 100644 --- a/src/node/services/taskService.test.ts +++ b/src/node/services/taskService.test.ts @@ -698,7 +698,7 @@ describe("TaskService", () => { expect(started?.taskStatus).toBe("running"); }, 20_000); - test("resumes legacy queued tasks that do not persist taskPrompt", async () => { + test("resumes accepted queued starts instead of replaying prompts", async () => { const config = await createTestConfig(rootDir); const projectPath = await createTestProject(rootDir); @@ -718,6 +718,9 @@ describe("TaskService", () => { const parentId = "1111111111"; const queuedTaskId = "task-queued"; const queuedWorkspaceName = "agent_explore_task-queued"; + const acceptedStartingTaskId = "task-starting-accepted"; + const acceptedStartingWorkspaceName = "agent_explore_task-starting-accepted"; + const acceptedPrompt = "already accepted prompt"; await saveWorkspaces( config, projectPath, @@ -743,31 +746,61 @@ describe("TaskService", () => { taskModelString: defaultModel, taskTrunkBranch: parentName, }, + { + path: runtime.getWorkspacePath(projectPath, acceptedStartingWorkspaceName), + id: acceptedStartingTaskId, + name: acceptedStartingWorkspaceName, + title: "Accepted starting task", + createdAt: new Date().toISOString(), + runtimeConfig, + parentWorkspaceId: parentId, + agentId: "explore", + agentType: "explore", + taskStatus: "starting", + taskPrompt: acceptedPrompt, + taskModelString: defaultModel, + taskTrunkBranch: parentName, + }, ], - testTaskSettings(1, 3) + testTaskSettings(2, 3) ); + const { workspaceService, sendMessage, resumeStream } = createWorkspaceServiceMocks(); + const { historyService, taskService } = createTaskServiceHarness(config, { workspaceService }); + const appendAcceptedPrompt = await historyService.appendToHistory( + acceptedStartingTaskId, + createMuxMessage("accepted-starting-prompt", "user", acceptedPrompt) + ); + expect(appendAcceptedPrompt.success).toBe(true); expect(findWorkspaceInConfig(config, queuedTaskId)?.taskPrompt).toBeUndefined(); + expect(findWorkspaceInConfig(config, acceptedStartingTaskId)?.taskPrompt).toBe(acceptedPrompt); - const { workspaceService, resumeStream } = createWorkspaceServiceMocks(); - const { taskService } = createTaskServiceHarness(config, { workspaceService }); const runBackgroundInitSpy = spyOn(runtimeFactory, "runBackgroundInit").mockImplementation( () => undefined ); try { await taskService.initialize(); - expect(resumeStream).toHaveBeenCalledWith( - queuedTaskId, - expect.objectContaining({ model: defaultModel, agentId: "explore" }), - expect.objectContaining({ allowQueuedAgentTask: true, agentInitiated: true }) - ); + for (const taskId of [queuedTaskId, acceptedStartingTaskId]) { + expect(resumeStream).toHaveBeenCalledWith( + taskId, + expect.objectContaining({ model: defaultModel, agentId: "explore" }), + expect.objectContaining({ allowQueuedAgentTask: true, agentInitiated: true }) + ); + } + const sendMessagePrompts = ( + sendMessage as unknown as { mock: { calls: unknown[][] } } + ).mock.calls.map((call) => call[1]); + expect(sendMessagePrompts).not.toContain(acceptedPrompt); } finally { runBackgroundInitSpy.mockRestore(); } - const started = findWorkspaceInConfig(config, queuedTaskId); - expect(started?.taskStatus).toBe("running"); + const queued = findWorkspaceInConfig(config, queuedTaskId); + expect(queued?.taskStatus).toBe("running"); + const acceptedStarting = findWorkspaceInConfig(config, acceptedStartingTaskId); + expect(acceptedStarting?.taskStatus).toBe("running"); + expect(acceptedStarting?.taskPrompt).toBeUndefined(); }, 20_000); test("does not count foreground-awaiting tasks towards maxParallelAgentTasks", async () => { diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index 49e6ef854e..e192f00e46 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -1289,17 +1289,39 @@ export class TaskService { (task) => task.taskStatus === "starting" && typeof task.id === "string" ); if (staleStartingTasks.length > 0) { + const recoveries = new Map< + string, + { status: Extract; acceptedPrompt: boolean } + >(); + for (const task of staleStartingTasks) { + assert(task.id != null && task.id.length > 0, "stale starting task id is required"); + const isStreaming = this.aiService.isStreaming(task.id); + recoveries.set(task.id, { + status: isStreaming ? "running" : "queued", + acceptedPrompt: !isStreaming && (await this.hasAcceptedInitialTaskPrompt(task.id)), + }); + } + await this.config.editConfig((config) => { for (const task of staleStartingTasks) { assert(task.id != null && task.id.length > 0, "stale starting task id is required"); + const recovery = recoveries.get(task.id); + assert(recovery != null, "stale starting task recovery is required"); const entry = findWorkspaceEntry(config, task.id); if (!entry) continue; - entry.workspace.taskStatus = this.aiService.isStreaming(task.id) ? "running" : "queued"; + entry.workspace.taskStatus = recovery.status; + if (recovery.acceptedPrompt) { + // The initial prompt is already durable in chat history; clearing taskPrompt makes the + // queued recovery path resume that accepted turn instead of appending a duplicate user turn. + entry.workspace.taskPrompt = undefined; + } } return config; }); log.info("[startup] Recovered stale starting agent tasks", { count: staleStartingTasks.length, + acceptedPromptCount: [...recoveries.values()].filter((recovery) => recovery.acceptedPrompt) + .length, }); } @@ -1490,6 +1512,21 @@ export class TaskService { }); } + private async hasAcceptedInitialTaskPrompt(workspaceId: string): Promise { + assert(workspaceId.length > 0, "hasAcceptedInitialTaskPrompt: workspaceId must be non-empty"); + + const historyResult = await this.historyService.getHistoryFromLatestBoundary(workspaceId); + if (!historyResult.success) { + log.warn("Failed to inspect task history during stale starting recovery", { + workspaceId, + error: historyResult.error, + }); + return false; + } + + return historyResult.data.some((message) => message.role === "user"); + } + private startWorkspaceInit(workspaceId: string, projectPath: string): InitLogger { assert(workspaceId.length > 0, "startWorkspaceInit: workspaceId must be non-empty"); assert(projectPath.length > 0, "startWorkspaceInit: projectPath must be non-empty");