Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export interface Size {
}
export const MAX_U16_VALUE: number;
export const MIN_U16_VALUE: number;
export declare function getSyntheticEofSequence(): Buffer;
/** Resize the terminal. */
export declare function ptyResize(fd: number, size: Size): void;
/**
Expand All @@ -66,5 +67,10 @@ export declare class Pty {
* once (it will error the second time). The caller is responsible for closing the file
* descriptor.
*/
takeFd(): c_int;
takeControllerFd(): c_int;
/**
* Closes the owned file descriptor for the PTY controller. The Nodejs side must call this
* when it is done with the file descriptor to avoid leaking FDs.
*/
dropUserFd(): void;
}
2 changes: 2 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ const {
Operation,
MAX_U16_VALUE,
MIN_U16_VALUE,
getSyntheticEofSequence,
ptyResize,
setCloseOnExec,
getCloseOnExec,
Expand All @@ -335,6 +336,7 @@ module.exports.Pty = Pty;
module.exports.Operation = Operation;
module.exports.MAX_U16_VALUE = MAX_U16_VALUE;
module.exports.MIN_U16_VALUE = MIN_U16_VALUE;
module.exports.getSyntheticEofSequence = getSyntheticEofSequence;
module.exports.ptyResize = ptyResize;
module.exports.setCloseOnExec = setCloseOnExec;
module.exports.getCloseOnExec = getCloseOnExec;
2 changes: 1 addition & 1 deletion npm/darwin-arm64/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty-darwin-arm64",
"version": "3.5.3",
"version": "3.6.0",
"os": [
"darwin"
],
Expand Down
2 changes: 1 addition & 1 deletion npm/darwin-x64/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty-darwin-x64",
"version": "3.5.3",
"version": "3.6.0",
"os": [
"darwin"
],
Expand Down
2 changes: 1 addition & 1 deletion npm/linux-x64-gnu/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty-linux-x64-gnu",
"version": "3.5.3",
"version": "3.6.0",
"os": [
"linux"
],
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty",
"version": "3.5.3",
"version": "3.6.0",
"main": "dist/wrapper.js",
"types": "dist/wrapper.d.ts",
"author": "Szymon Kaliski <hi@szymonkaliski.com>",
Expand Down Expand Up @@ -40,7 +40,7 @@
"build:wrapper": "tsup",
"prepublishOnly": "napi prepublish -t npm",
"test": "vitest run",
"test:ci": "vitest --reporter=verbose --reporter=github-actions run",
"test:ci": "vitest --reporter=verbose --reporter=github-actions --allowOnly run",
"test:hang": "vitest run --reporter=hanging-process",
"universal": "napi universal",
"version": "napi version",
Expand Down
119 changes: 57 additions & 62 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,14 @@ use std::os::fd::{FromRawFd, IntoRawFd, RawFd};
use std::os::unix::process::CommandExt;
use std::process::{Command, Stdio};
use std::thread;
use std::time::Duration;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use napi::bindgen_prelude::JsFunction;
use napi::bindgen_prelude::{Buffer, JsFunction};
use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi::Status::GenericFailure;
use napi::{self, Env};
use nix::errno::Errno;
use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag};
use nix::libc::{self, c_int, ioctl, FIONREAD, TIOCOUTQ, TIOCSCTTY, TIOCSWINSZ};
use nix::libc::{self, c_int, TIOCSCTTY, TIOCSWINSZ};
use nix::pty::{openpty, Winsize};
use nix::sys::termios::{self, SetArg};

Expand All @@ -31,6 +28,7 @@ mod sandbox;
#[allow(dead_code)]
struct Pty {
controller_fd: Option<OwnedFd>,
user_fd: Option<OwnedFd>,
/// The pid of the forked process.
pub pid: u32,
}
Expand Down Expand Up @@ -89,61 +87,15 @@ pub const MAX_U16_VALUE: u16 = u16::MAX;
#[napi]
pub const MIN_U16_VALUE: u16 = u16::MIN;

fn cast_to_napi_error(err: Errno) -> napi::Error {
napi::Error::new(GenericFailure, err)
}

