diff --git a/src/lib.rs b/src/lib.rs index 283cd691..b6a18dad 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,8 @@ pub enum Operation { Delete, } +const SYNTHETIC_EOF: &[u8] = b"\x1B]7878\x1B\\"; + /// Sandboxing rules. Deleting / modifying a path with any of the prefixes is forbidden and will /// cause process termination. #[napi(object)] @@ -292,6 +294,16 @@ impl Pty { thread::spawn(move || { let wait_result = child.wait(); + // by this point, child has closed its copy of the user_fd + // lets inject our synthetic EOF OSC into the user_fd + unsafe { + libc::write( + raw_user_fd, + SYNTHETIC_EOF.as_ptr() as *const libc::c_void, + SYNTHETIC_EOF.len(), + ); + } + match wait_result { Ok(status) => { if status.success() { diff --git a/syntheticEof.ts b/syntheticEof.ts new file mode 100644 index 00000000..8b166f44 --- /dev/null +++ b/syntheticEof.ts @@ -0,0 +1,83 @@ +import { Transform } from 'node:stream'; + +// keep in sync with lib.rs::SYNTHETIC_EOF +export const SYNTHETIC_EOF = Buffer.from('\x1B]7878\x1B\\'); + +function getCommonPrefixLength(buffer: Buffer) { + for (let prefixLen = SYNTHETIC_EOF.length; prefixLen >= 1; prefixLen--) { + const suffix = buffer.subarray(buffer.length - prefixLen); + const prefix = SYNTHETIC_EOF.subarray(0, prefixLen); + + if (suffix.equals(prefix)) { + return prefixLen; + } + } + return 0; +} + +export class SyntheticEOFDetector extends Transform { + buffer: Buffer; + maxBufferSize: number; + + constructor(options = {}) { + super(options); + this.buffer = Buffer.alloc(0); + this.maxBufferSize = SYNTHETIC_EOF.length - 1; + } + + _transform(chunk: Buffer, encoding: string, callback: () => void) { + // combine any leftover buffer with new chunk + // and look for synthetic EOF in the combined data + const searchData = Buffer.concat([this.buffer, chunk]); + const eofIndex = searchData.indexOf(SYNTHETIC_EOF); + + // found EOF - emit data before it + if (eofIndex !== -1) { + const beforeEOF = searchData.subarray(0, eofIndex); + const afterEOF = searchData.subarray(eofIndex + SYNTHETIC_EOF.length); + + if (beforeEOF.length > 0) { + this.push(Buffer.from(beforeEOF)); + } + + this.emit('synthetic-eof'); + + // Continue processing remaining data (might have more EOFs) + if (afterEOF.length > 0) { + this._transform(Buffer.from(afterEOF), encoding, callback); + return; + } + + this.buffer = Buffer.alloc(0); + } else { + // no EOF found - emit all the data except for the enough of a buffer + // to potentially match the start of an EOF sequence next time + const commonPrefixLen = getCommonPrefixLength(searchData); + if (commonPrefixLen > 0) { + // has common prefix - buffer the suffix, emit the rest + const emitSize = searchData.length - commonPrefixLen; + + if (emitSize > 0) { + const toEmit = searchData.subarray(0, emitSize); + this.push(Buffer.from(toEmit)); + } + + this.buffer = Buffer.from(searchData.subarray(emitSize)); + } else { + // no common prefix - emit everything, clear buffer + this.push(Buffer.from(searchData)); + this.buffer = Buffer.alloc(0); + } + } + + callback(); + } + + _flush(callback: () => void) { + if (this.buffer.length > 0) { + this.push(this.buffer); + } + + callback(); + } +} diff --git a/tests/index.test.ts b/tests/index.test.ts index 532f3596..e908bf96 100644 --- a/tests/index.test.ts +++ b/tests/index.test.ts @@ -56,7 +56,7 @@ function getOpenFds(): FdRecord { return fds; } -describe('PTY', { repeats: 500 }, () => { +describe('PTY', { repeats: 0 }, () => { test('spawns and exits', async () => { const oldFds = getOpenFds(); const message = 'hello from a pty'; diff --git a/tests/syntheticEOF.test.ts b/tests/syntheticEOF.test.ts new file mode 100644 index 00000000..a1a85dfc --- /dev/null +++ b/tests/syntheticEOF.test.ts @@ -0,0 +1,129 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { SyntheticEOFDetector, SYNTHETIC_EOF } from '../syntheticEof'; + +describe('sequence', () => { + it('should have correct EOF sequence', () => { + expect(SYNTHETIC_EOF).toEqual( + Buffer.from([0x1b, 0x5d, 0x37, 0x38, 0x37, 0x38, 0x1b, 0x5c]), + ); + expect(SYNTHETIC_EOF.length).toBe(8); + }); +}); + +describe('SyntheticEOFDetector', () => { + let detector: SyntheticEOFDetector; + let onData: (data: Buffer) => void; + let onEOF: () => void; + let output: Buffer; + + beforeEach(() => { + detector = new SyntheticEOFDetector(); + output = Buffer.alloc(0); + onData = vi.fn((data: Buffer) => (output = Buffer.concat([output, data]))); + onEOF = vi.fn(); + + detector.on('data', onData); + detector.on('synthetic-eof', onEOF); + }); + + it('should handle EOF at the end of stream', async () => { + detector.write('Before EOF'); + detector.write(SYNTHETIC_EOF); + detector.end(); + + expect(output.toString()).toBe('Before EOF'); + expect(onEOF).toHaveBeenCalledTimes(1); + }); + + it('should handle EOF split across chunks', async () => { + detector.write('Data1'); + detector.write('\x1B]78'); // Partial EOF + detector.write('78\x1B\\'); // Complete EOF + detector.write('Data2'); + detector.end(); + + expect(output.toString()).toBe('Data1Data2'); + expect(onEOF).toHaveBeenCalledTimes(1); + }); + + it('should pass through data when no EOF is present', async () => { + detector.write('Just normal data'); + detector.write(' with no EOF'); + detector.end(); + + expect(output.toString()).toBe('Just normal data with no EOF'); + expect(onEOF).not.toHaveBeenCalled(); + }); + + it('should not trigger on partial EOF at end', async () => { + detector.write('Data'); + detector.write('\x1B]78'); // Incomplete EOF + detector.end(); + + expect(output.toString()).toBe('Data\x1B]78'); + expect(onEOF).not.toHaveBeenCalled(); + }); + + it('should handle EOF split in multiple ways', async () => { + // Split after escape + detector.write('\x1B'); + detector.write(']7878\x1B\\'); + detector.write('data1'); + + // Split in middle + detector.write('\x1B]78'); + detector.write('78\x1B\\'); + detector.write('data2'); + detector.end(); + + expect(output.toString()).toBe('data1data2'); + expect(onEOF).toHaveBeenCalledTimes(2); + }); + + it('should not hold up data that isnt a prefix of EOF', async () => { + detector.write('Data that is definitely not an EOF prefix'); + + expect(output.toString()).toBe('Data that is definitely not an EOF prefix'); + expect(onEOF).not.toHaveBeenCalled(); + + detector.end(); + expect(onEOF).not.toHaveBeenCalled(); + }); + + it('should emit events in correct order', async () => { + const detector = new SyntheticEOFDetector(); + const events: Array< + | { + type: 'eof'; + } + | { + type: 'data'; + data: string; + } + > = []; + + detector.on('data', (chunk) => { + events.push({ type: 'data', data: chunk.toString() }); + }); + detector.on('synthetic-eof', () => { + events.push({ type: 'eof' }); + }); + + const finished = new Promise((resolve) => { + detector.on('end', resolve); + }); + + detector.write('before'); + detector.write(SYNTHETIC_EOF); + detector.write('after'); + detector.end(); + + await finished; + + expect(events).toEqual([ + { type: 'data', data: 'before' }, + { type: 'eof' }, + { type: 'data', data: 'after' }, + ]); + }); +}); diff --git a/wrapper.ts b/wrapper.ts index f094fcb7..6f0abd9e 100644 --- a/wrapper.ts +++ b/wrapper.ts @@ -1,4 +1,4 @@ -import type { Readable, Writable } from 'node:stream'; +import { Transform, type Readable, type Writable } from 'node:stream'; import { ReadStream } from 'node:tty'; import { Pty as RawPty, @@ -15,6 +15,7 @@ import { type SandboxRule, type SandboxOptions, } from './index.js'; +import { SyntheticEOFDetector } from './syntheticEof.js'; export { Operation, type SandboxRule, type SandboxOptions, type PtyOptions }; @@ -23,10 +24,6 @@ type ExitResult = { code: number; }; -interface FullPtyOptions extends PtyOptions { - exitOutputStabilityPeriod?: number; -} - /** * A very thin wrapper around PTYs and processes. * @@ -60,16 +57,10 @@ export class Pty { #fdClosed: boolean = false; #socket: ReadStream; + read: Readable; + write: Writable; - get read(): Readable { - return this.#socket; - } - - get write(): Writable { - return this.#socket; - } - - constructor(options: FullPtyOptions) { + constructor(options: PtyOptions) { const realExit = options.onExit; let markExited!: (value: ExitResult) => void; @@ -81,22 +72,15 @@ export class Pty { let readFinished = new Promise((resolve) => { markReadFinished = resolve; }); - const mockedExit = async ( - error: NodeJS.ErrnoException | null, - code: number, - ) => { - markExited({ error, code }); - await new Promise((r) => - setTimeout(r, options.exitOutputStabilityPeriod ?? 50), - ); - this.#pty.closeUserFd(); - }; // when pty exits, we should wait until the fd actually ends (end OR error) // before closing the pty // we use a mocked exit function to capture the exit result // and then call the real exit function after the fd is fully read - this.#pty = new RawPty({ ...options, onExit: mockedExit }); + this.#pty = new RawPty({ + ...options, + onExit: (error, code) => markExited({ error, code }), + }); // Transfer ownership of the FD to us. this.#fd = this.#pty.takeFd(); @@ -116,13 +100,11 @@ export class Pty { realExit(result.error, result.code); }; - this.read.once('end', markReadFinished); - this.read.once('close', handleClose); - // PTYs signal their done-ness with an EIO error. we therefore need to filter them out (as well as // cleaning up other spurious errors) so that the user doesn't need to handle them and be in // blissful peace. const handleError = (err: NodeJS.ErrnoException) => { + console.log('handling error', err); if (err.code) { const code = err.code; if (code === 'EINTR' || code === 'EAGAIN') { @@ -134,10 +116,10 @@ export class Pty { // EIO only happens when the child dies. It is therefore our only true signal that there // is nothing left to read and we can start tearing things down. If we hadn't received an // error so far, we are considered to be in good standing. - this.read.off('error', handleError); + this.#socket.off('error', handleError); // emit 'end' to signal no more data // this will trigger our 'end' handler which marks readFinished - this.read.emit('end'); + this.#socket.emit('end'); return; } } @@ -146,7 +128,23 @@ export class Pty { throw err; }; - this.read.on('error', handleError); + this.read = this.#socket.pipe(new SyntheticEOFDetector()); + this.write = this.#socket; + + this.#socket.on('error', handleError); + this.#socket.once('end', () => { + console.log('socket end'); + markReadFinished(); + }); + this.#socket.once('close', () => { + console.log('socket close'); + handleClose(); + }); + this.read.once('synthetic-eof', () => { + console.log('synthetic eof'); + this.#pty.closeUserFd(); + }); + this.read.on('data', () => console.log('data')); } close() {