Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions src/__tests__/daemon-client-progress.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import assert from 'node:assert/strict';
import { EventEmitter } from 'node:events';
import type { Socket } from 'node:net';
import { test } from 'vitest';
import type { DaemonRequest, DaemonResponse } from '../daemon/types.ts';
import { readDaemonSocketProgressResponse } from '../daemon-client-progress.ts';
import { AppError } from '../utils/errors.ts';

type MockSocket = EventEmitter & {
ended: boolean;
encoding?: string;
end: () => MockSocket;
setEncoding: (encoding: BufferEncoding) => MockSocket;
};

function createMockSocket(): MockSocket {
const socket = new EventEmitter() as MockSocket;
socket.ended = false;
socket.end = () => {
socket.ended = true;
socket.emit('close');
return socket;
};
socket.setEncoding = (encoding) => {
socket.encoding = encoding;
return socket;
};
return socket;
}

function readSocketProgressResponse(
socket: MockSocket,
req: DaemonRequest,
): Promise<DaemonResponse> {
let settled = false;
return new Promise((resolve, reject) => {
readDaemonSocketProgressResponse(socket as unknown as Socket, {
req,
isSettled: () => settled,
clearTimeout: () => {},
resolve: (response) => {
settled = true;
resolve(response);
},
reject: (error) => {
settled = true;
reject(error);
},
});
});
}

test('readDaemonSocketProgressResponse parses split progress lines before response envelopes', async () => {
const socket = createMockSocket();
const req: DaemonRequest = {
session: 'default',
command: 'test',
positionals: ['/tmp/replays'],
flags: {},
token: 'secret',
meta: { requestId: 'req-socket-progress', requestProgress: 'replay-test' },
};
let stderr = '';
const originalStderrWrite = process.stderr.write.bind(process.stderr);

try {
(process.stderr as any).write = ((chunk: unknown) => {
stderr += String(chunk);
return true;
}) as typeof process.stderr.write;

const responsePromise = readSocketProgressResponse(socket, req);
const progressLine = JSON.stringify({
type: 'progress',
event: {
type: 'replay-test',
file: '/tmp/01-login.ad',
title: 'Login flow',
status: 'fail',
index: 1,
total: 2,
attempt: 1,
maxAttempts: 2,
durationMs: 1234,
retrying: true,
message: 'first attempt failed',
},
});
const responseLine = JSON.stringify({
type: 'response',
response: { ok: true, data: { via: 'socket-progress' } },
});

socket.emit('data', progressLine.slice(0, 24));
socket.emit('data', `${progressLine.slice(24)}\n${responseLine}\n`);
socket.emit('data', '{not-json-after-settle}\n');

await assert.doesNotReject(responsePromise);
assert.deepEqual(await responsePromise, { ok: true, data: { via: 'socket-progress' } });
assert.equal(socket.encoding, 'utf8');
assert.equal(socket.ended, true);
assert.match(stderr, /FAIL "Login flow" attempt 1\/2 retrying \(1\.23s\)/);
assert.match(stderr, / first attempt failed/);
} finally {
process.stderr.write = originalStderrWrite;
}
});

test('readDaemonSocketProgressResponse rejects invalid response lines with request context', async () => {
const socket = createMockSocket();
const req: DaemonRequest = {
session: 'default',
command: 'snapshot',
positionals: [],
flags: {},
token: 'secret',
meta: { requestId: 'req-invalid-socket-progress' },
};

const responsePromise = readSocketProgressResponse(socket, req);
socket.emit('data', '{not-json}\n');

await assert.rejects(
responsePromise,
(error) =>
error instanceof AppError &&
error.code === 'COMMAND_FAILED' &&
error.message === 'Invalid daemon response' &&
error.details?.requestId === 'req-invalid-socket-progress' &&
error.details?.line === '{not-json}',
);
});
58 changes: 56 additions & 2 deletions src/daemon-client-progress.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import http from 'node:http';
import type { Socket } from 'node:net';
import { AppError } from './utils/errors.ts';
import type { DaemonRequest } from './daemon/types.ts';
import type { DaemonRequest, DaemonResponse } from './daemon/types.ts';
import type { RequestProgressEvent } from './daemon/request-progress.ts';
import { consumeTextLines } from './utils/line-stream.ts';
import { formatReplayTestProgressEvent } from './cli-test-progress.ts';
Expand All @@ -10,7 +11,7 @@ import {
shouldStreamRequestProgress,
} from './daemon/request-progress-protocol.ts';

