Skip to content

Commit ebf434f

Browse files
authored
fix(mship): add tool watchdog (#4991)
* Add tool watchdog * Updates * Greptile comment * FIxes * Fix
1 parent e1af2bf commit ebf434f

13 files changed

Lines changed: 481 additions & 118 deletions

File tree

apps/sim/app/api/copilot/chat/abort/route.ts

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ import { TraceSpan } from '@/lib/copilot/generated/trace-spans-v1'
1010
import { fetchGo } from '@/lib/copilot/request/go/fetch'
1111
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request/http'
1212
import { withCopilotSpan, withIncomingGoSpan } from '@/lib/copilot/request/otel'
13-
import { abortActiveStream, waitForPendingChatStream } from '@/lib/copilot/request/session'
13+
import {
14+
abortActiveStream,
15+
releasePendingChatStream,
16+
waitForPendingChatStream,
17+
} from '@/lib/copilot/request/session'
1418
import { getMothershipBaseURL, getMothershipSourceEnvHeaders } from '@/lib/copilot/server/agent-url'
1519
import { env } from '@/lib/core/config/env'
1620
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
@@ -139,11 +143,21 @@ export const POST = withRouteHandler((request: NextRequest) =>
139143
}
140144
)
141145
if (!settled) {
142-
rootSpan.setAttribute(TraceAttr.CopilotAbortOutcome, CopilotAbortOutcome.SettleTimeout)
143-
return NextResponse.json(
144-
{ error: 'Previous response is still shutting down', aborted, settled: false },
145-
{ status: 409 }
146-
)
146+
// The holder didn't settle within the grace window even though the
147+
// user explicitly stopped it and abort markers are written on both
148+
// sides (local + Go). Don't leave the chat hostage to a wedged
149+
// handler: break its stream lock. This is safe by construction —
150+
// releaseLock only deletes when the value still matches this
151+
// streamId (never clobbers a newer stream), and the old handler's
152+
// heartbeat uses extendLock-if-owner, so it observes the loss and
153+
// stops heartbeating rather than re-asserting.
154+
await releasePendingChatStream(chatId, streamId)
155+
logger.warn('Stream did not settle after abort; force-released chat stream lock', {
156+
chatId,
157+
streamId,
158+
})
159+
rootSpan.setAttribute(TraceAttr.CopilotAbortOutcome, CopilotAbortOutcome.ForceReleased)
160+
return NextResponse.json({ aborted, settled: false, forceReleased: true })
147161
}
148162
rootSpan.setAttribute(TraceAttr.CopilotAbortOutcome, CopilotAbortOutcome.Settled)
149163
return NextResponse.json({ aborted, settled: true })

apps/sim/lib/api/contracts/copilot.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,9 @@ export const copilotChatAbortContract = defineRouteContract({
713713
schema: z.object({
714714
aborted: z.boolean(),
715715
settled: z.boolean().optional(),
716+
// True when the stream did not settle within the grace window and the
717+
// chat stream lock was force-broken so the chat is immediately usable.
718+
forceReleased: z.boolean().optional(),
716719
}),
717720
},
718721
})

apps/sim/lib/copilot/constants.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,26 @@ export const SIM_AGENT_API_URL =
1313
/** Default timeout for the copilot orchestration stream loop (60 min). */
1414
export const ORCHESTRATION_TIMEOUT_MS = 3_600_000
1515

16+
/**
17+
* Watchdog cap for a single sim-executed copilot tool. A tool that neither
18+
* resolves nor rejects within its cap is failed with a timeout error so the
19+
* checkpoint loop can resume Go with an error result instead of wedging the
20+
* chat (and its pending-stream lock) behind a hung await forever.
21+
*/
22+
export const TOOL_WATCHDOG_DEFAULT_MS = 60_000
23+
24+
/**
25+
* Watchdog cap for tool classes with legitimately long runtimes (workflow
26+
* executions, media/image generation, sandboxed code, deep research). Those
27+
* tools carry their own inner budgets (plan execution timeouts, sandbox
28+
* timeouts), so this cap only backstops a true hang and sits above all of
29+
* them — matching ORCHESTRATION_TIMEOUT_MS so it never undercuts a legal run.
30+
*/
31+
export const TOOL_WATCHDOG_LONG_RUNNING_MS = ORCHESTRATION_TIMEOUT_MS
32+
33+
/** Extra slack the resume gate allows past the slowest pending tool's watchdog. */
34+
export const TOOL_WATCHDOG_RESUME_GRACE_MS = 30_000
35+
1636
/** Timeout for the client-side streaming response handler (60 min). */
1737
export const STREAM_TIMEOUT_MS = 3_600_000
1838

