From 21e549594cd143ebce2b9089db854b5ad116b5ed Mon Sep 17 00:00:00 2001 From: Justin Gray Date: Mon, 29 Jun 2026 12:51:12 +0000 Subject: [PATCH] fix(acp): drop untracked non-numeric responses to prevent grok handshake stall The RpcClient and raw ext requests only ever mint numeric request ids, so a response whose id is not a base-10 number cannot match anything we sent. Some agents (notably grok) emit unsolicited JSON-RPC responses with symbolic ids such as "skills-reload" right after `initialize`. Forwarding those to the RpcClient made it fail while resolving the id, tearing down the response loop so that subsequent `authenticate` and `session/new` responses were never delivered and the handshake stalled until the caller timed out. Route untracked responses through a `forwardUntrackedResponse` guard that only forwards ids matching `^[0-9]+$` and otherwise drops them with a debug log. Applies to both the exit-encoded and success/defect response paths. Adds an in-memory regression test that floods the stream with `skills-reload` responses after `initialize` and asserts the initialize/authenticate/session handshake still completes. 
Co-Authored-By: Claude Opus 4.8 --- .../effect-acp/src/grok-skills-reload.test.ts | 97 +++++++++++++++++++ packages/effect-acp/src/protocol.ts | 23 ++++- 2 files changed, 118 insertions(+), 2 deletions(-) create mode 100644 packages/effect-acp/src/grok-skills-reload.test.ts diff --git a/packages/effect-acp/src/grok-skills-reload.test.ts b/packages/effect-acp/src/grok-skills-reload.test.ts new file mode 100644 index 00000000000..aa261b8ea63 --- /dev/null +++ b/packages/effect-acp/src/grok-skills-reload.test.ts @@ -0,0 +1,97 @@ +import * as Effect from "effect/Effect"; +import * as Fiber from "effect/Fiber"; +import * as Queue from "effect/Queue"; +import { it, assert } from "@effect/vitest"; + +import * as AcpClient from "./client.ts"; +import { makeInMemoryStdio } from "./_internal/stdio.ts"; + +// Regression test for a production hang against the grok agent. After the +// `initialize` response, grok emits unsolicited JSON-RPC responses with the +// symbolic id `"skills-reload"`. Those ids never match a request we issued (the +// RpcClient only mints numeric ids), and forwarding them to the RpcClient made +// it tear down its response loop, so the subsequent `authenticate` and +// `session/new` responses were never delivered and the handshake stalled until +// the caller timed out. The protocol layer must drop such untracked, +// non-numeric responses instead. 
+ +const encoder = new TextEncoder(); + +const skillsReloadResponse = `${JSON.stringify({ + jsonrpc: "2.0", + id: "skills-reload", + result: { result: { reloaded: 0 } }, +})}\n`; + +const extNotification = `${JSON.stringify({ + jsonrpc: "2.0", + method: "_x.ai/mcp/servers_updated", + params: { mcpServers: [] }, +})}\n`; + +const responseFor = (id: unknown, result: unknown) => + `${JSON.stringify({ jsonrpc: "2.0", id, result })}\n`; + +it.effect( + "completes initialize/authenticate/session despite untracked skills-reload responses", + () => + Effect.gen(function* () { + const { stdio, input, output } = yield* makeInMemoryStdio(); + const push = (line: string) => Queue.offer(input, encoder.encode(line)); + + // Fake grok peer: answer each request and, right after `initialize`, + // push five untracked `skills-reload` responses plus one ext notification. + const peer = yield* Effect.forkScoped( + Effect.forever( + Queue.take(output).pipe( + Effect.flatMap((chunk) => + Effect.forEach( + chunk.split("\n").filter((line) => line.trim().length > 0), + (line) => { + const message = JSON.parse(line) as { id?: unknown; method?: string }; + if (message.method === "initialize") { + return push(responseFor(message.id, { protocolVersion: 1 })).pipe( + Effect.andThen( + Effect.forEach([1, 2, 3, 4, 5], () => push(skillsReloadResponse), { + discard: true, + }), + ), + Effect.andThen(push(extNotification)), + ); + } + if (message.method === "authenticate") { + return push(responseFor(message.id, {})); + } + if (message.method === "session/new") { + return push(responseFor(message.id, { sessionId: "mock-session-1" })); + } + return Effect.void; + }, + { discard: true }, + ), + ), + ), + ), + ); + + const result = yield* Effect.gen(function* () { + const acp = yield* AcpClient.AcpClient; + const init = yield* acp.agent.initialize({ + protocolVersion: 1, + clientCapabilities: { + fs: { readTextFile: false, writeTextFile: false }, + terminal: false, + }, + clientInfo: { name: 
"t3-code-provider-probe", version: "0.0.0" }, + }); + assert.equal(init.protocolVersion, 1); + yield* acp.agent.authenticate({ methodId: "cached_token" }); + const session = yield* acp.agent.createSession({ cwd: "/tmp", mcpServers: [] }); + return session.sessionId; + }).pipe(Effect.provide(AcpClient.layer(stdio)), Effect.timeout("5 seconds")); + + assert.equal(result, "mock-session-1"); + yield* Fiber.interrupt(peer); + }).pipe(Effect.scoped), + { timeout: 15_000 }, +); diff --git a/packages/effect-acp/src/protocol.ts b/packages/effect-acp/src/protocol.ts index 27c619296c0..ace9fc11b5d 100644 --- a/packages/effect-acp/src/protocol.ts +++ b/packages/effect-acp/src/protocol.ts @@ -337,12 +337,29 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi return Queue.offer(serverQueue, message).pipe(Effect.asVoid); }; + // The RpcClient and raw ext requests only ever issue numeric request ids, so + // a response whose id is not a base-10 number cannot match anything we sent. + // Some agents (e.g. grok) emit unsolicited responses with symbolic ids such + // as `"skills-reload"`; forwarding those to the RpcClient makes it fail while + // resolving the id, which tears down the response loop and stalls every + // in-flight request. Drop untracked non-numeric responses instead. 
+ const forwardUntrackedResponse = (requestId: string, forward: () => Effect.Effect<void>) => { + if (/^[0-9]+$/.test(requestId)) { + return forward(); + } + return Effect.logDebug("Dropping untracked ACP response with non-numeric request id").pipe( + Effect.annotateLogs({ requestId }), + ); + }; + const handleExitEncoded = (message: RpcMessage.ResponseExitEncoded) => Ref.get(extPending).pipe( Effect.flatMap((pending) => { const pendingRequest = pending.get(message.requestId); if (!pendingRequest) { - return Queue.offer(clientQueue, message).pipe(Effect.asVoid); + return forwardUntrackedResponse(message.requestId, () => + Queue.offer(clientQueue, message).pipe(Effect.asVoid), + ); } if (message.exit._tag === "Success") { return completeExtPendingSuccess(message.requestId, message.exit.value); @@ -389,7 +406,9 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi message.requestId, ), ) - : Queue.offer(clientQueue, message).pipe(Effect.asVoid); + : forwardUntrackedResponse(message.requestId, () => + Queue.offer(clientQueue, message).pipe(Effect.asVoid), + ); }), ); case "Defect":