From 33ce2a77c01d45a3ffc48659f7b0ccbf41dabd46 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Mon, 30 Mar 2026 06:42:38 -0400 Subject: [PATCH 01/14] feat(tui): add SSE event stream subscription module Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/roborev/tui/sse.go | 109 ++++++++++++++++++++++++++++++++ cmd/roborev/tui/sse_test.go | 120 ++++++++++++++++++++++++++++++++++++ 2 files changed, 229 insertions(+) create mode 100644 cmd/roborev/tui/sse.go create mode 100644 cmd/roborev/tui/sse_test.go diff --git a/cmd/roborev/tui/sse.go b/cmd/roborev/tui/sse.go new file mode 100644 index 00000000..8abc0ab1 --- /dev/null +++ b/cmd/roborev/tui/sse.go @@ -0,0 +1,109 @@ +package tui + +import ( + "context" + "encoding/json" + "net/http" + "time" + + tea "github.com/charmbracelet/bubbletea" + "github.com/roborev-dev/roborev/internal/daemon" +) + +// sseEventMsg signals that the daemon broadcast an event. +// The TUI uses this to trigger an immediate data refresh. +type sseEventMsg struct{} + +// startSSESubscription maintains a persistent NDJSON connection to the +// daemon's /api/stream/events endpoint. On each received event it sends +// a non-blocking signal to sseCh. The goroutine reconnects with +// exponential backoff on errors and exits when stopCh is closed. +func startSSESubscription( + endpoint daemon.DaemonEndpoint, + sseCh chan<- struct{}, + stopCh <-chan struct{}, +) { + const maxBackoff = 30 * time.Second + backoff := time.Second + + for { + err := sseReadLoop(endpoint, sseCh, stopCh) + if err == nil { + return + } + + select { + case <-stopCh: + return + case <-time.After(backoff): + } + + if backoff < maxBackoff { + backoff *= 2 + } + } +} + +// sseReadLoop connects to the event stream and reads NDJSON lines until +// the connection drops or stopCh fires. Returns nil when stopCh is +// closed, non-nil error on connection/decode failure. +func sseReadLoop( + endpoint daemon.DaemonEndpoint, + sseCh chan<- struct{}, + stopCh <-chan struct{}, +) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + select { + case <-stopCh: + cancel() + case <-ctx.Done(): + } + }() + + client := endpoint.HTTPClient(0) + req, err := http.NewRequestWithContext( + ctx, http.MethodGet, + endpoint.BaseURL()+"/api/stream/events", nil, + ) + if err != nil { + return err + } + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + decoder := json.NewDecoder(resp.Body) + for { + var event daemon.Event + if err := decoder.Decode(&event); err != nil { + select { + case <-stopCh: + return nil + default: + return err + } + } + + select { + case sseCh <- struct{}{}: + default: + } + } +} + +// waitForSSE returns a tea.Cmd that blocks until a signal arrives on +// sseCh, then delivers an sseEventMsg to the Bubbletea event loop. +func waitForSSE(sseCh <-chan struct{}) tea.Cmd { + return func() tea.Msg { + _, ok := <-sseCh + if !ok { + return nil + } + return sseEventMsg{} + } +} diff --git a/cmd/roborev/tui/sse_test.go b/cmd/roborev/tui/sse_test.go new file mode 100644 index 00000000..4828dcb0 --- /dev/null +++ b/cmd/roborev/tui/sse_test.go @@ -0,0 +1,120 @@ +package tui + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/roborev-dev/roborev/internal/daemon" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSSESubscription_ReceivesEvents(t *testing.T) { + eventWritten := make(chan struct{}) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "/api/stream/events", r.URL.Path) + + w.Header().Set("Content-Type", "application/x-ndjson") + json.NewEncoder(w).Encode(daemon.Event{Type: "review.closed", JobID: 42}) + w.(http.Flusher).Flush() + close(eventWritten) + + <-r.Context().Done() + })) + defer ts.Close() + + ep := testEndpointFromURL(ts.URL) + sseCh := make(chan struct{}, 1) + stopCh := make(chan struct{}) + defer close(stopCh) + + go startSSESubscription(ep, sseCh, stopCh) + + select { + case <-sseCh: + // Got the signal + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for SSE event signal") + } +} + +func TestSSESubscription_StopsOnStopChannel(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/x-ndjson") + w.(http.Flusher).Flush() + <-r.Context().Done() + })) + defer ts.Close() + + ep := testEndpointFromURL(ts.URL) + sseCh := make(chan struct{}, 1) + stopCh := make(chan struct{}) + + done := make(chan struct{}) + go func() { + startSSESubscription(ep, sseCh, stopCh) + close(done) + }() + + close(stopCh) + + select { + case <-done: + // Goroutine exited + case <-time.After(2 * time.Second): + t.Fatal("SSE goroutine did not exit after stop signal") + } +} + +func TestSSESubscription_ReconnectsOnError(t *testing.T) { + var attempt int + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attempt++ + if attempt == 1 { + return + } + w.Header().Set("Content-Type", "application/x-ndjson") + json.NewEncoder(w).Encode(daemon.Event{Type: "review.closed", JobID: 1}) + w.(http.Flusher).Flush() + <-r.Context().Done() + })) + defer ts.Close() + + ep := testEndpointFromURL(ts.URL) + sseCh := make(chan struct{}, 1) + stopCh := make(chan struct{}) + defer close(stopCh) + + go startSSESubscription(ep, sseCh, stopCh) + + select { + case <-sseCh: + require.GreaterOrEqual(t, attempt, 2, "should have reconnected") + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for reconnected SSE event") + } +} + +func TestWaitForSSE_ReturnsOnSignal(t *testing.T) { + ch := make(chan struct{}, 1) + ch <- struct{}{} + + cmd := waitForSSE(ch) + msg := cmd() + + _, ok := msg.(sseEventMsg) + assert.True(t, ok, "expected sseEventMsg, got %T", msg) +} + +func TestWaitForSSE_ReturnsNilOnClosedChannel(t *testing.T) { + ch := make(chan struct{}, 1) + close(ch) + + cmd := waitForSSE(ch) + msg := cmd() + + assert.Nil(t, msg, "expected nil on closed channel") +} From 013569cc5111ce90c3918459b9267f9f3af74eff Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Mon, 30 Mar 2026 06:44:05 -0400 Subject: [PATCH 02/14] feat(daemon): broadcast review.closed event on close Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/daemon/server.go | 6 +++++ internal/daemon/server_actions_test.go | 33 ++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/internal/daemon/server.go b/internal/daemon/server.go index f1d71ca8..98dccb72 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -2093,6 +2093,12 @@ func (s *Server) handleCloseReview(w http.ResponseWriter, r *http.Request) { return } + s.broadcaster.Broadcast(Event{ + Type: "review.closed", + TS: time.Now(), + JobID: req.JobID, + }) + writeJSON(w, map[string]any{"success": true}) } diff --git a/internal/daemon/server_actions_test.go b/internal/daemon/server_actions_test.go index c48e6adb..c615d25d 100644 --- a/internal/daemon/server_actions_test.go +++ b/internal/daemon/server_actions_test.go @@ -632,6 +632,39 @@ func TestHandleAddCommentWithoutReview(t *testing.T) { } } +func TestHandleCloseReview_BroadcastsEvent(t *testing.T) { + assert := assert.New(t) + server, db, tmpDir := newTestServer(t) + + // Create a completed job (which creates a review) + job := createTestJob(t, db, tmpDir, "abc123", "test") + claimed, err := db.ClaimJob("worker-1") + require.NoError(t, err) + require.Equal(t, job.ID, claimed.ID) + require.NoError(t, db.CompleteJob(job.ID, "test", "prompt", "output")) + + // Subscribe to broadcaster before the close call + _, eventCh := server.broadcaster.Subscribe("") + + req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/review/close", CloseReviewRequest{ + JobID: job.ID, + Closed: true, + }) + w := httptest.NewRecorder() + server.handleCloseReview(w, req) + + assert.Equal(http.StatusOK, w.Code) + + // Verify event was broadcast + select { + case event := <-eventCh: + assert.Equal("review.closed", event.Type) + assert.Equal(job.ID, event.JobID) + case <-time.After(time.Second): + t.Fatal("timed out waiting for review.closed event") + } +} + func TestHandleListCommentsJobIDParsing(t *testing.T) { server, _, _ := newTestServer(t) testInvalidIDParsing(t, server.handleListComments, "/api/comments?job_id=%s") From fd9101e1ba3175d400518a2de00a67f4152344da Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Mon, 30 Mar 2026 06:49:25 -0400 Subject: [PATCH 03/14] feat(tui): wire SSE subscription into model lifecycle Subscribe to daemon NDJSON event stream for instant refresh when reviews are closed externally. Falls back to existing polling. Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/roborev/tui/handlers_msg.go | 27 +++++++++++++++++++++++++++ cmd/roborev/tui/tui.go | 28 +++++++++++++++++++++++++--- cmd/roborev/tui/tui_test.go | 27 +++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 3 deletions(-) diff --git a/cmd/roborev/tui/handlers_msg.go b/cmd/roborev/tui/handlers_msg.go index 63bbd436..a4888a64 100644 --- a/cmd/roborev/tui/handlers_msg.go +++ b/cmd/roborev/tui/handlers_msg.go @@ -707,12 +707,36 @@ func (m model) handleSavePatchResultMsg(msg savePatchResultMsg) (tea.Model, tea. return m, nil } +// handleSSEEventMsg processes real-time events from the daemon's NDJSON stream. +// Triggers an immediate data refresh and re-subscribes for the next event. +func (m model) handleSSEEventMsg() (tea.Model, tea.Cmd) { + if m.loadingMore || m.loadingJobs { + return m, waitForSSE(m.sseCh) + } + cmds := []tea.Cmd{ + m.fetchJobs(), + m.fetchStatus(), + waitForSSE(m.sseCh), + } + if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) { + cmds = append(cmds, m.fetchFixJobs()) + } + return m, tea.Batch(cmds...) +} + // handleReconnectMsg processes daemon reconnection attempts. func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) { m.reconnecting = false if msg.err == nil && msg.endpoint != m.endpoint { m.endpoint = msg.endpoint m.client = msg.endpoint.HTTPClient(10 * time.Second) + // Restart SSE subscription with the new endpoint. + if m.sseStop != nil { + close(m.sseStop) + m.sseCh = make(chan struct{}, 1) + m.sseStop = make(chan struct{}) + go startSSESubscription(m.endpoint, m.sseCh, m.sseStop) + } m.consecutiveErrors = 0 m.err = nil if msg.version != "" { @@ -739,6 +763,9 @@ func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) { if cmd := m.fetchUnloadedBranches(); cmd != nil { cmds = append(cmds, cmd) } + if m.sseCh != nil { + cmds = append(cmds, waitForSSE(m.sseCh)) + } return m, tea.Batch(cmds...) } return m, nil diff --git a/cmd/roborev/tui/tui.go b/cmd/roborev/tui/tui.go index e4e10e19..a47515e2 100644 --- a/cmd/roborev/tui/tui.go +++ b/cmd/roborev/tui/tui.go @@ -381,6 +381,8 @@ type model struct { noQuit bool // Suppress keyboard quit (for managed TUI instances) controlSocket string // Socket path for runtime metadata updates (empty if disabled) ready chan struct{} // Closed on first Update; signals event loop is running + sseCh chan struct{} // Signals from SSE goroutine; nil when external IO disabled + sseStop chan struct{} // Close to stop SSE goroutine; nil when external IO disabled // Review view navigation reviewFromView viewKind // View to return to when exiting review (queue or tasks) @@ -494,6 +496,14 @@ func newModel(ep daemon.DaemonEndpoint, opts ...option) model { } } + var sseCh chan struct{} + var sseStop chan struct{} + if !opt.disableExternalIO { + sseCh = make(chan struct{}, 1) + sseStop = make(chan struct{}) + go startSSESubscription(ep, sseCh, sseStop) + } + // Test overrides for auto-filter simulation if opt.autoFilterRepo { autoFilterRepo = true @@ -573,6 +583,8 @@ func newModel(ep daemon.DaemonEndpoint, opts ...option) model { mouseEnabled: mouseEnabled, noQuit: opt.noQuit, ready: make(chan struct{}), + sseCh: sseCh, + sseStop: sseStop, colBordersOn: columnBorders, hiddenColumns: hiddenCols, columnOrder: colOrder, @@ -583,15 +595,19 @@ func newModel(ep daemon.DaemonEndpoint, opts ...option) model { } func (m model) Init() tea.Cmd { - return tea.Batch( - tea.WindowSize(), // request initial window size + cmds := []tea.Cmd{ + tea.WindowSize(), m.displayTick(), m.tick(), m.fetchJobs(), m.fetchStatus(), m.fetchRepoNames(), m.checkForUpdate(), - ) + } + if m.sseCh != nil { + cmds = append(cmds, waitForSSE(m.sseCh)) + } + return tea.Batch(cmds...) } func (m model) tasksWorkflowEnabled() bool { @@ -780,6 +796,8 @@ func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { result, cmd = m.handleErrMsg(msg) case reconnectMsg: result, cmd = m.handleReconnectMsg(msg) + case sseEventMsg: + result, cmd = m.handleSSEEventMsg() case fixJobsMsg: result, cmd = m.handleFixJobsMsg(msg) case fixTriggerResultMsg: @@ -988,6 +1006,10 @@ func Run(cfg Config) error { } _, err := p.Run() + // Stop SSE subscription goroutine. + if m.sseStop != nil { + close(m.sseStop) + } close(programDone) <-cleanupDone return err diff --git a/cmd/roborev/tui/tui_test.go b/cmd/roborev/tui/tui_test.go index a1f0a9a4..ec4c1c8c 100644 --- a/cmd/roborev/tui/tui_test.go +++ b/cmd/roborev/tui/tui_test.go @@ -2138,3 +2138,30 @@ func TestNewTuiModelOptions(t *testing.T) { }) } } + +func TestSSEEventTriggersRefresh(t *testing.T) { + m := newModel(localhostEndpoint, withExternalIODisabled()) + m.sseCh = make(chan struct{}, 1) + m.loadingJobs = false + + m, cmd := updateModel(t, m, sseEventMsg{}) + + assert.NotNil(t, cmd, "expected commands from SSE event") +} + +func TestSSEEventSkipsRefreshWhileLoading(t *testing.T) { + m := newModel(localhostEndpoint, withExternalIODisabled()) + m.sseCh = make(chan struct{}, 1) + m.loadingJobs = true + + m, cmd := updateModel(t, m, sseEventMsg{}) + + assert.NotNil(t, cmd, "expected re-subscribe command even while loading") +} + +func TestSSEDisabledInTestMode(t *testing.T) { + m := newModel(localhostEndpoint, withExternalIODisabled()) + + assert.Nil(t, m.sseCh, "sseCh should be nil when external IO is disabled") + assert.Nil(t, m.sseStop, "sseStop should be nil when external IO is disabled") +} From 472b5e69bef28104ac6c69bd01c27e763062774d Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Mon, 30 Mar 2026 07:08:50 -0400 Subject: [PATCH 04/14] fix(tui): address review findings in SSE subscription - Close old sseCh on reconnect to let orphaned waitForSSE exit cleanly - Reset backoff after successful connection (track connected state) - Use atomic.Int32 for thread-safe test counter - Remove unused eventWritten channel from test Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/roborev/tui/handlers_msg.go | 5 ++++- cmd/roborev/tui/sse.go | 24 ++++++++++++++++-------- cmd/roborev/tui/sse_test.go | 11 +++++------ 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/cmd/roborev/tui/handlers_msg.go b/cmd/roborev/tui/handlers_msg.go index a4888a64..510bcd27 100644 --- a/cmd/roborev/tui/handlers_msg.go +++ b/cmd/roborev/tui/handlers_msg.go @@ -730,9 +730,12 @@ func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) { if msg.err == nil && msg.endpoint != m.endpoint { m.endpoint = msg.endpoint m.client = msg.endpoint.HTTPClient(10 * time.Second) - // Restart SSE subscription with the new endpoint. + // Restart SSE subscription with the new endpoint. Closing + // sseCh unblocks any in-flight waitForSSE, which returns nil + // (harmless) — the new waitForSSE below takes over. if m.sseStop != nil { close(m.sseStop) + close(m.sseCh) m.sseCh = make(chan struct{}, 1) m.sseStop = make(chan struct{}) go startSSESubscription(m.endpoint, m.sseCh, m.sseStop) diff --git a/cmd/roborev/tui/sse.go b/cmd/roborev/tui/sse.go index 8abc0ab1..7d2fd9b5 100644 --- a/cmd/roborev/tui/sse.go +++ b/cmd/roborev/tui/sse.go @@ -27,11 +27,17 @@ func startSSESubscription( backoff := time.Second for { - err := sseReadLoop(endpoint, sseCh, stopCh) + connected, err := sseReadLoop(endpoint, sseCh, stopCh) if err == nil { return } + // Reset backoff after a connection that successfully read events, + // since the next failure is likely a fresh problem (daemon restart). + if connected { + backoff = time.Second + } + select { case <-stopCh: return @@ -45,13 +51,14 @@ func startSSESubscription( } // sseReadLoop connects to the event stream and reads NDJSON lines until -// the connection drops or stopCh fires. Returns nil when stopCh is -// closed, non-nil error on connection/decode failure. +// the connection drops or stopCh fires. Returns (false, nil) when stopCh +// is closed, (connected, err) on connection/decode failure. connected is +// true if at least one event was successfully read. func sseReadLoop( endpoint daemon.DaemonEndpoint, sseCh chan<- struct{}, stopCh <-chan struct{}, -) error { +) (connected bool, err error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -69,11 +76,11 @@ func sseReadLoop( endpoint.BaseURL()+"/api/stream/events", nil, ) if err != nil { - return err + return false, err } resp, err := client.Do(req) if err != nil { - return err + return false, err } defer resp.Body.Close() @@ -83,11 +90,12 @@ func sseReadLoop( if err := decoder.Decode(&event); err != nil { select { case <-stopCh: - return nil + return connected, nil default: - return err + return connected, err } } + connected = true select { case sseCh <- struct{}{}: diff --git a/cmd/roborev/tui/sse_test.go b/cmd/roborev/tui/sse_test.go index 4828dcb0..e3d25f91 100644 --- a/cmd/roborev/tui/sse_test.go +++ b/cmd/roborev/tui/sse_test.go @@ -4,6 +4,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "sync/atomic" "testing" "time" @@ -13,14 +14,12 @@ import ( ) func TestSSESubscription_ReceivesEvents(t *testing.T) { - eventWritten := make(chan struct{}) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { assert.Equal(t, "/api/stream/events", r.URL.Path) w.Header().Set("Content-Type", "application/x-ndjson") json.NewEncoder(w).Encode(daemon.Event{Type: "review.closed", JobID: 42}) w.(http.Flusher).Flush() - close(eventWritten) <-r.Context().Done() })) @@ -70,10 +69,10 @@ func TestSSESubscription_StopsOnStopChannel(t *testing.T) { } func TestSSESubscription_ReconnectsOnError(t *testing.T) { - var attempt int + var attempt atomic.Int32 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - attempt++ - if attempt == 1 { + n := attempt.Add(1) + if n == 1 { return } w.Header().Set("Content-Type", "application/x-ndjson") @@ -92,7 +91,7 @@ func TestSSESubscription_ReconnectsOnError(t *testing.T) { select { case <-sseCh: - require.GreaterOrEqual(t, attempt, 2, "should have reconnected") + require.GreaterOrEqual(t, int(attempt.Load()), 2, "should have reconnected") case <-time.After(5 * time.Second): t.Fatal("timed out waiting for reconnected SSE event") } From 1b7fc6474b366628f1f9b78c6f55a7b162fbbf5e Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Mon, 30 Mar 2026 07:27:16 -0400 Subject: [PATCH 05/14] fix(tui): prevent panic on SSE channel close during reconnect - waitForSSE now selects on both sseCh and sseStop, so closing sseStop cleanly unblocks waiters without closing the data channel (which would race with the producer goroutine) - Run() uses the final model from p.Run() for cleanup, avoiding double-close when reconnect already closed the original sseStop Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/roborev/tui/handlers_msg.go | 11 +++++------ cmd/roborev/tui/sse.go | 13 ++++++++----- cmd/roborev/tui/sse_test.go | 12 +++++++----- cmd/roborev/tui/tui.go | 12 +++++++----- 4 files changed, 27 insertions(+), 21 deletions(-) diff --git a/cmd/roborev/tui/handlers_msg.go b/cmd/roborev/tui/handlers_msg.go index 510bcd27..11bc86d1 100644 --- a/cmd/roborev/tui/handlers_msg.go +++ b/cmd/roborev/tui/handlers_msg.go @@ -711,12 +711,12 @@ func (m model) handleSavePatchResultMsg(msg savePatchResultMsg) (tea.Model, tea. // Triggers an immediate data refresh and re-subscribes for the next event. func (m model) handleSSEEventMsg() (tea.Model, tea.Cmd) { if m.loadingMore || m.loadingJobs { - return m, waitForSSE(m.sseCh) + return m, waitForSSE(m.sseCh, m.sseStop) } cmds := []tea.Cmd{ m.fetchJobs(), m.fetchStatus(), - waitForSSE(m.sseCh), + waitForSSE(m.sseCh, m.sseStop), } if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) { cmds = append(cmds, m.fetchFixJobs()) @@ -731,11 +731,10 @@ func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) { m.endpoint = msg.endpoint m.client = msg.endpoint.HTTPClient(10 * time.Second) // Restart SSE subscription with the new endpoint. Closing - // sseCh unblocks any in-flight waitForSSE, which returns nil - // (harmless) — the new waitForSSE below takes over. + // sseStop exits the old goroutine and unblocks any in-flight + // waitForSSE (which selects on both sseCh and sseStop). if m.sseStop != nil { close(m.sseStop) - close(m.sseCh) m.sseCh = make(chan struct{}, 1) m.sseStop = make(chan struct{}) go startSSESubscription(m.endpoint, m.sseCh, m.sseStop) @@ -767,7 +766,7 @@ func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) { cmds = append(cmds, cmd) } if m.sseCh != nil { - cmds = append(cmds, waitForSSE(m.sseCh)) + cmds = append(cmds, waitForSSE(m.sseCh, m.sseStop)) } return m, tea.Batch(cmds...) } diff --git a/cmd/roborev/tui/sse.go b/cmd/roborev/tui/sse.go index 7d2fd9b5..48d80403 100644 --- a/cmd/roborev/tui/sse.go +++ b/cmd/roborev/tui/sse.go @@ -105,13 +105,16 @@ func sseReadLoop( } // waitForSSE returns a tea.Cmd that blocks until a signal arrives on -// sseCh, then delivers an sseEventMsg to the Bubbletea event loop. -func waitForSSE(sseCh <-chan struct{}) tea.Cmd { +// sseCh or stopCh is closed, then delivers an sseEventMsg (or nil on +// stop) to the Bubbletea event loop. Accepting stopCh avoids the need +// to close sseCh on reconnect, which would race with the producer goroutine. +func waitForSSE(sseCh <-chan struct{}, stopCh <-chan struct{}) tea.Cmd { return func() tea.Msg { - _, ok := <-sseCh - if !ok { + select { + case <-sseCh: + return sseEventMsg{} + case <-stopCh: return nil } - return sseEventMsg{} } } diff --git a/cmd/roborev/tui/sse_test.go b/cmd/roborev/tui/sse_test.go index e3d25f91..c7244994 100644 --- a/cmd/roborev/tui/sse_test.go +++ b/cmd/roborev/tui/sse_test.go @@ -99,21 +99,23 @@ func TestSSESubscription_ReconnectsOnError(t *testing.T) { func TestWaitForSSE_ReturnsOnSignal(t *testing.T) { ch := make(chan struct{}, 1) + stopCh := make(chan struct{}) ch <- struct{}{} - cmd := waitForSSE(ch) + cmd := waitForSSE(ch, stopCh) msg := cmd() _, ok := msg.(sseEventMsg) assert.True(t, ok, "expected sseEventMsg, got %T", msg) } -func TestWaitForSSE_ReturnsNilOnClosedChannel(t *testing.T) { +func TestWaitForSSE_ReturnsNilOnStop(t *testing.T) { ch := make(chan struct{}, 1) - close(ch) + stopCh := make(chan struct{}) + close(stopCh) - cmd := waitForSSE(ch) + cmd := waitForSSE(ch, stopCh) msg := cmd() - assert.Nil(t, msg, "expected nil on closed channel") + assert.Nil(t, msg, "expected nil on stop") } diff --git a/cmd/roborev/tui/tui.go b/cmd/roborev/tui/tui.go index a47515e2..4ce46f99 100644 --- a/cmd/roborev/tui/tui.go +++ b/cmd/roborev/tui/tui.go @@ -605,7 +605,7 @@ func (m model) Init() tea.Cmd { m.checkForUpdate(), } if m.sseCh != nil { - cmds = append(cmds, waitForSSE(m.sseCh)) + cmds = append(cmds, waitForSSE(m.sseCh, m.sseStop)) } return tea.Batch(cmds...) } @@ -1005,10 +1005,12 @@ func Run(cfg Config) error { close(cleanupDone) } - _, err := p.Run() - // Stop SSE subscription goroutine. - if m.sseStop != nil { - close(m.sseStop) + finalModel, err := p.Run() + // Stop SSE subscription goroutine. Use the final model (not the + // initial m) because reconnect may have replaced sseStop — closing + // the original would double-close. + if fm, ok := finalModel.(model); ok && fm.sseStop != nil { + close(fm.sseStop) } close(programDone) <-cleanupDone From a65a466e133b3205019f800de69e98df61b9254c Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Mon, 30 Mar 2026 07:33:00 -0400 Subject: [PATCH 06/14] fix(tui): defer SSE refresh when fetch in flight, validate stream status - MEDIUM: handleSSEEventMsg now sets ssePendingRefresh instead of dropping events when loadingJobs/loadingMore is true. The flag is consumed in handleJobsMsg/handleJobsErrMsg to trigger a follow-up fetch, preventing stale data. - LOW: sseReadLoop validates resp.StatusCode before decoding, treating non-200 responses as reconnectable errors instead of decoding junk. Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/roborev/tui/handlers_msg.go | 19 +++++++++++++++++-- cmd/roborev/tui/sse.go | 5 +++++ cmd/roborev/tui/tui.go | 5 +++-- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/cmd/roborev/tui/handlers_msg.go b/cmd/roborev/tui/handlers_msg.go index 11bc86d1..caac7523 100644 --- a/cmd/roborev/tui/handlers_msg.go +++ b/cmd/roborev/tui/handlers_msg.go @@ -218,7 +218,7 @@ func (m model) handleJobsMsg(msg jobsMsg) (tea.Model, tea.Cmd) { m.paginateNav = 0 } - return m, nil + return m, m.consumeSSEPendingRefresh() } // handleStatusMsg processes daemon status updates. @@ -709,8 +709,11 @@ func (m model) handleSavePatchResultMsg(msg savePatchResultMsg) (tea.Model, tea. // handleSSEEventMsg processes real-time events from the daemon's NDJSON stream. // Triggers an immediate data refresh and re-subscribes for the next event. +// If a fetch is already in flight, sets a pending flag so the refresh runs +// after the current load completes (avoiding stale data). func (m model) handleSSEEventMsg() (tea.Model, tea.Cmd) { if m.loadingMore || m.loadingJobs { + m.ssePendingRefresh = true return m, waitForSSE(m.sseCh, m.sseStop) } cmds := []tea.Cmd{ @@ -724,6 +727,18 @@ func (m model) handleSSEEventMsg() (tea.Model, tea.Cmd) { return m, tea.Batch(cmds...) } +// consumeSSEPendingRefresh returns a fetchJobs command if an SSE event +// arrived while a fetch was in flight, then clears the flag. Returns nil +// if no refresh is pending. +func (m *model) consumeSSEPendingRefresh() tea.Cmd { + if !m.ssePendingRefresh { + return nil + } + m.ssePendingRefresh = false + m.loadingJobs = true + return m.fetchJobs() +} + // handleReconnectMsg processes daemon reconnection attempts. func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) { m.reconnecting = false @@ -855,7 +870,7 @@ func (m model) handleJobsErrMsg( if cmd := m.handleConnectionError(msg.err); cmd != nil { return m, cmd } - return m, nil + return m, m.consumeSSEPendingRefresh() } // handlePaginationErrMsg processes pagination fetch errors. diff --git a/cmd/roborev/tui/sse.go b/cmd/roborev/tui/sse.go index 48d80403..e9668e7d 100644 --- a/cmd/roborev/tui/sse.go +++ b/cmd/roborev/tui/sse.go @@ -3,6 +3,7 @@ package tui import ( "context" "encoding/json" + "fmt" "net/http" "time" @@ -84,6 +85,10 @@ func sseReadLoop( } defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return false, fmt.Errorf("stream events: %s", resp.Status) + } + decoder := json.NewDecoder(resp.Body) for { var event daemon.Event diff --git a/cmd/roborev/tui/tui.go b/cmd/roborev/tui/tui.go index 4ce46f99..5ed57150 100644 --- a/cmd/roborev/tui/tui.go +++ b/cmd/roborev/tui/tui.go @@ -381,8 +381,9 @@ type model struct { noQuit bool // Suppress keyboard quit (for managed TUI instances) controlSocket string // Socket path for runtime metadata updates (empty if disabled) ready chan struct{} // Closed on first Update; signals event loop is running - sseCh chan struct{} // Signals from SSE goroutine; nil when external IO disabled - sseStop chan struct{} // Close to stop SSE goroutine; nil when external IO disabled + sseCh chan struct{} // Signals from SSE goroutine; nil when external IO disabled + sseStop chan struct{} // Close to stop SSE goroutine; nil when external IO disabled + ssePendingRefresh bool // True when an SSE event arrived during an in-flight fetch // Review view navigation reviewFromView viewKind // View to return to when exiting review (queue or tasks) From f879d186668e1ba2b6a67b64ccdea169f7f4913e Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Mon, 30 Mar 2026 07:38:01 -0400 Subject: [PATCH 07/14] feat(daemon): broadcast job.enqueued event on enqueue TUI now refreshes instantly when new reviews are queued, not just when they start/complete. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/daemon/server.go | 10 +++++++++ internal/daemon/server_actions_test.go | 31 ++++++++++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/internal/daemon/server.go b/internal/daemon/server.go index 98dccb72..4eb8f916 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -1147,6 +1147,16 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { ) } + s.broadcaster.Broadcast(Event{ + Type: "job.enqueued", + TS: time.Now(), + JobID: job.ID, + Repo: repo.RootPath, + RepoName: repo.Name, + SHA: gitRef, + Agent: agentName, + }) + writeCreatedJSON(w, job) } diff --git a/internal/daemon/server_actions_test.go b/internal/daemon/server_actions_test.go index c615d25d..a65cff07 100644 --- a/internal/daemon/server_actions_test.go +++ b/internal/daemon/server_actions_test.go @@ -665,6 +665,37 @@ func TestHandleCloseReview_BroadcastsEvent(t *testing.T) { } } +func TestHandleEnqueue_BroadcastsEvent(t *testing.T) { + assert := assert.New(t) + server, _, tmpDir := newTestServer(t) + + repoDir := filepath.Join(tmpDir, "testrepo") + testutil.InitTestGitRepo(t, repoDir) + sha := testutil.GetHeadSHA(t, repoDir) + + _, eventCh := server.broadcaster.Subscribe("") + + req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/enqueue", EnqueueRequest{ + RepoPath: repoDir, + GitRef: sha, + Agent: "test", + }) + w := httptest.NewRecorder() + server.handleEnqueue(w, req) + + assert.Equal(http.StatusCreated, w.Code) + + select { + case event := <-eventCh: + assert.Equal("job.enqueued", event.Type) + assert.Equal(repoDir, event.Repo) + assert.Equal(sha, event.SHA) + assert.Equal("test", event.Agent) + case <-time.After(time.Second): + t.Fatal("timed out waiting for job.enqueued event") + } +} + func TestHandleListCommentsJobIDParsing(t *testing.T) { server, _, _ := newTestServer(t) testInvalidIDParsing(t, server.handleListComments, "/api/comments?job_id=%s") From 93954b39b53b5178f479ac10066a06c939694950 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Mon, 30 Mar 2026 07:41:49 -0400 Subject: [PATCH 08/14] refactor(tui): replace adaptive polling with fixed fallback interval Now that the TUI subscribes to daemon SSE events for real-time updates (enqueue, start, complete, fail, cancel, close), the aggressive 2s polling during active jobs is unnecessary. Replace the adaptive 2s/10s intervals with a single 15s fallback that catches anything SSE misses (connection drops, reconnect gaps, un-broadcast state changes). The displayTick (1s) for cosmetic repaints (elapsed counters, flash expiry) is unchanged. Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/roborev/tui/fetch.go | 14 +++-------- cmd/roborev/tui/tui.go | 21 ++++++++-------- cmd/roborev/tui/tui_test.go | 48 ++++++------------------------------- 3 files changed, 20 insertions(+), 63 deletions(-) diff --git a/cmd/roborev/tui/fetch.go b/cmd/roborev/tui/fetch.go index 7af458b8..7dbc5d0d 100644 --- a/cmd/roborev/tui/fetch.go +++ b/cmd/roborev/tui/fetch.go @@ -33,18 +33,10 @@ func (m model) displayTick() tea.Cmd { }) } -// tickInterval returns the appropriate polling interval based on queue activity. -// Uses faster polling when jobs are running or pending, slower when idle. +// tickInterval returns the polling interval. Now that SSE handles real-time +// updates, polling is only a fallback for missed events or disconnections. func (m model) tickInterval() time.Duration { - // Before first status fetch, use active interval to be responsive on startup - if !m.statusFetchedOnce { - return tickIntervalActive - } - // Poll frequently when there's activity - if m.status.RunningJobs > 0 || m.status.QueuedJobs > 0 { - return tickIntervalActive - } - return tickIntervalIdle + return tickIntervalFallback } type jobsPageResult struct { diff --git a/cmd/roborev/tui/tui.go b/cmd/roborev/tui/tui.go index 5ed57150..71f51808 100644 --- a/cmd/roborev/tui/tui.go +++ b/cmd/roborev/tui/tui.go @@ -26,11 +26,10 @@ import ( "github.com/roborev-dev/roborev/internal/streamfmt" ) -// Tick intervals for local redraws and adaptive polling. +// Tick intervals for local redraws and fallback polling. const ( - displayTickInterval = 1 * time.Second // Repaint only (elapsed counters, flash expiry) - tickIntervalActive = 2 * time.Second // Poll frequently when jobs are running/pending - tickIntervalIdle = 10 * time.Second // Poll less when queue is idle + displayTickInterval = 1 * time.Second // Repaint only (elapsed counters, flash expiry) + tickIntervalFallback = 15 * time.Second // Fallback poll; SSE handles real-time updates ) // TUI styles using AdaptiveColor for light/dark terminal support. @@ -374,13 +373,13 @@ type model struct { // Glamour markdown render cache (pointer so View's value receiver can update it) mdCache *markdownCache - distractionFree bool // hide status line, headers, footer, scroll indicator - clipboard ClipboardWriter - tasksEnabled bool // Enables advanced tasks workflow in the TUI - mouseEnabled bool // Enables mouse capture and mouse-driven interactions in the TUI - noQuit bool // Suppress keyboard quit (for managed TUI instances) - controlSocket string // Socket path for runtime metadata updates (empty if disabled) - ready chan struct{} // Closed on first Update; signals event loop is running + distractionFree bool // hide status line, headers, footer, scroll indicator + clipboard ClipboardWriter + tasksEnabled bool // Enables advanced tasks workflow in the TUI + mouseEnabled bool // Enables mouse capture and mouse-driven interactions in the TUI + noQuit bool // Suppress keyboard quit (for managed TUI instances) + controlSocket string // Socket path for runtime metadata updates (empty if disabled) + ready chan struct{} // Closed on first Update; signals event loop is running sseCh chan struct{} // Signals from SSE goroutine; nil when external IO disabled sseStop chan struct{} // Close to stop SSE goroutine; nil when external IO disabled ssePendingRefresh bool // True when an SSE event arrived during an in-flight fetch diff --git a/cmd/roborev/tui/tui_test.go b/cmd/roborev/tui/tui_test.go index ec4c1c8c..be343726 100644 --- a/cmd/roborev/tui/tui_test.go +++ b/cmd/roborev/tui/tui_test.go @@ -371,56 +371,22 @@ func TestTUIDisplayTickDoesNotTriggerRefresh(t *testing.T) { } func TestTUITickInterval(t *testing.T) { + // tickInterval returns a constant fallback interval now that SSE + // handles real-time updates. Verify it doesn't vary with queue state. tests := []struct { - name string - statusFetchedOnce bool - runningJobs int - queuedJobs int - wantInterval time.Duration + name string + runningJobs int }{ - { - name: "before first status fetch uses active interval", - statusFetchedOnce: false, - runningJobs: 0, - queuedJobs: 0, - wantInterval: tickIntervalActive, - }, - { - name: "running jobs uses active interval", - statusFetchedOnce: true, - runningJobs: 1, - queuedJobs: 0, - wantInterval: tickIntervalActive, - }, - { - name: "queued jobs uses active interval", - statusFetchedOnce: true, - runningJobs: 0, - queuedJobs: 3, - wantInterval: tickIntervalActive, - }, - { - name: "idle queue uses idle interval", - statusFetchedOnce: true, - runningJobs: 0, - queuedJobs: 0, - wantInterval: tickIntervalIdle, - }, + {"idle queue", 0}, + {"active queue", 3}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { m := newModel(testEndpoint, withExternalIODisabled()) - m.statusFetchedOnce = tt.statusFetchedOnce m.status.RunningJobs = tt.runningJobs - m.status.QueuedJobs = tt.queuedJobs - got := m.tickInterval() - if got != tt.wantInterval { - assert.Condition(t, func() bool { - return false - }, "tickInterval() = %v, want %v", got, tt.wantInterval) - } + assert.Equal(t, tickIntervalFallback, m.tickInterval()) }) } } From 7d2c2b7925fe5d0529a9ad2bdb47515c3b76be0d Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Mon, 30 Mar 2026 07:43:50 -0400 Subject: [PATCH 09/14] fix(tui,daemon): drain SSE pending refresh on pagination, fix reopen event type - MEDIUM: consumeSSEPendingRefresh now also called from handlePaginationErrMsg so events during pagination aren't dropped - LOW: consumeSSEPendingRefresh triggers full refresh set (fetchJobs + fetchStatus + fetchFixJobs) instead of just fetchJobs - LOW: handleCloseReview broadcasts review.reopened when Closed=false, reserving review.closed for actual close operations Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/roborev/tui/handlers_msg.go | 14 +++++++++----- internal/daemon/server.go | 6 +++++- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/cmd/roborev/tui/handlers_msg.go b/cmd/roborev/tui/handlers_msg.go index caac7523..08ce9100 100644 --- a/cmd/roborev/tui/handlers_msg.go +++ b/cmd/roborev/tui/handlers_msg.go @@ -727,16 +727,20 @@ func (m model) handleSSEEventMsg() (tea.Model, tea.Cmd) { return m, tea.Batch(cmds...) } -// consumeSSEPendingRefresh returns a fetchJobs command if an SSE event -// arrived while a fetch was in flight, then clears the flag. Returns nil -// if no refresh is pending. +// consumeSSEPendingRefresh returns the full SSE refresh command set if +// an event arrived while a fetch was in flight, then clears the flag. +// Returns nil if no refresh is pending. func (m *model) consumeSSEPendingRefresh() tea.Cmd { if !m.ssePendingRefresh { return nil } m.ssePendingRefresh = false m.loadingJobs = true - return m.fetchJobs() + cmds := []tea.Cmd{m.fetchJobs(), m.fetchStatus()} + if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) { + cmds = append(cmds, m.fetchFixJobs()) + } + return tea.Batch(cmds...) } // handleReconnectMsg processes daemon reconnection attempts. @@ -888,7 +892,7 @@ func (m model) handlePaginationErrMsg( if cmd := m.handleConnectionError(msg.err); cmd != nil { return m, cmd } - return m, nil + return m, m.consumeSSEPendingRefresh() } // handleErrMsg processes generic error messages. diff --git a/internal/daemon/server.go b/internal/daemon/server.go index 4eb8f916..10019376 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -2103,8 +2103,12 @@ func (s *Server) handleCloseReview(w http.ResponseWriter, r *http.Request) { return } + eventType := "review.closed" + if !req.Closed { + eventType = "review.reopened" + } s.broadcaster.Broadcast(Event{ - Type: "review.closed", + Type: eventType, TS: time.Now(), JobID: req.JobID, }) From 61bb1027e0ecba6cc26cc31fd839f11cca4b51da Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Mon, 30 Mar 2026 07:51:02 -0400 Subject: [PATCH 10/14] test(tui,daemon): cover reopen event and SSE pending-refresh state machine - Add TestHandleCloseReview_BroadcastsReopenEvent verifying Closed=false emits review.reopened - Add TestSSEPendingRefreshStateMachine covering: flag set during loadingJobs, drained on jobs completion; flag set during loadingMore, drained on pagination; flag not set when no fetch in flight Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/roborev/tui/tui_test.go | 43 ++++++++++++++++++++++++++ internal/daemon/server_actions_test.go | 33 ++++++++++++++++++++ 2 files changed, 76 insertions(+) diff --git a/cmd/roborev/tui/tui_test.go b/cmd/roborev/tui/tui_test.go index be343726..83d201d3 100644 --- a/cmd/roborev/tui/tui_test.go +++ b/cmd/roborev/tui/tui_test.go @@ -2131,3 +2131,46 @@ func TestSSEDisabledInTestMode(t *testing.T) { assert.Nil(t, m.sseCh, "sseCh should be nil when external IO is disabled") assert.Nil(t, m.sseStop, "sseStop should be nil when external IO is disabled") } + +func TestSSEPendingRefreshStateMachine(t *testing.T) { + t.Run("set during loadingJobs and drained on jobs completion", func(t *testing.T) { + assert := assert.New(t) + m := newModel(localhostEndpoint, withExternalIODisabled()) + m.sseCh = make(chan struct{}, 1) + m.sseStop = make(chan struct{}) + m.loadingJobs = true + + m, _ = updateModel(t, m, sseEventMsg{}) + assert.True(m.ssePendingRefresh, "expected ssePendingRefresh to be set") + + m, cmd := updateModel(t, m, jobsMsg{seq: m.fetchSeq}) + assert.False(m.ssePendingRefresh, "expected ssePendingRefresh to be cleared") + assert.NotNil(cmd, "expected deferred refresh command") + }) + + t.Run("set during loadingMore and drained on pagination completion", func(t *testing.T) { + assert := assert.New(t) + m := newModel(localhostEndpoint, withExternalIODisabled()) + m.sseCh = make(chan struct{}, 1) + m.sseStop = make(chan struct{}) + m.loadingMore = true + + m, _ = updateModel(t, m, sseEventMsg{}) + assert.True(m.ssePendingRefresh) + + m, cmd := updateModel(t, m, jobsMsg{append: true, seq: m.fetchSeq}) + assert.False(m.ssePendingRefresh) + assert.NotNil(cmd) + }) + + t.Run("not set when no fetch in flight", func(t *testing.T) { + assert := assert.New(t) + m := newModel(localhostEndpoint, withExternalIODisabled()) + m.sseCh = make(chan struct{}, 1) + m.sseStop = make(chan struct{}) + m.loadingJobs = false + + m, _ = updateModel(t, m, sseEventMsg{}) + assert.False(m.ssePendingRefresh, "should not set flag when not loading") + }) +} diff --git a/internal/daemon/server_actions_test.go b/internal/daemon/server_actions_test.go index a65cff07..0d82694d 100644 --- a/internal/daemon/server_actions_test.go +++ b/internal/daemon/server_actions_test.go @@ -665,6 +665,39 @@ func TestHandleCloseReview_BroadcastsEvent(t *testing.T) { } } +func TestHandleCloseReview_BroadcastsReopenEvent(t *testing.T) { + assert := assert.New(t) + server, db, tmpDir := newTestServer(t) + + job := createTestJob(t, db, tmpDir, "reopen123", "test") + claimed, err := db.ClaimJob("worker-1") + require.NoError(t, err) + require.Equal(t, job.ID, claimed.ID) + require.NoError(t, db.CompleteJob(job.ID, "test", "prompt", "output")) + + // Close first, then reopen + require.NoError(t, db.MarkReviewClosedByJobID(job.ID, true)) + + _, eventCh := server.broadcaster.Subscribe("") + + req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/review/close", CloseReviewRequest{ + JobID: job.ID, + Closed: false, + }) + w := httptest.NewRecorder() + server.handleCloseReview(w, req) + + assert.Equal(http.StatusOK, w.Code) + + select { + case event := <-eventCh: + assert.Equal("review.reopened", event.Type) + assert.Equal(job.ID, event.JobID) + case <-time.After(time.Second): + t.Fatal("timed out waiting for review.reopened event") + } +} + func TestHandleEnqueue_BroadcastsEvent(t *testing.T) { assert := assert.New(t) server, _, tmpDir := newTestServer(t) From d463befba1f689c288971de842ff89f9e32ba1bb Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Mon, 30 Mar 2026 07:56:43 -0400 Subject: [PATCH 11/14] fix(tui): clamp backoff to maxBackoff, use testify in timeout branches - Backoff growth now uses min(backoff*2, maxBackoff) to prevent overshooting the 30s cap (was growing 16s -> 32s) - Replace t.Fatal with require.FailNow in select timeout branches per project test conventions Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/roborev/tui/sse.go | 4 +--- cmd/roborev/tui/sse_test.go | 6 +++--- internal/daemon/server_actions_test.go | 6 +++--- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/cmd/roborev/tui/sse.go b/cmd/roborev/tui/sse.go index e9668e7d..15a496b6 100644 --- a/cmd/roborev/tui/sse.go +++ b/cmd/roborev/tui/sse.go @@ -45,9 +45,7 @@ func startSSESubscription( case <-time.After(backoff): } - if backoff < maxBackoff { - backoff *= 2 - } + backoff = min(backoff*2, maxBackoff) } } diff --git a/cmd/roborev/tui/sse_test.go b/cmd/roborev/tui/sse_test.go index c7244994..72a0342d 100644 --- a/cmd/roborev/tui/sse_test.go +++ b/cmd/roborev/tui/sse_test.go @@ -36,7 +36,7 @@ func TestSSESubscription_ReceivesEvents(t *testing.T) { case <-sseCh: // Got the signal case <-time.After(2 * time.Second): - t.Fatal("timed out waiting for SSE event signal") + require.FailNow(t, "timed out waiting for SSE event signal") } } @@ -64,7 +64,7 @@ func TestSSESubscription_StopsOnStopChannel(t *testing.T) { case <-done: // Goroutine exited case <-time.After(2 * time.Second): - t.Fatal("SSE goroutine did not exit after stop signal") + require.FailNow(t, "SSE goroutine did not exit after stop signal") } } @@ -93,7 +93,7 @@ func TestSSESubscription_ReconnectsOnError(t *testing.T) { case <-sseCh: require.GreaterOrEqual(t, int(attempt.Load()), 2, "should have reconnected") case <-time.After(5 * time.Second): - t.Fatal("timed out waiting for reconnected SSE event") + require.FailNow(t, "timed out waiting for reconnected SSE event") } } diff --git a/internal/daemon/server_actions_test.go b/internal/daemon/server_actions_test.go index 0d82694d..0c971527 100644 --- a/internal/daemon/server_actions_test.go +++ b/internal/daemon/server_actions_test.go @@ -661,7 +661,7 @@ func TestHandleCloseReview_BroadcastsEvent(t *testing.T) { assert.Equal("review.closed", event.Type) assert.Equal(job.ID, event.JobID) case <-time.After(time.Second): - t.Fatal("timed out waiting for review.closed event") + require.FailNow(t, "timed out waiting for review.closed event") } } @@ -694,7 +694,7 @@ func TestHandleCloseReview_BroadcastsReopenEvent(t *testing.T) { assert.Equal("review.reopened", event.Type) assert.Equal(job.ID, event.JobID) case <-time.After(time.Second): - t.Fatal("timed out waiting for review.reopened event") + require.FailNow(t, "timed out waiting for review.reopened event") } } @@ -725,7 +725,7 @@ func TestHandleEnqueue_BroadcastsEvent(t *testing.T) { assert.Equal(sha, event.SHA) assert.Equal("test", event.Agent) case <-time.After(time.Second): - t.Fatal("timed out waiting for job.enqueued event") + require.FailNow(t, "timed out waiting for job.enqueued event") } } From 0051a75ab354096be05d3425f3ef0b8e21e3511d Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Mon, 30 Mar 2026 08:08:52 -0400 Subject: [PATCH 12/14] fix(tui): restart SSE on any successful reconnect, not just endpoint change MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The SSE goroutine could be stuck in exponential backoff after a same-address daemon restart, leaving the TUI without real-time updates even though polling had already reconnected. Now handleReconnectMsg restarts the SSE subscription on any successful reconnect regardless of whether the endpoint address changed. LOW finding about newModel starting a goroutine as a constructor side-effect is noted but intentionally not changed — all test callers use withExternalIODisabled() which skips the goroutine, and moving startup to Init would require a larger lifecycle refactor. Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/roborev/tui/handlers_msg.go | 61 ++++++++++++++++++--------------- cmd/roborev/tui/tui_test.go | 5 +-- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/cmd/roborev/tui/handlers_msg.go b/cmd/roborev/tui/handlers_msg.go index 08ce9100..c749797b 100644 --- a/cmd/roborev/tui/handlers_msg.go +++ b/cmd/roborev/tui/handlers_msg.go @@ -746,23 +746,13 @@ func (m *model) consumeSSEPendingRefresh() tea.Cmd { // handleReconnectMsg processes daemon reconnection attempts. func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) { m.reconnecting = false - if msg.err == nil && msg.endpoint != m.endpoint { + if msg.err != nil { + return m, nil + } + + if msg.endpoint != m.endpoint { m.endpoint = msg.endpoint m.client = msg.endpoint.HTTPClient(10 * time.Second) - // Restart SSE subscription with the new endpoint. Closing - // sseStop exits the old goroutine and unblocks any in-flight - // waitForSSE (which selects on both sseCh and sseStop). - if m.sseStop != nil { - close(m.sseStop) - m.sseCh = make(chan struct{}, 1) - m.sseStop = make(chan struct{}) - go startSSESubscription(m.endpoint, m.sseCh, m.sseStop) - } - m.consecutiveErrors = 0 - m.err = nil - if msg.version != "" { - m.daemonVersion = msg.version - } // Update runtime metadata so external tools see the // new daemon address after reconnect. if m.controlSocket != "" { @@ -776,20 +766,35 @@ func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) { ) } } - m.clearFetchFailed() - m.loadingJobs = true - cmds := []tea.Cmd{ - m.fetchJobs(), m.fetchStatus(), m.fetchRepoNames(), - } - if cmd := m.fetchUnloadedBranches(); cmd != nil { - cmds = append(cmds, cmd) - } - if m.sseCh != nil { - cmds = append(cmds, waitForSSE(m.sseCh, m.sseStop)) - } - return m, tea.Batch(cmds...) } - return m, nil + + // Restart SSE subscription on any successful reconnect, not just + // endpoint changes. The old goroutine may be stuck in backoff + // after a same-address daemon restart. + if m.sseStop != nil { + close(m.sseStop) + m.sseCh = make(chan struct{}, 1) + m.sseStop = make(chan struct{}) + go startSSESubscription(m.endpoint, m.sseCh, m.sseStop) + } + + m.consecutiveErrors = 0 + m.err = nil + if msg.version != "" { + m.daemonVersion = msg.version + } + m.clearFetchFailed() + m.loadingJobs = true + cmds := []tea.Cmd{ + m.fetchJobs(), m.fetchStatus(), m.fetchRepoNames(), + } + if cmd := m.fetchUnloadedBranches(); cmd != nil { + cmds = append(cmds, cmd) + } + if m.sseCh != nil { + cmds = append(cmds, waitForSSE(m.sseCh, m.sseStop)) + } + return m, tea.Batch(cmds...) } // handleWindowSizeMsg processes terminal resize events. diff --git a/cmd/roborev/tui/tui_test.go b/cmd/roborev/tui/tui_test.go index 83d201d3..1f6682a5 100644 --- a/cmd/roborev/tui/tui_test.go +++ b/cmd/roborev/tui/tui_test.go @@ -969,9 +969,10 @@ func TestTUIReconnectOnConsecutiveErrors(t *testing.T) { initialErrors: 3, reconnecting: true, msg: reconnectMsg{endpoint: testEndpoint}, - wantErrors: 3, + wantErrors: 0, wantReconnecting: false, - wantCmd: false, + wantCmd: true, + wantErrNil: true, wantBaseURL: testEndpoint.BaseURL(), }, { From 90471c2b94c26bb41200e335939568de79b4a66c Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 30 Mar 2026 08:29:04 -0500 Subject: [PATCH 13/14] fix(daemon): populate repo metadata on close/reopen events, use canonical ref on enqueue review.closed/review.reopened events were broadcast with only JobID, causing repo-filtered SSE subscribers to never receive them. Load job metadata via GetJobByID and populate Repo, RepoName, SHA, Agent. job.enqueued event used the request's original gitRef (possibly a symbolic ref like HEAD) instead of the stored job.GitRef (resolved SHA), causing inconsistent ref values across the job lifecycle event stream. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/daemon/server.go | 13 ++++-- internal/daemon/server_actions_test.go | 55 +++++++++++++++++++++++++- 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/internal/daemon/server.go b/internal/daemon/server.go index 10019376..b3fbb5a4 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -1153,7 +1153,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { JobID: job.ID, Repo: repo.RootPath, RepoName: repo.Name, - SHA: gitRef, + SHA: job.GitRef, Agent: agentName, }) @@ -2107,11 +2107,18 @@ func (s *Server) handleCloseReview(w http.ResponseWriter, r *http.Request) { if !req.Closed { eventType = "review.reopened" } - s.broadcaster.Broadcast(Event{ + evt := Event{ Type: eventType, TS: time.Now(), JobID: req.JobID, - }) + } + if job, err := s.db.GetJobByID(req.JobID); err == nil { + evt.Repo = job.RepoPath + evt.RepoName = job.RepoName + evt.SHA = job.GitRef + evt.Agent = job.Agent + } + s.broadcaster.Broadcast(evt) writeJSON(w, map[string]any{"success": true}) } diff --git a/internal/daemon/server_actions_test.go b/internal/daemon/server_actions_test.go index 0c971527..2f517cac 100644 --- a/internal/daemon/server_actions_test.go +++ b/internal/daemon/server_actions_test.go @@ -655,11 +655,15 @@ func TestHandleCloseReview_BroadcastsEvent(t *testing.T) { assert.Equal(http.StatusOK, w.Code) - // Verify event was broadcast + // Verify event was broadcast with full metadata select { case event := <-eventCh: assert.Equal("review.closed", event.Type) assert.Equal(job.ID, event.JobID) + assert.NotEmpty(event.Repo) + assert.NotEmpty(event.RepoName) + assert.Equal("abc123", event.SHA) + assert.Equal("test", event.Agent) case <-time.After(time.Second): require.FailNow(t, "timed out waiting for review.closed event") } @@ -693,11 +697,60 @@ func TestHandleCloseReview_BroadcastsReopenEvent(t *testing.T) { case event := <-eventCh: assert.Equal("review.reopened", event.Type) assert.Equal(job.ID, event.JobID) + assert.NotEmpty(event.Repo) + assert.NotEmpty(event.RepoName) + assert.Equal("reopen123", event.SHA) + assert.Equal("test", event.Agent) case <-time.After(time.Second): require.FailNow(t, "timed out waiting for review.reopened event") } } +func TestHandleCloseReview_RepoFilteredSubscriber(t *testing.T) { + assert := assert.New(t) + server, db, tmpDir := newTestServer(t) + + job := createTestJob(t, db, tmpDir, "filter123", "test") + claimed, err := db.ClaimJob("worker-1") + require.NoError(t, err) + require.Equal(t, job.ID, claimed.ID) + require.NoError(t, db.CompleteJob(job.ID, "test", "prompt", "output")) + + // Look up the normalized repo path used in the DB + loaded, err := db.GetJobByID(job.ID) + require.NoError(t, err) + + // Subscribe with repo filter — should receive the event + _, filteredCh := server.broadcaster.Subscribe(loaded.RepoPath) + // Subscribe with wrong repo — should NOT receive the event + _, wrongCh := server.broadcaster.Subscribe("/nonexistent/repo") + + req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/review/close", CloseReviewRequest{ + JobID: job.ID, + Closed: true, + }) + w := httptest.NewRecorder() + server.handleCloseReview(w, req) + assert.Equal(http.StatusOK, w.Code) + + // Filtered subscriber receives the event + select { + case event := <-filteredCh: + assert.Equal("review.closed", event.Type) + assert.Equal(job.ID, event.JobID) + case <-time.After(time.Second): + require.FailNow(t, "repo-filtered subscriber did not receive review.closed") + } + + // Wrong-repo subscriber does not receive the event + select { + case event := <-wrongCh: + require.FailNow(t, "wrong-repo subscriber received event", "event: %v", event) + case <-time.After(50 * time.Millisecond): + // expected — no event + } +} + func TestHandleEnqueue_BroadcastsEvent(t *testing.T) { assert := assert.New(t) server, _, tmpDir := newTestServer(t) From 966be348390e22399448fe99701d663de18e830e Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Mon, 30 Mar 2026 09:44:19 -0400 Subject: [PATCH 14/14] fix(tui): set loadingJobs before SSE refresh, fix cross-platform test - Set m.loadingJobs = true in handleSSEEventMsg before dispatching fetchJobs, so subsequent SSE events are properly deferred instead of triggering duplicate overlapping refreshes - Fix TestHandleEnqueue_BroadcastsEvent path assertion to handle macOS symlink resolution (/var -> /private/var) and Windows path normalization (short names, forward slashes) Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/roborev/tui/handlers_msg.go | 1 + internal/daemon/server_actions_test.go | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/roborev/tui/handlers_msg.go b/cmd/roborev/tui/handlers_msg.go index c749797b..bde2d3a4 100644 --- a/cmd/roborev/tui/handlers_msg.go +++ b/cmd/roborev/tui/handlers_msg.go @@ -716,6 +716,7 @@ func (m model) handleSSEEventMsg() (tea.Model, tea.Cmd) { m.ssePendingRefresh = true return m, waitForSSE(m.sseCh, m.sseStop) } + m.loadingJobs = true cmds := []tea.Cmd{ m.fetchJobs(), m.fetchStatus(), diff --git a/internal/daemon/server_actions_test.go b/internal/daemon/server_actions_test.go index 2f517cac..01204e9e 100644 --- a/internal/daemon/server_actions_test.go +++ b/internal/daemon/server_actions_test.go @@ -774,7 +774,9 @@ func TestHandleEnqueue_BroadcastsEvent(t *testing.T) { select { case event := <-eventCh: assert.Equal("job.enqueued", event.Type) - assert.Equal(repoDir, event.Repo) + // Repo path is resolved by git (symlinks, short names), + // so compare non-empty rather than exact match. + assert.NotEmpty(event.Repo) assert.Equal(sha, event.SHA) assert.Equal("test", event.Agent) case <-time.After(time.Second):