diff --git a/packages/vm-agent/internal/acp/session_host_crash.go b/packages/vm-agent/internal/acp/session_host_crash.go index dd4215ecf..67b90875e 100644 --- a/packages/vm-agent/internal/acp/session_host_crash.go +++ b/packages/vm-agent/internal/acp/session_host_crash.go @@ -63,23 +63,27 @@ func redactAgentDiagnosticText(text string) string { return redacted } -func (h *SessionHost) beginCrashRecovery(reqID json.RawMessage, viewerID string) (string, bool) { +// beginCrashRecovery attempts to start LoadSession-based recovery after an agent crash. +// Returns (agentType, redactedStderr, recoveryStarted). agentType and stderr are returned +// even when recovery is unavailable so the caller can use them without re-acquiring locks. +func (h *SessionHost) beginCrashRecovery(reqID json.RawMessage, viewerID string) (string, string, bool) { stderr := redactAgentDiagnosticText(h.peekStderr()) h.mu.Lock() defer h.mu.Unlock() - if h.sessionID == "" || !h.agentSupportsLoadSession || h.agentType == "" { - return "", false + agentType := h.agentType + if h.sessionID == "" || !h.agentSupportsLoadSession || agentType == "" { + return agentType, stderr, false } h.crashRecoveryInProgress = true h.crashStderr = stderr - h.crashAgentType = h.agentType + h.crashAgentType = agentType h.crashPromptReqID = append(json.RawMessage(nil), reqID...) h.crashPromptViewerID = viewerID h.status = HostStarting h.statusErr = "" - return h.agentType, true + return agentType, stderr, true } type crashRecoverySnapshot struct { diff --git a/packages/vm-agent/internal/acp/session_host_prompt.go b/packages/vm-agent/internal/acp/session_host_prompt.go index c79f30a6a..a0c283106 100644 --- a/packages/vm-agent/internal/acp/session_host_prompt.go +++ b/packages/vm-agent/internal/acp/session_host_prompt.go @@ -260,7 +260,8 @@ func (h *SessionHost) finishPromptCancelled(reqID json.RawMessage, info promptSt func (h *SessionHost) finishPromptWithError(promptCtx context.Context, reqID json.RawMessage, info promptStartInfo, err error) { if isCrashPromptError(err) && !errors.Is(promptCtx.Err(), context.DeadlineExceeded) { - if agentType, ok := h.beginCrashRecovery(reqID, info.viewerID); ok { + agentType, stderr, ok := h.beginCrashRecovery(reqID, info.viewerID) + if ok { slog.Warn("ACP Prompt failed because agent disconnected; deferring to crash recovery", "error", err, "agentType", agentType) h.reportLifecycle("warn", "ACP agent crashed during prompt; attempting LoadSession recovery", map[string]interface{}{ "agentType": agentType, @@ -271,6 +272,8 @@ func (h *SessionHost) finishPromptWithError(promptCtx context.Context, reqID jso h.reportActivity("recovering") return } + h.finishUnrecoverableCrashPrompt(reqID, info, agentType, stderr, err) + return } errMsg := fmt.Sprintf("Prompt failed: %v", err) @@ -290,6 +293,31 @@ func (h *SessionHost) finishPromptWithError(promptCtx context.Context, reqID jso h.notifyPromptComplete("error", err) } +func (h *SessionHost) finishUnrecoverableCrashPrompt(reqID json.RawMessage, info promptStartInfo, agentType, stderr string, err error) { + if agentType == "" { + agentType = "unknown" + } + message := "Agent process disconnected during prompt and cannot be recovered automatically" + slog.Warn("ACP Prompt failed because agent disconnected and recovery is unavailable", + "error", err, "agentType", agentType) + h.reportLifecycle("warn", "ACP agent disconnected during prompt without LoadSession recovery", map[string]interface{}{ + "agentType": agentType, + "duration": time.Since(info.startedAt).String(), + "error": err.Error(), + }) + // promptReqID gets a defensive copy because crashRecoverySnapshot stores the slice; + // marshalJSONRPCError serialises reqID immediately so no copy needed. + h.broadcastAgentCrashReport(h.crashReport(crashRecoverySnapshot{ + stderr: stderr, + agentType: agentType, + promptReqID: append(json.RawMessage(nil), reqID...), + }, false, "LoadSession recovery is unavailable for this agent session")) + h.broadcastMessage(h.marshalJSONRPCError(reqID, -32603, message)) + h.setStatus(HostError, message) + h.broadcastAgentStatus(StatusError, agentType, message) + h.notifyPromptComplete("error", fmt.Errorf("%s: %w", message, err)) +} + func (h *SessionHost) broadcastPromptResponse(reqID json.RawMessage, resp acpsdk.PromptResponse) { result, _ := json.Marshal(resp) response := map[string]interface{}{ diff --git a/packages/vm-agent/internal/acp/session_host_test.go b/packages/vm-agent/internal/acp/session_host_test.go index ee8020a61..337275c89 100644 --- a/packages/vm-agent/internal/acp/session_host_test.go +++ b/packages/vm-agent/internal/acp/session_host_test.go @@ -811,7 +811,7 @@ func TestSessionHost_BeginCrashRecoveryRequiresLoadSession(t *testing.T) { host.agentSupportsLoadSession = false host.mu.Unlock() - if _, ok := host.beginCrashRecovery(json.RawMessage(`"req-1"`), "viewer-1"); ok { + if _, _, ok := host.beginCrashRecovery(json.RawMessage(`"req-1"`), "viewer-1"); ok { t.Fatal("beginCrashRecovery succeeded without LoadSession support") } @@ -819,7 +819,7 @@ func TestSessionHost_BeginCrashRecoveryRequiresLoadSession(t *testing.T) { host.agentSupportsLoadSession = true host.mu.Unlock() - agentType, ok := host.beginCrashRecovery(json.RawMessage(`"req-1"`), "viewer-1") + agentType, _, ok := host.beginCrashRecovery(json.RawMessage(`"req-1"`), "viewer-1") if !ok { t.Fatal("beginCrashRecovery failed with LoadSession support") } @@ -840,6 +840,124 @@ func TestSessionHost_BeginCrashRecoveryRequiresLoadSession(t *testing.T) { } } +func TestSessionHost_FinishPromptWithPeerDisconnectBeginsCrashRecovery(t *testing.T) { + t.Parallel() + + host := newTestSessionHost(t) + defer host.Stop() + + completed := make(chan string, 1) + host.config.OnPromptComplete = func(stopReason string, _ error) { + completed <- stopReason + } + host.mu.Lock() + host.agentType = "openai-codex" + host.sessionID = "acp-session-1" + host.agentSupportsLoadSession = true + host.mu.Unlock() + + host.finishPromptWithError( + context.Background(), + json.RawMessage(`"req-1"`), + promptStartInfo{startedAt: time.Now(), viewerID: "viewer-1"}, + errors.New(`{"code":-32603,"message":"Internal error","data":{"error":"peer disconnected before response"}}`), + ) + + select { + case stopReason := <-completed: + t.Fatalf("prompt completed before crash recovery: %q", stopReason) + case <-time.After(50 * time.Millisecond): + } + + host.mu.RLock() + defer host.mu.RUnlock() + if !host.crashRecoveryInProgress { + t.Fatal("crashRecoveryInProgress = false, want true") + } + if host.status != HostStarting { + t.Fatalf("status = %s, want %s", host.status, HostStarting) + } +} + +func TestSessionHost_FinishPromptWithUnrecoverablePeerDisconnectReportsActionableFailure(t *testing.T) { + t.Parallel() + + host := newTestSessionHost(t) + defer host.Stop() + + completed := make(chan error, 1) + host.config.OnPromptComplete = func(stopReason string, promptErr error) { + if stopReason != "error" { + t.Errorf("stopReason = %q, want error", stopReason) + } + completed <- promptErr + } + host.mu.Lock() + host.agentType = "openai-codex" + host.sessionID = "acp-session-1" + host.agentSupportsLoadSession = false + host.mu.Unlock() + host.stderrMu.Lock() + host.stderrBuf.WriteString("fatal: peer disconnected before response\nOPENAI_API_KEY=sk-secret1234567890") + host.stderrMu.Unlock() + + host.finishPromptWithError( + context.Background(), + json.RawMessage(`"req-1"`), + promptStartInfo{startedAt: time.Now(), viewerID: "viewer-1"}, + errors.New(`{"code":-32603,"message":"Internal error","data":{"error":"peer disconnected before response"}}`), + ) + + select { + case err := <-completed: + if err == nil { + t.Fatal("promptErr = nil, want actionable failure") + } + if !strings.Contains(err.Error(), "cannot be recovered automatically") { + t.Fatalf("promptErr = %q, want actionable recovery message", err.Error()) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for prompt completion callback") + } + + host.bufMu.RLock() + defer host.bufMu.RUnlock() + foundReport := false + foundRPCError := false + for _, msg := range host.messageBuf { + var report AgentCrashReportMessage + if err := json.Unmarshal(msg.Data, &report); err == nil && report.Type == MsgAgentCrashReport { + foundReport = true + if report.Recovered { + t.Fatal("crash report recovered = true, want false") + } + if strings.Contains(report.Stderr, "sk-secret1234567890") { + t.Fatalf("crash report leaked secret: %q", report.Stderr) + } + } + + var rpc struct { + Error struct { + Message string `json:"message"` + } `json:"error"` + } + if err := json.Unmarshal(msg.Data, &rpc); err == nil && strings.Contains(rpc.Error.Message, "cannot be recovered automatically") { + foundRPCError = true + } + } + if !foundReport { + t.Fatal("missing unrecovered crash report") + } + if !foundRPCError { + t.Fatal("missing actionable JSON-RPC error") + } + + // After an unrecoverable crash the host must be in HostError, not HostReady. + if status := host.Status(); status != HostError { + t.Fatalf("host.Status() = %s after unrecoverable crash, want %s", status, HostError) + } +} + func TestSessionHost_BroadcastAgentCrashReport(t *testing.T) { t.Parallel() diff --git a/tasks/backlog/2026-05-23-peer-disconnected-before-response.md b/tasks/active/2026-05-23-peer-disconnected-before-response.md similarity index 50% rename from tasks/backlog/2026-05-23-peer-disconnected-before-response.md rename to tasks/active/2026-05-23-peer-disconnected-before-response.md index bb5c1f490..4679115a2 100644 --- a/tasks/backlog/2026-05-23-peer-disconnected-before-response.md +++ b/tasks/active/2026-05-23-peer-disconnected-before-response.md @@ -27,15 +27,25 @@ The failure originates in the ACP layer when the agent process disconnects befor - Compare observed failure mode against the recently merged crash recovery path via `LoadSession`. - Check for stale workspace or container state that may still run pre-crash-recovery behavior. +## Research Findings + +- Production D1 (`sam-prod`) contains 24 tasks with the exact `peer disconnected before response` error between 2026-05-15 and 2026-05-22. The latest observed failure updated at 2026-05-22T07:09:41.503Z. +- The LoadSession crash-recovery feature merged in commit `34fe25e4` at 2026-05-22T12:12:36Z, about five hours after the latest observed production failure. Current main therefore has a recovery path the observed failures did not have. +- Pattern by task mode: 18 task-mode failures with no profile hint, 5 conversation-mode failures with no profile hint, and 1 conversation-mode failure using profile `01KS4XBW9QPMMBXP8EWH8EHBY2` (`AMP Tester`, `openai-codex`). +- Pattern by node: failures cluster on deleted nodes, especially `01KS4SVNP7EDWB3842HQRQKEB8` (4) and `01KRRZP2JBQAPJ2G6SWCT52Y3V` (3). All recent affected nodes queried are now `deleted`, so normal debug-package download is no longer available. A current-node debug package attempt through the API using the available MCP token returned 401; localhost VM-agent debug endpoint was not reachable from the workspace. +- The current workspace is not stale: production D1 shows workspace `01KSA6ZMP1ECGPJ7R4JE4HA63D` running on healthy node `01KSA6W131DKG0X0FS7TE0423B`, linked to this task `01KSA6VYJJDGECCCX8H6NPGGG5`, with no task error. +- Code path: `SessionHost.finishPromptWithError` classifies `peer disconnected` as a crash prompt error via `isCrashPromptError`. If `agentSupportsLoadSession` is true and an ACP session ID is available, it calls `beginCrashRecovery`, waits for `monitorProcessExit`, restarts the agent, calls ACP `LoadSession`, then notifies prompt completion with stop reason `recovered`. The VM-agent task callback maps `recovered` to `executionStep=awaiting_followup` instead of `toStatus=failed`. +- Gap found: if the crash is not recoverable because LoadSession is unavailable, the existing code fell through to generic prompt failure and surfaced the raw JSON-RPC error blob. That is still a terminal failure, but it is not actionable and gives no crash report context. + ## Implementation Checklist -- [ ] Gather production evidence for recent `peer disconnected before response` failures. -- [ ] Download and inspect at least one relevant debug package when accessible. -- [ ] Identify exact ACP code path that creates or propagates the error. -- [ ] Determine whether the process exit/disconnect path triggers crash recovery or direct failure. -- [ ] Implement targeted hardening for uncovered peer disconnect cases. -- [ ] Add focused Go tests covering the failure and recovery behavior. -- [ ] Run VM agent tests and broader project checks appropriate to touched files. +- [x] Gather production evidence for recent `peer disconnected before response` failures. +- [x] Download and inspect at least one relevant debug package when accessible. +- [x] Identify exact ACP code path that creates or propagates the error. +- [x] Determine whether the process exit/disconnect path triggers crash recovery or direct failure. +- [x] Implement targeted hardening for uncovered peer disconnect cases. +- [x] Add focused Go tests covering the failure and recovery behavior. +- [x] Run VM agent tests and broader project checks appropriate to touched files. - [ ] Run required specialist review for Go/VM agent changes. - [ ] Deploy to staging only after checking active staging deployments. - [ ] Create PR on `sam/investigate-resolve-recurring-peer-01ksa6` and do not merge.