Skip to content

Commit 8b90eca

Browse files
authored
Fix _fetch_next_chunk for StreamProcessor due to cutting off output (#42)
1 parent beb301d commit 8b90eca

2 files changed

Lines changed: 113 additions & 1 deletion

File tree

lib/resources/abstraction/sandbox.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,30 @@ export class SandboxProcessStream {
571571
const output =
572572
typeof this.fetch_fn === "function" ? await this.fetch_fn() : "";
573573
if (output === this._last_output) return "";
574-
const newOutput = output.slice(this._last_output.length);
574+
575+
let newOutput: string;
576+
if (output.startsWith(this._last_output)) {
577+
// Happy path: server buffer is append-only
578+
newOutput = output.slice(this._last_output.length);
579+
} else {
580+
// Buffer rotation detected: find longest suffix of _last_output
581+
// that matches a prefix of output (overlap detection).
582+
// NOTE: This is a best-effort heuristic. If new data coincidentally
583+
// matches a suffix of _last_output (e.g., repeated lines), the
584+
// overlap may be falsely detected and that data silently skipped.
585+
// Without server-side sequence numbers this is unavoidable; we
586+
// prefer this over the alternative (returning the entire buffer
587+
// and producing large duplicates on every rotation).
588+
newOutput = output;
589+
for (let i = 1; i < this._last_output.length; i++) {
590+
const suffix = this._last_output.slice(i);
591+
if (output.startsWith(suffix)) {
592+
newOutput = output.slice(suffix.length);
593+
break;
594+
}
595+
}
596+
}
597+
575598
this._last_output = output;
576599
return newOutput;
577600
}

tests/sandbox-stream.test.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,93 @@ describe("SandboxProcessStream async iterator", () => {
9191

9292
expect(lines).toEqual(["only-line\n"]);
9393
});
94+
95+
test("recovers new data when server buffer rotates (drops old data from front)", async () => {
96+
// Simulate a server with a fixed-size stdout buffer of 15 chars.
97+
// Once output exceeds that, old data is dropped from the front.
98+
const BUFFER_SIZE = 15;
99+
const allLines = [
100+
"AAAA\n", // cumulative: 5 chars
101+
"BBBB\n", // cumulative: 10 chars
102+
"CCCC\n", // cumulative: 15 chars — buffer full
103+
"DDDD\n", // cumulative: 20 chars — "AAAA\n" dropped from front
104+
"EEEE\n", // cumulative: 25 chars — "BBBB\n" dropped from front
105+
];
106+
107+
// Build the sequence of server responses, trimming the front at BUFFER_SIZE
108+
let fullOutput = "";
109+
const serverResponses: string[] = [];
110+
for (const line of allLines) {
111+
fullOutput += line;
112+
if (fullOutput.length > BUFFER_SIZE) {
113+
fullOutput = fullOutput.slice(fullOutput.length - BUFFER_SIZE);
114+
}
115+
serverResponses.push(fullOutput);
116+
}
117+
// serverResponses:
118+
// [0] "AAAA\n" (5 chars)
119+
// [1] "AAAA\nBBBB\n" (10 chars)
120+
// [2] "AAAA\nBBBB\nCCCC\n" (15 chars) — buffer full
121+
// [3] "BBBB\nCCCC\nDDDD\n" (15 chars) — "AAAA\n" dropped
122+
// [4] "CCCC\nDDDD\nEEEE\n" (15 chars) — "BBBB\n" dropped
123+
124+
let fetchIndex = 0;
125+
const stream = new SandboxProcessStream(
126+
makeMockProcess(0),
127+
() => {
128+
if (fetchIndex < serverResponses.length) {
129+
return serverResponses[fetchIndex++];
130+
}
131+
return serverResponses[serverResponses.length - 1];
132+
}
133+
);
134+
135+
const lines: string[] = [];
136+
for await (const line of stream) {
137+
lines.push(line);
138+
}
139+
140+
expect(lines).toEqual([
141+
"AAAA\n",
142+
"BBBB\n",
143+
"CCCC\n",
144+
"DDDD\n",
145+
"EEEE\n",
146+
]);
147+
});
148+
149+
test("returns entire output when rotation leaves no overlap with previous buffer", async () => {
150+
// Simulate a server where the buffer is completely replaced between
151+
// two consecutive fetches — zero characters in common.
152+
const serverResponses = [
153+
"AAAA\nBBBB\n", // fetch 0: initial data
154+
"CCCC\nDDDD\n", // fetch 1: completely different content (no overlap)
155+
];
156+
157+
let fetchIndex = 0;
158+
const stream = new SandboxProcessStream(
159+
makeMockProcess(0),
160+
() => {
161+
if (fetchIndex < serverResponses.length) {
162+
return serverResponses[fetchIndex++];
163+
}
164+
return serverResponses[serverResponses.length - 1];
165+
}
166+
);
167+
168+
const lines: string[] = [];
169+
for await (const line of stream) {
170+
lines.push(line);
171+
}
172+
173+
// With no overlap the fallback returns the entire new buffer,
174+
// which may duplicate data already yielded — but duplicates are
175+
// preferable to silent data loss.
176+
expect(lines).toEqual([
177+
"AAAA\n",
178+
"BBBB\n",
179+
"CCCC\n",
180+
"DDDD\n",
181+
]);
182+
});
94183
});

0 commit comments

Comments
 (0)