Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions packages/effect-acp/src/grok-skills-reload.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import * as Effect from "effect/Effect";
import * as Fiber from "effect/Fiber";
import * as Queue from "effect/Queue";
import { it, assert } from "@effect/vitest";

import * as AcpClient from "./client.ts";
import { makeInMemoryStdio } from "./_internal/stdio.ts";

// Regression test for a production hang against the grok agent. After the
// `initialize` response, grok emits unsolicited JSON-RPC responses with the
// symbolic id `"skills-reload"`. Those ids never match a request we issued (the
// RpcClient only mints numeric ids), and forwarding them to the RpcClient made
// it tear down its response loop, so the subsequent `authenticate` and
// `session/new` responses were never delivered and the handshake stalled until
// the caller timed out. The protocol layer must drop such untracked,
// non-numeric responses instead.

const encoder = new TextEncoder();

const skillsReloadResponse = `${JSON.stringify({
jsonrpc: "2.0",
id: "skills-reload",
result: { result: { reloaded: 0 } },
})}\n`;

const extNotification = `${JSON.stringify({
jsonrpc: "2.0",
method: "_x.ai/mcp/servers_updated",
params: { mcpServers: [] },
})}\n`;

const responseFor = (id: unknown, result: unknown) =>
`${JSON.stringify({ jsonrpc: "2.0", id, result })}\n`;

it.effect(
"completes initialize/authenticate/session despite untracked skills-reload responses",
() =>
Effect.gen(function* () {
const { stdio, input, output } = yield* makeInMemoryStdio();
const push = (line: string) => Queue.offer(input, encoder.encode(line));

// Fake grok peer: answer each request and, right after `initialize`,
// flood the stream with untracked `skills-reload` responses like grok.
const peer = yield* Effect.forkScoped(
Effect.forever(
Queue.take(output).pipe(
Effect.flatMap((chunk) =>
Effect.forEach(
chunk.split("\n").filter((line) => line.trim().length > 0),
(line) => {
const message = JSON.parse(line) as { id?: unknown; method?: string };
if (message.method === "initialize") {
return push(responseFor(message.id, { protocolVersion: 1 })).pipe(
Effect.andThen(
Effect.forEach([1, 2, 3, 4, 5], () => push(skillsReloadResponse), {
discard: true,
}),
),
Effect.andThen(push(extNotification)),
);
}
if (message.method === "authenticate") {
return push(responseFor(message.id, {}));
}
if (message.method === "session/new") {
return push(responseFor(message.id, { sessionId: "mock-session-1" }));
}
return Effect.void;
},
{ discard: true },
),
),
),
),
);

const result = yield* Effect.gen(function* () {
const acp = yield* AcpClient.AcpClient;
const init = yield* acp.agent.initialize({
protocolVersion: 1,
clientCapabilities: {
fs: { readTextFile: false, writeTextFile: false },
terminal: false,
},
clientInfo: { name: "t3-code-provider-probe", version: "0.0.0" },
});
assert.equal(init.protocolVersion, 1);
yield* acp.agent.authenticate({ methodId: "cached_token" });
const session = yield* acp.agent.createSession({ cwd: "/tmp", mcpServers: [] });
return session.sessionId;
}).pipe(Effect.provide(AcpClient.layer(stdio)), Effect.timeout("5 seconds"));

assert.equal(result, "mock-session-1");
yield* Fiber.interrupt(peer);
}).pipe(Effect.scoped),
{ timeout: 15_000 },
);
23 changes: 21 additions & 2 deletions packages/effect-acp/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,12 +337,29 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi
return Queue.offer(serverQueue, message).pipe(Effect.asVoid);
};

// The RpcClient and raw ext requests only ever issue numeric request ids, so
// a response whose id is not a base-10 number cannot match anything we sent.
// Some agents (e.g. grok) emit unsolicited responses with symbolic ids such
// as `"skills-reload"`; forwarding those to the RpcClient makes it fail while
// resolving the id, which tears down the response loop and stalls every
// in-flight request. Drop untracked non-numeric responses instead.
const forwardUntrackedResponse = (requestId: string, forward: () => Effect.Effect<void>) => {
if (/^[0-9]+$/.test(requestId)) {
return forward();
}
return Effect.logDebug("Dropping untracked ACP response with non-numeric request id").pipe(
Effect.annotateLogs({ requestId }),
);
};

const handleExitEncoded = (message: RpcMessage.ResponseExitEncoded) =>
Ref.get(extPending).pipe(
Effect.flatMap((pending) => {
const pendingRequest = pending.get(message.requestId);
if (!pendingRequest) {
return Queue.offer(clientQueue, message).pipe(Effect.asVoid);
return forwardUntrackedResponse(message.requestId, () =>
Queue.offer(clientQueue, message).pipe(Effect.asVoid),
);
}
if (message.exit._tag === "Success") {
return completeExtPendingSuccess(message.requestId, message.exit.value);
Expand Down Expand Up @@ -389,7 +406,9 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi
message.requestId,
),
)
: Queue.offer(clientQueue, message).pipe(Effect.asVoid);
: forwardUntrackedResponse(message.requestId, () =>
Queue.offer(clientQueue, message).pipe(Effect.asVoid),
);
}),
);
case "Defect":
Expand Down
Loading