From 4f1e86a7f7f776c6732fe76250bb6639f32f4de9 Mon Sep 17 00:00:00 2001 From: Beforerr Date: Thu, 11 Jun 2026 18:20:36 -0700 Subject: [PATCH] fix: survive stray SIGINT races in R sessions, idempotent stop, windows-safe test cwds - R runtime: options(catch.script.errors=TRUE) so an interrupt landing outside .repld_run's tryCatch doesn't 'Execution halted' the non-interactive session; drain stray pending SIGINTs (engine resends) at eval start so they can't detonate inside the next eval - engine: persistent control-line reader + bounded status wait, so an eval aborted before handlers were set degrades to an error instead of hanging - repld stop exits 0 when daemon not running - tests: session cwds outlive tests (Windows locks live process cwd against TempDir cleanup); normalize shared cwds (macOS /var symlink, Windows 8.3) - skip TestRInterruptSurvives on windows (no SIGINT delivery) --- .github/workflows/test.yml | 5 ++- README.md | 4 +-- go/integration_test.go | 22 +++++++++++++ go/julia_integration_test.go | 6 ++-- go/main.go | 4 +++ go/python_integration_test.go | 6 ++-- go/r/runtime.R | 11 ++++++- go/r_integration_test.go | 8 +++-- go/session.go | 58 +++++++++++++++++++++++++--------- go/wolfram_integration_test.go | 2 +- 10 files changed, 98 insertions(+), 28 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4ea2fa6..b5a8f69 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,9 +14,12 @@ jobs: runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v6 + # go-version-file would pin the exact go.mod patch version, which is + # rarely in the runner toolcache and downloads a toolchain (~15-20s); + # stable matches the preinstalled one. - uses: actions/setup-go@v5 with: - go-version-file: go.mod + go-version: stable cache-dependency-path: go.sum - uses: julia-actions/setup-julia@v2 - uses: julia-actions/cache@v2 diff --git a/README.md b/README.md index be8224f..c5a9c13 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Stateful and persistent REPL-like kernel for fast incremental iteration (Act → ```bash curl -fsSL https://raw.githubusercontent.com/Beforerr/repld/main/install.sh | bash # Installs to ~/.local/bin. Override with `INSTALL_DIR` env var. -# To uninstall: `rm "$(which repld)"`. +# To uninstall: `repld stop && rm "$(which repld)"`. ``` Code is executed in long-lived sessions so state (variables, loaded packages/modules) persist between calls. @@ -45,7 +45,7 @@ The eval flags use each language's native spelling — **Julia: `-e` / `-E`**, * ```bash repld julia -e CODE | repld julia -E EXPR | repld python3 -c CODE | repld R -e CODE | repld wolframscript -c CODE -repld julia script.jl [args...] +repld julia/python3/R/wolframscript FILE [args...] repld --session LABEL ... # named session, reusable across dirs repld --fresh ... # restart targeted session first diff --git a/go/integration_test.go b/go/integration_test.go index 84632cb..0c9a03b 100644 --- a/go/integration_test.go +++ b/go/integration_test.go @@ -42,6 +42,8 @@ var ( sharedStop func() sharedJuliaDir string sharedJuliaOnce sync.Once + sessionCwdMu sync.Mutex + sessionCwds []string ) func sharedDaemon(t *testing.T) string { @@ -60,6 +62,24 @@ func stopSharedDaemon() { if sharedJuliaDir != "" { os.RemoveAll(sharedJuliaDir) } + for _, dir := range sessionCwds { + os.RemoveAll(dir) + } +} + +// sessionCwd returns a temp dir to use as a session cwd. It must outlive the +// test: sessions on the shared daemon keep running with this dir as their +// process cwd, which on Windows locks it against t.TempDir's RemoveAll cleanup. +func sessionCwd(t *testing.T) string { + t.Helper() + dir, err := os.MkdirTemp("", "repld-cwd-") + require.NoError(t, err) + dir, err = filepath.EvalSymlinks(dir) + require.NoError(t, err) + sessionCwdMu.Lock() + sessionCwds = append(sessionCwds, dir) + sessionCwdMu.Unlock() + return dir } // sharedJuliaCwd returns a stable temp dir so display/eval/trace tests that @@ -70,6 +90,8 @@ func sharedJuliaCwd(t *testing.T) string { sharedJuliaOnce.Do(func() { dir, err := os.MkdirTemp("", "repld-julia-shared-") require.NoError(t, err) + dir, err = filepath.EvalSymlinks(dir) + require.NoError(t, err) sharedJuliaDir = dir }) return sharedJuliaDir diff --git a/go/julia_integration_test.go b/go/julia_integration_test.go index 46dfa5e..af80d71 100644 --- a/go/julia_integration_test.go +++ b/go/julia_integration_test.go @@ -318,7 +318,7 @@ func TestClientDisconnectInterruptsEval(t *testing.T) { func TestRevisePicksUpPackageChanges(t *testing.T) { socketPath := sharedDaemon(t) - pkgDir := t.TempDir() + pkgDir := sessionCwd(t) srcDir := filepath.Join(pkgDir, "src") require.NoError(t, os.Mkdir(srcDir, 0755)) projectToml, err := os.ReadFile(filepath.Join("testdata", "TestRevPkg", "Project.toml")) @@ -357,7 +357,7 @@ func TestRevisePicksUpPackageChanges(t *testing.T) { func TestJuliaWorldAgeDisplay(t *testing.T) { socketPath := sharedDaemon(t) - pkgDir := t.TempDir() + pkgDir := sessionCwd(t) require.NoError(t, os.Mkdir(filepath.Join(pkgDir, "src"), 0755)) uuid := fmt.Sprintf("a1b2c3d4-0000-0000-0000-%012d", time.Now().UnixNano()%1e12) require.NoError(t, os.WriteFile(filepath.Join(pkgDir, "Project.toml"), @@ -396,7 +396,7 @@ func TestCLIJuliaMissingScriptBehavesLikeInteractiveInterpreter(t *testing.T) { // Own cwd → a cold session, so `x=1` reaches a fresh julia as a launch arg // (a warm session would eval it as code). - res := repldOK(t, socketPath, t.TempDir(), "julia", "x=1") + res := repldOK(t, socketPath, sessionCwd(t), "julia", "x=1") require.Contains(t, res.stderr, "No such file") require.NotContains(t, res.stderr, "repld: persistent REPL daemon") } diff --git a/go/main.go b/go/main.go index e138d66..ab036a9 100644 --- a/go/main.go +++ b/go/main.go @@ -445,6 +445,10 @@ func dispatchSubcommand(p parsed) { case "sessions": run(p.socket, protocolRequest{Action: "sessions"}, false) case "stop": + if !pingDaemon(p.socket) { + fmt.Println("repld daemon is not running.") + return + } run(p.socket, protocolRequest{Action: "stop"}, false) case "trace": tg := parseTarget(p) diff --git a/go/python_integration_test.go b/go/python_integration_test.go index 6fef0c9..ac8c6eb 100644 --- a/go/python_integration_test.go +++ b/go/python_integration_test.go @@ -51,7 +51,7 @@ func TestPythonSessionsAreKeyedByInterpreter(t *testing.T) { wrapper := []byte(fmt.Sprintf("#!/bin/sh\nexec %q \"$@\"\n", pythonExe)) require.NoError(t, os.WriteFile(pyA, wrapper, 0755)) require.NoError(t, os.WriteFile(pyB, wrapper, 0755)) - cwd := t.TempDir() + cwd := sessionCwd(t) send := func(exe, code string) cliResult { return repldOK(t, socketPath, cwd, exe, "-c", code) @@ -65,7 +65,7 @@ func TestPythonSessionsAreKeyedByInterpreter(t *testing.T) { func TestPythonFileEvalArgsAndState(t *testing.T) { socketPath, _ := pythonTestDaemon(t) - cwd := t.TempDir() + cwd := sessionCwd(t) script := filepath.Join(cwd, "script.py") require.NoError(t, os.WriteFile(script, []byte(`import sys print(",".join(sys.argv[1:])) @@ -93,7 +93,7 @@ func TestCLIPythonMissingScriptBehavesLikeInteractiveInterpreter(t *testing.T) { // Own cwd → a cold session, so `x=1` is passed as a launch arg to a fresh // python that fails to open it (a warm session would eval it as code). - res := repldOK(t, socketPath, t.TempDir(), "python3", "x=1") + res := repldOK(t, socketPath, sessionCwd(t), "python3", "x=1") require.Contains(t, res.stderr, "can't open file") require.NotContains(t, res.stderr, ">>>") require.NotContains(t, res.stderr, "repld: persistent REPL daemon") diff --git a/go/r/runtime.R b/go/r/runtime.R index 34a79c1..0a50fe0 100644 --- a/go/r/runtime.R +++ b/go/r/runtime.R @@ -1,3 +1,8 @@ +# Non-interactive R exits "Execution halted" on an uncaught interrupt (SIGINT +# outside .repld_run's tryCatch: pre-handler decode/parse, sentinel statement) +# unless this is set (check_session_exit, R src/main/main.c). +options(catch.script.errors = TRUE) + .repld_decode <- function(encoded) { if (!nzchar(encoded)) { return("") @@ -57,8 +62,12 @@ } .repld_run <- function(hex_code, print_result) { - code <- .repld_decode(hex_code) + # Engine resends SIGINT while an interrupt is in flight; a late one sits + # pending at the top-level read and would detonate inside this eval. Discard + # it (a real interrupt for this eval gets resent). + tryCatch(Sys.sleep(0.001), interrupt = function(cond) NULL) tryCatch({ + code <- .repld_decode(hex_code) exprs <- parse(text = code, srcfile = srcfilecopy("", code)) visible <- FALSE value <- NULL diff --git a/go/r_integration_test.go b/go/r_integration_test.go index 64d269d..1c7e007 100644 --- a/go/r_integration_test.go +++ b/go/r_integration_test.go @@ -5,6 +5,7 @@ import ( "os" "os/exec" "path/filepath" + "runtime" "strings" "testing" "time" @@ -19,7 +20,7 @@ func TestRAdapter(t *testing.T) { } socketPath := sharedDaemon(t) - cwd := t.TempDir() + cwd := sessionCwd(t) lf := func(s string) string { return strings.ReplaceAll(s, "\r\n", "\n") } res := repldOK(t, socketPath, cwd, "R", "-e", `cat(21 * 2, "\n", sep = "")`) @@ -48,7 +49,7 @@ func TestRFileEvalArgsAndState(t *testing.T) { } socketPath := sharedDaemon(t) - cwd := t.TempDir() + cwd := sessionCwd(t) lf := func(s string) string { return strings.ReplaceAll(s, "\r\n", "\n") } script := filepath.Join(cwd, "script.R") require.NoError(t, os.WriteFile(script, @@ -64,6 +65,9 @@ func TestRFileEvalArgsAndState(t *testing.T) { } func TestRInterruptSurvives(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("interruptProc cannot deliver SIGINT to detached children on windows") + } if _, err := exec.LookPath(r.Adapter{}.DefaultExe()); err != nil { t.Skipf("%s not installed", r.Adapter{}.DefaultExe()) } diff --git a/go/session.go b/go/session.go index 6a7b2ef..183fbff 100644 --- a/go/session.go +++ b/go/session.go @@ -31,6 +31,7 @@ type Session struct { stderr *bufio.Reader control *bufio.Reader controlConn net.Conn + ctrlLines chan *evalError // one parsed OK/ERR line per eval; closed on control EOF exited chan struct{} mu sync.Mutex // guards startup sem chan struct{} // capacity-1: serialises evals, ctx-cancellable acquire @@ -179,6 +180,20 @@ func (s *Session) start(exe string, workDir string) error { s.kill() return fmt.Errorf("control channel handshake did not complete") } + // Persistent reader: a per-eval reader abandoned on timeout would stay + // blocked and steal the next eval's status line. + ctrl := make(chan *evalError, 8) + s.ctrlLines = ctrl + go func() { + defer close(ctrl) + for { + line, err := s.control.ReadString('\n') + if err != nil { + return + } + ctrl <- parseControlLine(line) + } + }() return nil } @@ -328,6 +343,20 @@ func (s *Session) executeRaw(code string, onChunk func(data string, isStderr boo if expectControl && s.control == nil { return nil, fmt.Errorf("control channel unavailable") } + if expectControl { + // Discard status lines stranded by a previous eval's bounded wait. + drain: + for { + select { + case _, ok := <-s.ctrlLines: + if !ok { + break drain + } + default: + break drain + } + } + } // The JSON encoder and log writer are shared between stdout/stderr readers. var emitMu sync.Mutex @@ -366,20 +395,6 @@ func (s *Session) executeRaw(code string, onChunk func(data string, isStderr boo errCh <- scanResult{tail: tail, err: err} }() - // Read while the runtime writes; large tracebacks can otherwise deadlock - // before sentinel drain completes. - ctrlCh := make(chan *evalError, 1) - if expectControl { - go func() { - line, err := s.control.ReadString('\n') - if err != nil { - ctrlCh <- nil - return - } - ctrlCh <- parseControlLine(line) - }() - } - wait := func() (*evalError, error) { outErr := <-outCh errResult := <-errCh @@ -390,7 +405,20 @@ func (s *Session) executeRaw(code string, onChunk func(data string, isStderr boo return nil, outErr } if expectControl { - return <-ctrlCh, nil + // Status line is written before the sentinel, so it's normally in + // flight here; missing only when an interrupt aborted the eval + // pre-handler. Bound the wait (below the 3s interrupt-kill grace) + // to degrade instead of hang. + select { + case ee, ok := <-s.ctrlLines: + if !ok { + return nil, nil // control EOF: death is reported via the stdout scan + } + return ee, nil + case <-time.After(2 * time.Second): + short := "ERROR: interrupted (eval aborted before reporting status)" + return &evalError{short: short, smart: short + "\n", full: short + "\n"}, nil + } } return nil, nil } diff --git a/go/wolfram_integration_test.go b/go/wolfram_integration_test.go index a761b0d..1cc8d5e 100644 --- a/go/wolfram_integration_test.go +++ b/go/wolfram_integration_test.go @@ -17,7 +17,7 @@ func TestWolframAdapter(t *testing.T) { } socketPath := sharedDaemon(t) - cwd := t.TempDir() + cwd := sessionCwd(t) lf := func(s string) string { return strings.ReplaceAll(s, "\r\n", "\n") } res := repldOK(t, socketPath, cwd, "wolframscript", "-c", `21 * 2`)