apps/sim/lib/copilot/generated/trace-attribute-values-v1.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ export type BillingRouteOutcomeValue = (typeof BillingRouteOutcome)[BillingRoute
7272
export const CopilotAbortOutcome = {
7373
BadRequest: 'bad_request',
7474
FallbackPersistFailed: 'fallback_persist_failed',
75+
ForceReleased: 'force_released',
7576
MissingMessageId: 'missing_message_id',
7677
MissingStreamId: 'missing_stream_id',
7778
NoChatId: 'no_chat_id',

apps/sim/lib/copilot/generated/trace-attributes-v1.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,8 @@ export const TraceAttr = {
270270
CopilotVfsInputMediaTypeClaimed: 'copilot.vfs.input.media_type_claimed',
271271
CopilotVfsInputMediaTypeDetected: 'copilot.vfs.input.media_type_detected',
272272
CopilotVfsInputWidth: 'copilot.vfs.input.width',
273+
CopilotVfsMaterializeFileCount: 'copilot.vfs.materialize.file_count',
274+
CopilotVfsMaterializePhaseMs: 'copilot.vfs.materialize.phase_ms',
273275
CopilotVfsMetadataFailed: 'copilot.vfs.metadata.failed',
274276
CopilotVfsOutcome: 'copilot.vfs.outcome',
275277
CopilotVfsOutputBytes: 'copilot.vfs.output.bytes',
@@ -877,6 +879,8 @@ export const TraceAttrValues: readonly TraceAttrValue[] = [
877879
'copilot.vfs.input.media_type_claimed',
878880
'copilot.vfs.input.media_type_detected',
879881
'copilot.vfs.input.width',
882+
'copilot.vfs.materialize.file_count',
883+
'copilot.vfs.materialize.phase_ms',
880884
'copilot.vfs.metadata.failed',
881885
'copilot.vfs.outcome',
882886
'copilot.vfs.output.bytes',

apps/sim/lib/copilot/generated/trace-spans-v1.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ export const TraceSpan = {
6666
CopilotToolsWriteCsvToTable: 'copilot.tools.write_csv_to_table',
6767
CopilotToolsWriteOutputFile: 'copilot.tools.write_output_file',
6868
CopilotToolsWriteOutputTable: 'copilot.tools.write_output_table',
69+
CopilotVfsMaterialize: 'copilot.vfs.materialize',
6970
CopilotVfsPrepareImage: 'copilot.vfs.prepare_image',
7071
CopilotVfsReadFile: 'copilot.vfs.read_file',
7172
GenAiAgentExecute: 'gen_ai.agent.execute',
@@ -140,6 +141,7 @@ export const TraceSpanValues: readonly TraceSpanValue[] = [
140141
'copilot.tools.write_csv_to_table',
141142
'copilot.tools.write_output_file',
142143
'copilot.tools.write_output_table',
144+
'copilot.vfs.materialize',
143145
'copilot.vfs.prepare_image',
144146
'copilot.vfs.read_file',
145147
'gen_ai.agent.execute',

apps/sim/lib/copilot/request/lifecycle/run.test.ts

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,23 @@ import type { ExecutionContext, StreamingContext } from '@/lib/copilot/request/t
77

88
const {
99
mockCreateRunSegment,
10+
mockForceFailHungToolCall,
1011
mockGetEffectiveDecryptedEnv,
1112
mockGetMothershipBaseURL,
1213
mockGetMothershipSourceEnvHeaders,
1314
mockPrepareExecutionContext,
1415
mockRunStreamLoop,
16+
mockToolWatchdogTimeoutMs,
1517
mockUpdateRunStatus,
1618
} = vi.hoisted(() => ({
1719
mockCreateRunSegment: vi.fn(),
20+
mockForceFailHungToolCall: vi.fn(),
1821
mockGetEffectiveDecryptedEnv: vi.fn(),
1922
mockGetMothershipBaseURL: vi.fn(),
2023
mockGetMothershipSourceEnvHeaders: vi.fn(),
2124
mockPrepareExecutionContext: vi.fn(),
2225
mockRunStreamLoop: vi.fn(),
26+
mockToolWatchdogTimeoutMs: vi.fn(() => 60_000),
2327
mockUpdateRunStatus: vi.fn(),
2428
}))
2529

@@ -84,6 +88,8 @@ vi.mock('@/lib/copilot/request/tools/billing', () => ({
8488

8589
vi.mock('@/lib/copilot/request/tools/executor', () => ({
8690
executeToolAndReport: vi.fn(),
91+
forceFailHungToolCall: mockForceFailHungToolCall,
92+
toolWatchdogTimeoutMs: mockToolWatchdogTimeoutMs,
8793
}))
8894

8995
import { MothershipStreamV1ToolOutcome } from '@/lib/copilot/generated/mothership-stream-v1'
@@ -583,4 +589,102 @@ describe('runCopilotLifecycle', () => {
583589
// Final attempt (2) is terminal → not flagged, so Go bills + surfaces it.
584590
expect(bodies[3].willRetryOnStreamError).toBeUndefined()
585591
})
592+
593+
it('force-fails a hung tool promise and resumes with an error result instead of wedging', async () => {
594+
vi.useFakeTimers()
595+
try {
596+
const fetchUrls: string[] = []
597+
const bodies: Record<string, unknown>[] = []
598+
const executionContext: ExecutionContext = {
599+
userId: 'user-1',
600+
workflowId: '',
601+
workspaceId: 'ws-1',
602+
chatId: 'chat-1',
603+
decryptedEnvVars: {},
604+
}
605+
606+
// Mirror the real helper: settle the tool call into a terminal error
607+
// state so the resume loop can serialize an error result for it.
608+
mockForceFailHungToolCall.mockImplementation(
609+
async (toolCallId: string, context: StreamingContext, message: string) => {
610+
const tool = context.toolCalls.get(toolCallId)
611+
if (!tool) return
612+
tool.status = MothershipStreamV1ToolOutcome.error
613+
tool.endTime = Date.now()
614+
tool.result = { success: false }
615+
tool.error = message
616+
}
617+
)
618+
619+
// Initial leg checkpoints on an async tool whose promise NEVER settles —
620+
// the exact shape of the prod incident (claimed, marked running, hung).
621+
mockRunStreamLoop.mockImplementationOnce(
622+
async (
623+
fetchUrl: string,
624+
fetchOptions: RequestInit,
625+
context: StreamingContext
626+
): Promise<void> => {
627+
fetchUrls.push(fetchUrl)
628+
bodies.push(JSON.parse(String(fetchOptions.body)))
629+
context.toolCalls.set('tool-hung', {
630+
id: 'tool-hung',
631+
name: 'read',
632+
status: 'executing',
633+
})
634+
context.pendingToolPromises.set('tool-hung', new Promise(() => {}))
635+
context.awaitingAsyncContinuation = {
636+
checkpointId: 'ckpt-1',
637+
pendingToolCallIds: ['tool-hung'],
638+
}
639+
}
640+
)
641+
642+
// Resume leg completes normally with the error result delivered.
643+
mockRunStreamLoop.mockImplementationOnce(
644+
async (
645+
fetchUrl: string,
646+
fetchOptions: RequestInit,
647+
context: StreamingContext
648+
): Promise<void> => {
649+
fetchUrls.push(fetchUrl)
650+
bodies.push(JSON.parse(String(fetchOptions.body)))
651+
context.accumulatedContent = 'The file read failed, but here is what I know.'
652+
}
653+
)
654+
655+
const lifecycle = runCopilotLifecycle(
656+
{ message: 'hello', messageId: 'stream-1' },
657+
{
658+
userId: 'user-1',
659+
workspaceId: 'ws-1',
660+
chatId: 'chat-1',
661+
executionId: 'exec-1',
662+
runId: 'run-1',
663+
executionContext,
664+
}
665+
)
666+
667+
// Wait budget = watchdog (60s, mocked) + resume grace (30s). Advance past it.
668+
await vi.advanceTimersByTimeAsync(91_000)
669+
const result = await lifecycle
670+
671+
expect(mockForceFailHungToolCall).toHaveBeenCalledWith(
672+
'tool-hung',
673+
expect.anything(),
674+
expect.stringContaining('hung')
675+
)
676+
expect(fetchUrls[1]).toBe('http://mothership.test/api/tools/resume')
677+
expect(bodies[1].results).toEqual([
678+
expect.objectContaining({
679+
callId: 'tool-hung',
680+
name: 'read',
681+
success: false,
682+
data: { error: expect.stringContaining('hung') },
683+
}),
684+
])
685+
expect(result.success).toBe(true)
686+
} finally {
687+
vi.useRealTimers()
688+
}
689+
})
586690
})

apps/sim/lib/copilot/request/lifecycle/run.ts

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { sleep } from '@sim/utils/helpers'
55
import { generateId } from '@sim/utils/id'
66
import { isWorkspaceOnEnterprisePlan } from '@/lib/billing/core/subscription'
77
import { createRunSegment, updateRunStatus } from '@/lib/copilot/async-runs/repository'
8-
import { SIM_AGENT_VERSION } from '@/lib/copilot/constants'
8+
import { SIM_AGENT_VERSION, TOOL_WATCHDOG_RESUME_GRACE_MS } from '@/lib/copilot/constants'
99
import {
1010
MothershipStreamV1EventType,
1111
MothershipStreamV1RunKind,
@@ -24,7 +24,11 @@ import {
2424
setTerminalToolCallState,
2525
} from '@/lib/copilot/request/tool-call-state'
2626
import { handleBillingLimitResponse } from '@/lib/copilot/request/tools/billing'
27-
import { executeToolAndReport } from '@/lib/copilot/request/tools/executor'
27+
import {
28+
executeToolAndReport,
29+
forceFailHungToolCall,
30+
toolWatchdogTimeoutMs,
31+
} from '@/lib/copilot/request/tools/executor'
2832
import type { TraceCollector } from '@/lib/copilot/request/trace'
2933
import { RequestTraceV1SpanStatus } from '@/lib/copilot/request/trace'
3034
import type {
@@ -405,15 +409,48 @@ async function runCheckpointLoop(
405409
if (!continuation) break
406410

407411
if (context.pendingToolPromises.size > 0) {
412+
// Bounded by the slowest pending tool's watchdog plus grace. The
413+
// per-tool watchdog already guarantees each promise settles; this gate
414+
// is the structural backstop so that no tool failure mode — known or
415+
// unknown — can park the checkpoint loop (and the chat's pending-stream
416+
// lock) forever.
417+
const waitBudgetMs =
418+
Array.from(context.pendingToolPromises.keys()).reduce(
419+
(max, toolCallId) =>
420+
Math.max(max, toolWatchdogTimeoutMs(context.toolCalls.get(toolCallId)?.name)),
421+
0
422+
) + TOOL_WATCHDOG_RESUME_GRACE_MS
408423
const waitSpan = context.trace.startSpan('Wait for Tools', 'lifecycle.wait_tools', {
409424
checkpointId: continuation.checkpointId,
410425
pendingCount: context.pendingToolPromises.size,
426+
waitBudgetMs,
411427
})
412428
logger.info('Waiting for in-flight tool executions before resume', {
413429
checkpointId: continuation.checkpointId,
414430
pendingCount: context.pendingToolPromises.size,
431+
waitBudgetMs,
415432
})
416-
await Promise.allSettled(context.pendingToolPromises.values())
433+
const settledInTime = await Promise.race([
434+
Promise.allSettled(context.pendingToolPromises.values()).then(() => true),
435+
sleep(waitBudgetMs).then(() => false),
436+
])
437+
if (!settledInTime) {
438+
const hungToolCallIds = Array.from(context.pendingToolPromises.keys())
439+
logger.error('Pending tool executions exceeded the resume wait budget; force-failing', {
440+
checkpointId: continuation.checkpointId,
441+
waitBudgetMs,
442+
hungToolCallIds,
443+
})
444+
for (const toolCallId of hungToolCallIds) {
445+
await forceFailHungToolCall(
446+
toolCallId,
447+
context,
448+
'Tool execution hung on the Sim executor and was abandoned so the conversation could continue.'
449+
)
450+
context.pendingToolPromises.delete(toolCallId)
451+
}
452+
}
453+
waitSpan.attributes = { ...waitSpan.attributes, settledInTime }
417454
context.trace.endSpan(waitSpan)
418455
}
419456

0 commit comments

Comments
 (0)