// if the child process exits before the controller fd is fully read or the user fd is fully
// flushed, we might accidentally end in a case where onExit is called but js hasn't had
// the chance to fully read the controller fd
// let's wait until the controller fd is fully read before we call onExit
fn poll_pty_fds_until_read(controller_fd: RawFd, user_fd: RawFd) {
let mut backoff = ExponentialBackoffBuilder::default()
.with_initial_interval(Duration::from_millis(1))
.with_max_interval(Duration::from_millis(100))
.with_max_elapsed_time(Some(Duration::from_secs(1)))
.build();

loop {
// check both input and output queues for both FDs
let mut controller_inq: i32 = 0;
let mut controller_outq: i32 = 0;
let mut user_inq: i32 = 0;
let mut user_outq: i32 = 0;

// safe because we're passing valid file descriptors and properly sized integers
unsafe {
// check bytes waiting to be read (FIONREAD, equivalent to TIOCINQ on Linux)
if ioctl(controller_fd, FIONREAD, &mut controller_inq) == -1
|| ioctl(user_fd, FIONREAD, &mut user_inq) == -1
{
// break if we can't read
break;
}

// check bytes waiting to be written (TIOCOUTQ)
if ioctl(controller_fd, TIOCOUTQ, &mut controller_outq) == -1
|| ioctl(user_fd, TIOCOUTQ, &mut user_outq) == -1
{
// break if we can't read
break;
}
}
const SYNTHETIC_EOF: &[u8] = b"\x1B]7878\x1B\\";

// if all queues are empty, we're done
if controller_inq == 0 && controller_outq == 0 && user_inq == 0 && user_outq == 0 {
break;
}
#[napi]
pub fn get_synthetic_eof_sequence() -> Buffer {
SYNTHETIC_EOF.into()
}

// apply backoff strategy
if let Some(d) = backoff.next_backoff() {
thread::sleep(d);
continue;
} else {
// we have exhausted our attempts
break;
}
}
fn cast_to_napi_error(err: Errno) -> napi::Error {
napi::Error::new(GenericFailure, err)
}

#[napi]
Expand Down Expand Up @@ -347,9 +299,10 @@ impl Pty {
thread::spawn(move || {
let wait_result = child.wait();

// try to wait for the controller fd to be fully read
poll_pty_fds_until_read(raw_controller_fd, raw_user_fd);
drop(user_fd);
// by this point, child has closed its copy of the user_fd
// lets inject our synthetic EOF OSC into the user_fd
// its ok to ignore the result here as we have a timeout on the nodejs side to handle if this write fails
let _ = write_syn_eof_to_fd(raw_user_fd);

match wait_result {
Ok(status) => {
Expand Down Expand Up @@ -379,6 +332,7 @@ impl Pty {

Ok(Pty {
controller_fd: Some(controller_fd),
user_fd: Some(user_fd),
pid,
})
}
Expand All @@ -388,7 +342,7 @@ impl Pty {
/// descriptor.
#[napi]
#[allow(dead_code)]
pub fn take_fd(&mut self) -> Result<c_int, napi::Error> {
pub fn take_controller_fd(&mut self) -> Result<c_int, napi::Error> {
if let Some(fd) = self.controller_fd.take() {
Ok(fd.into_raw_fd())
} else {
Expand All @@ -398,6 +352,15 @@ impl Pty {
))
}
}

/// Closes the owned file descriptor for the PTY controller. The Nodejs side must call this
/// when it is done with the file descriptor to avoid leaking FDs.
#[napi]
#[allow(dead_code)]
pub fn drop_user_fd(&mut self) -> Result<(), napi::Error> {
self.user_fd.take();
Ok(())
}
}

/// Resize the terminal.
Expand Down Expand Up @@ -492,3 +455,35 @@ fn set_nonblocking(fd: i32) -> Result<(), napi::Error> {
}
Ok(())
}

fn write_syn_eof_to_fd(fd: libc::c_int) -> std::io::Result<()> {
let mut remaining = SYNTHETIC_EOF;
while !remaining.is_empty() {
match unsafe {
libc::write(
fd,
remaining.as_ptr() as *const libc::c_void,
remaining.len(),
)
} {
-1 => {
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::Interrupted {
continue;
}

return Err(err);
}
0 => {
return Err(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"write returned 0",
));
}
n => {
remaining = &remaining[n as usize..];
}
}
}
Ok(())
}
85 changes: 85 additions & 0 deletions syntheticEof.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { Transform } from 'node:stream';
import { getSyntheticEofSequence } from './index.js';

// keep in sync with lib.rs::SYNTHETIC_EOF
export const SYNTHETIC_EOF = getSyntheticEofSequence();
export const EOF_EVENT = 'synthetic-eof';

// get the longest suffix of buffer that is a prefix of SYNTHETIC_EOF
function getBufferEndPrefixLength(buffer: Buffer) {
const maxLen = Math.min(buffer.length, SYNTHETIC_EOF.length);
for (let len = maxLen; len > 0; len--) {
let match = true;
for (let i = 0; i < len; i++) {
if (buffer[buffer.length - len + i] !== SYNTHETIC_EOF[i]) {
match = false;
break;
}
}

if (match) {
return len;
}
}

return 0;
}

export class SyntheticEOFDetector extends Transform {
buffer: Buffer;

constructor(options = {}) {
super(options);
this.buffer = Buffer.alloc(0);
}

_transform(chunk: Buffer, _encoding: string, callback: () => void) {
const searchData = Buffer.concat([this.buffer, chunk]);
const eofIndex = searchData.indexOf(SYNTHETIC_EOF);

if (eofIndex !== -1) {
// found EOF - emit everything before it
if (eofIndex > 0) {
this.push(searchData.subarray(0, eofIndex));
}

this.emit(EOF_EVENT);

// emit everything after EOF (if any) and clear buffer
const afterEOF = searchData.subarray(eofIndex + SYNTHETIC_EOF.length);
if (afterEOF.length > 0) {
this.push(afterEOF);
}

this.buffer = Buffer.alloc(0);
} else {
// no EOF - buffer potential partial match at end

// get the longest suffix of buffer that is a prefix of SYNTHETIC_EOF
// and emit everything before it
// this is done for the case which the eof happened to be split across multiple chunks
const commonPrefixLen = getBufferEndPrefixLength(searchData);

if (commonPrefixLen > 0) {
const emitSize = searchData.length - commonPrefixLen;
if (emitSize > 0) {
this.push(searchData.subarray(0, emitSize));
}
this.buffer = searchData.subarray(emitSize);
} else {
this.push(searchData);
this.buffer = Buffer.alloc(0);
}
}

callback();
}

_flush(callback: () => void) {
if (this.buffer.length > 0) {
this.push(this.buffer);
}

callback();
}
}
Loading