export function writeRequestProgressEvent(event: RequestProgressEvent): void {
function writeRequestProgressEvent(event: RequestProgressEvent): void {
const line = formatReplayTestProgressEvent(event);
if (line) process.stderr.write(`${line}\n`);
}
Expand All @@ -27,6 +28,59 @@ export function shouldReadDaemonProgressStream(
);
}

export function readDaemonSocketProgressResponse(
socket: Socket,
options: {
req: DaemonRequest;
isSettled: () => boolean;
resolve: (response: DaemonResponse) => void;
reject: (error: unknown) => void;
clearTimeout: () => void;
},
): void {
const { req, isSettled, resolve, reject, clearTimeout } = options;
let buffer = '';

const rejectInvalidLine = (line: string, error: unknown) => {
clearTimeout();
reject(
new AppError(
'COMMAND_FAILED',
'Invalid daemon response',
{
requestId: req.meta?.requestId,
line,
},
error instanceof Error ? error : undefined,
),
);
};

socket.setEncoding('utf8');
socket.on('data', (chunk) => {
if (isSettled()) return;
const parsed = consumeTextLines(buffer, chunk);
buffer = parsed.buffer;
for (const line of parsed.lines) {
try {
const message = JSON.parse(line) as unknown;
if (isDaemonProgressEnvelope(message)) {
writeRequestProgressEvent(message.event);
continue;
}
const response = isDaemonResponseEnvelope(message) ? message.response : message;
clearTimeout();
resolve(response as DaemonResponse);
socket.end();
return;
} catch (error) {
rejectInvalidLine(line, error);
return;
}
}
});
}

export function readDaemonHttpProgressResponse(
res: http.IncomingMessage,
options: {
Expand Down
57 changes: 15 additions & 42 deletions src/daemon-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import os from 'node:os';
import path from 'node:path';
import { sleep } from './utils/timeouts.ts';
import { AppError, toAppErrorCode } from './utils/errors.ts';
import { consumeTextLines } from './utils/line-stream.ts';
import { readNodeHttpResponseBody } from './utils/node-http.ts';
import type {
DaemonRequest as SharedDaemonRequest,
Expand All @@ -29,13 +28,9 @@ import { PUBLIC_COMMANDS } from './command-catalog.ts';
import { shellQuote } from './utils/shell-quote.ts';
import {
readDaemonHttpProgressResponse,
readDaemonSocketProgressResponse,
shouldReadDaemonProgressStream,
writeRequestProgressEvent,
} from './daemon-client-progress.ts';
import {
isDaemonProgressEnvelope,
isDaemonResponseEnvelope,
} from './daemon/request-progress-protocol.ts';
import { materializeRemoteArtifacts, prepareRemoteRequestArtifacts } from './daemon-artifacts.ts';
export { computeDaemonCodeSignature } from './daemon/code-signature.ts';
export { downloadRemoteArtifact } from './daemon-artifacts.ts';
Expand Down Expand Up @@ -1043,42 +1038,20 @@ async function sendSocketRequest(
}, timeoutMs)
: undefined;

let buffer = '';
socket.setEncoding('utf8');
socket.on('data', (chunk) => {
if (settled) return;
const parsed = consumeTextLines(buffer, chunk);
buffer = parsed.buffer;
for (const line of parsed.lines) {
try {
const message = JSON.parse(line) as unknown;
if (isDaemonProgressEnvelope(message)) {
writeRequestProgressEvent(message.event);
continue;
}
const response = isDaemonResponseEnvelope(message) ? message.response : message;
settled = true;
socket.end();
if (timeoutHandle) clearTimeout(timeoutHandle);
resolve(response as DaemonResponse);
return;
} catch (err) {
settled = true;
if (timeoutHandle) clearTimeout(timeoutHandle);
reject(
new AppError(
'COMMAND_FAILED',
'Invalid daemon response',
{
requestId: req.meta?.requestId,
line,
},
err instanceof Error ? err : undefined,
),
);
return;
}
}
readDaemonSocketProgressResponse(socket, {
req,
isSettled: () => settled,
clearTimeout: () => {
if (timeoutHandle) clearTimeout(timeoutHandle);
},
resolve: (response) => {
settled = true;
resolve(response);
},
reject: (error) => {
settled = true;
reject(error);
},
});

socket.on('error', (err) => {
Expand Down
Loading