Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions packages/vm-agent/internal/acp/session_host_crash.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,27 @@ func redactAgentDiagnosticText(text string) string {
return redacted
}

func (h *SessionHost) beginCrashRecovery(reqID json.RawMessage, viewerID string) (string, bool) {
// beginCrashRecovery attempts to start LoadSession-based recovery after an agent crash.
// Returns (agentType, redactedStderr, recoveryStarted). agentType and stderr are returned
// even when recovery is unavailable so the caller can use them without re-acquiring locks.
func (h *SessionHost) beginCrashRecovery(reqID json.RawMessage, viewerID string) (string, string, bool) {
stderr := redactAgentDiagnosticText(h.peekStderr())
h.mu.Lock()
defer h.mu.Unlock()

if h.sessionID == "" || !h.agentSupportsLoadSession || h.agentType == "" {
return "", false
agentType := h.agentType
if h.sessionID == "" || !h.agentSupportsLoadSession || agentType == "" {
return agentType, stderr, false
}

h.crashRecoveryInProgress = true
h.crashStderr = stderr
h.crashAgentType = h.agentType
h.crashAgentType = agentType
h.crashPromptReqID = append(json.RawMessage(nil), reqID...)
h.crashPromptViewerID = viewerID
h.status = HostStarting
h.statusErr = ""
return h.agentType, true
return agentType, stderr, true
}

type crashRecoverySnapshot struct {
Expand Down
30 changes: 29 additions & 1 deletion packages/vm-agent/internal/acp/session_host_prompt.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ func (h *SessionHost) finishPromptCancelled(reqID json.RawMessage, info promptSt

func (h *SessionHost) finishPromptWithError(promptCtx context.Context, reqID json.RawMessage, info promptStartInfo, err error) {
if isCrashPromptError(err) && !errors.Is(promptCtx.Err(), context.DeadlineExceeded) {
if agentType, ok := h.beginCrashRecovery(reqID, info.viewerID); ok {
agentType, stderr, ok := h.beginCrashRecovery(reqID, info.viewerID)
if ok {
slog.Warn("ACP Prompt failed because agent disconnected; deferring to crash recovery", "error", err, "agentType", agentType)
h.reportLifecycle("warn", "ACP agent crashed during prompt; attempting LoadSession recovery", map[string]interface{}{
"agentType": agentType,
Expand All @@ -271,6 +272,8 @@ func (h *SessionHost) finishPromptWithError(promptCtx context.Context, reqID jso
h.reportActivity("recovering")
return
}
h.finishUnrecoverableCrashPrompt(reqID, info, agentType, stderr, err)
return
}

errMsg := fmt.Sprintf("Prompt failed: %v", err)
Expand All @@ -290,6 +293,31 @@ func (h *SessionHost) finishPromptWithError(promptCtx context.Context, reqID jso
h.notifyPromptComplete("error", err)
}

func (h *SessionHost) finishUnrecoverableCrashPrompt(reqID json.RawMessage, info promptStartInfo, agentType, stderr string, err error) {
if agentType == "" {
agentType = "unknown"
}
message := "Agent process disconnected during prompt and cannot be recovered automatically"
slog.Warn("ACP Prompt failed because agent disconnected and recovery is unavailable",
"error", err, "agentType", agentType)
h.reportLifecycle("warn", "ACP agent disconnected during prompt without LoadSession recovery", map[string]interface{}{
"agentType": agentType,
"duration": time.Since(info.startedAt).String(),
"error": err.Error(),
})
// promptReqID gets a defensive copy because crashRecoverySnapshot stores the slice;
// marshalJSONRPCError serialises reqID immediately so no copy needed.
h.broadcastAgentCrashReport(h.crashReport(crashRecoverySnapshot{
stderr: stderr,
agentType: agentType,
promptReqID: append(json.RawMessage(nil), reqID...),
}, false, "LoadSession recovery is unavailable for this agent session"))
h.broadcastMessage(h.marshalJSONRPCError(reqID, -32603, message))
h.setStatus(HostError, message)
h.broadcastAgentStatus(StatusError, agentType, message)
h.notifyPromptComplete("error", fmt.Errorf("%s: %w", message, err))
}

func (h *SessionHost) broadcastPromptResponse(reqID json.RawMessage, resp acpsdk.PromptResponse) {
result, _ := json.Marshal(resp)
response := map[string]interface{}{
Expand Down
122 changes: 120 additions & 2 deletions packages/vm-agent/internal/acp/session_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,15 +811,15 @@
host.agentSupportsLoadSession = false
host.mu.Unlock()

if _, ok := host.beginCrashRecovery(json.RawMessage(`"req-1"`), "viewer-1"); ok {
if _, _, ok := host.beginCrashRecovery(json.RawMessage(`"req-1"`), "viewer-1"); ok {
t.Fatal("beginCrashRecovery succeeded without LoadSession support")
}

host.mu.Lock()
host.agentSupportsLoadSession = true
host.mu.Unlock()

agentType, ok := host.beginCrashRecovery(json.RawMessage(`"req-1"`), "viewer-1")
agentType, _, ok := host.beginCrashRecovery(json.RawMessage(`"req-1"`), "viewer-1")
if !ok {
t.Fatal("beginCrashRecovery failed with LoadSession support")
}
Expand All @@ -840,6 +840,124 @@
}
}

func TestSessionHost_FinishPromptWithPeerDisconnectBeginsCrashRecovery(t *testing.T) {
t.Parallel()

host := newTestSessionHost(t)
defer host.Stop()

completed := make(chan string, 1)
host.config.OnPromptComplete = func(stopReason string, _ error) {
completed <- stopReason
}
host.mu.Lock()
host.agentType = "openai-codex"
host.sessionID = "acp-session-1"
host.agentSupportsLoadSession = true
host.mu.Unlock()

host.finishPromptWithError(
context.Background(),
json.RawMessage(`"req-1"`),
promptStartInfo{startedAt: time.Now(), viewerID: "viewer-1"},
errors.New(`{"code":-32603,"message":"Internal error","data":{"error":"peer disconnected before response"}}`),
)

select {
case stopReason := <-completed:
t.Fatalf("prompt completed before crash recovery: %q", stopReason)
case <-time.After(50 * time.Millisecond):
}

host.mu.RLock()
defer host.mu.RUnlock()
if !host.crashRecoveryInProgress {
t.Fatal("crashRecoveryInProgress = false, want true")
}
if host.status != HostStarting {
t.Fatalf("status = %s, want %s", host.status, HostStarting)
}
}

func TestSessionHost_FinishPromptWithUnrecoverablePeerDisconnectReportsActionableFailure(t *testing.T) {

Check failure on line 882 in packages/vm-agent/internal/acp/session_host_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 20 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=raphaeltm_simple-agent-manager&issues=AZ5WYI1p5i6orNl3LvAE&open=AZ5WYI1p5i6orNl3LvAE&pullRequest=1108
t.Parallel()

host := newTestSessionHost(t)
defer host.Stop()

completed := make(chan error, 1)
host.config.OnPromptComplete = func(stopReason string, promptErr error) {
if stopReason != "error" {
t.Errorf("stopReason = %q, want error", stopReason)
}
completed <- promptErr
}
host.mu.Lock()
host.agentType = "openai-codex"
host.sessionID = "acp-session-1"
host.agentSupportsLoadSession = false
host.mu.Unlock()
host.stderrMu.Lock()
host.stderrBuf.WriteString("fatal: peer disconnected before response\nOPENAI_API_KEY=sk-secret1234567890")
host.stderrMu.Unlock()

host.finishPromptWithError(
context.Background(),
json.RawMessage(`"req-1"`),
promptStartInfo{startedAt: time.Now(), viewerID: "viewer-1"},
errors.New(`{"code":-32603,"message":"Internal error","data":{"error":"peer disconnected before response"}}`),
)

select {
case err := <-completed:
if err == nil {
t.Fatal("promptErr = nil, want actionable failure")
}
if !strings.Contains(err.Error(), "cannot be recovered automatically") {
t.Fatalf("promptErr = %q, want actionable recovery message", err.Error())
}
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for prompt completion callback")
}

host.bufMu.RLock()
defer host.bufMu.RUnlock()
foundReport := false
foundRPCError := false
for _, msg := range host.messageBuf {
var report AgentCrashReportMessage
if err := json.Unmarshal(msg.Data, &report); err == nil && report.Type == MsgAgentCrashReport {
foundReport = true
if report.Recovered {
t.Fatal("crash report recovered = true, want false")
}
if strings.Contains(report.Stderr, "sk-secret1234567890") {
t.Fatalf("crash report leaked secret: %q", report.Stderr)
}
}

var rpc struct {
Error struct {
Message string `json:"message"`
} `json:"error"`
}
if err := json.Unmarshal(msg.Data, &rpc); err == nil && strings.Contains(rpc.Error.Message, "cannot be recovered automatically") {
foundRPCError = true
}
}
if !foundReport {
t.Fatal("missing unrecovered crash report")
}
if !foundRPCError {
t.Fatal("missing actionable JSON-RPC error")
}

// After an unrecoverable crash the host must be in HostError, not HostReady.
if status := host.Status(); status != HostError {
t.Fatalf("host.Status() = %s after unrecoverable crash, want %s", status, HostError)
}
}

func TestSessionHost_BroadcastAgentCrashReport(t *testing.T) {
t.Parallel()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,25 @@ The failure originates in the ACP layer when the agent process disconnects befor
- Compare observed failure mode against the recently merged crash recovery path via `LoadSession`.
- Check for stale workspace or container state that may still run pre-crash-recovery behavior.

## Research Findings

- Production D1 (`sam-prod`) contains 24 tasks with the exact `peer disconnected before response` error between 2026-05-15 and 2026-05-22. The latest observed failure updated at 2026-05-22T07:09:41.503Z.
- The LoadSession crash-recovery feature merged in commit `34fe25e4` at 2026-05-22T12:12:36Z, about five hours after the latest observed production failure. Current main therefore has a recovery path the observed failures did not have.
- Pattern by task mode: 18 task-mode failures with no profile hint, 5 conversation-mode failures with no profile hint, and 1 conversation-mode failure using profile `01KS4XBW9QPMMBXP8EWH8EHBY2` (`AMP Tester`, `openai-codex`).
- Pattern by node: failures cluster on deleted nodes, especially `01KS4SVNP7EDWB3842HQRQKEB8` (4) and `01KRRZP2JBQAPJ2G6SWCT52Y3V` (3). All recent affected nodes queried are now `deleted`, so normal debug-package download is no longer available. A current-node debug package attempt through the API using the available MCP token returned 401; localhost VM-agent debug endpoint was not reachable from the workspace.
- The current workspace is not stale: production D1 shows workspace `01KSA6ZMP1ECGPJ7R4JE4HA63D` running on healthy node `01KSA6W131DKG0X0FS7TE0423B`, linked to this task `01KSA6VYJJDGECCCX8H6NPGGG5`, with no task error.
- Code path: `SessionHost.finishPromptWithError` classifies `peer disconnected` as a crash prompt error via `isCrashPromptError`. If `agentSupportsLoadSession` is true and an ACP session ID is available, it calls `beginCrashRecovery`, waits for `monitorProcessExit`, restarts the agent, calls ACP `LoadSession`, then notifies prompt completion with stop reason `recovered`. The VM-agent task callback maps `recovered` to `executionStep=awaiting_followup` instead of `toStatus=failed`.
- Gap found: if the crash is not recoverable because LoadSession is unavailable, the existing code fell through to generic prompt failure and surfaced the raw JSON-RPC error blob. That is still a terminal failure, but it is not actionable and gives no crash report context.

## Implementation Checklist

- [ ] Gather production evidence for recent `peer disconnected before response` failures.
- [ ] Download and inspect at least one relevant debug package when accessible.
- [ ] Identify exact ACP code path that creates or propagates the error.
- [ ] Determine whether the process exit/disconnect path triggers crash recovery or direct failure.
- [ ] Implement targeted hardening for uncovered peer disconnect cases.
- [ ] Add focused Go tests covering the failure and recovery behavior.
- [ ] Run VM agent tests and broader project checks appropriate to touched files.
- [x] Gather production evidence for recent `peer disconnected before response` failures.
- [x] Download and inspect at least one relevant debug package when accessible.
- [x] Identify exact ACP code path that creates or propagates the error.
- [x] Determine whether the process exit/disconnect path triggers crash recovery or direct failure.
- [x] Implement targeted hardening for uncovered peer disconnect cases.
- [x] Add focused Go tests covering the failure and recovery behavior.
- [x] Run VM agent tests and broader project checks appropriate to touched files.
- [ ] Run required specialist review for Go/VM agent changes.
- [ ] Deploy to staging only after checking active staging deployments.
- [ ] Create PR on `sam/investigate-resolve-recurring-peer-01ksa6` and do not merge.
Expand Down
Loading