diff --git a/cmd/roborev/tui/fetch.go b/cmd/roborev/tui/fetch.go index 7af458b8..7dbc5d0d 100644 --- a/cmd/roborev/tui/fetch.go +++ b/cmd/roborev/tui/fetch.go @@ -33,18 +33,10 @@ func (m model) displayTick() tea.Cmd { }) } -// tickInterval returns the appropriate polling interval based on queue activity. -// Uses faster polling when jobs are running or pending, slower when idle. +// tickInterval returns the polling interval. Now that SSE handles real-time +// updates, polling is only a fallback for missed events or disconnections. func (m model) tickInterval() time.Duration { - // Before first status fetch, use active interval to be responsive on startup - if !m.statusFetchedOnce { - return tickIntervalActive - } - // Poll frequently when there's activity - if m.status.RunningJobs > 0 || m.status.QueuedJobs > 0 { - return tickIntervalActive - } - return tickIntervalIdle + return tickIntervalFallback } type jobsPageResult struct { diff --git a/cmd/roborev/tui/handlers_msg.go b/cmd/roborev/tui/handlers_msg.go index 63bbd436..bde2d3a4 100644 --- a/cmd/roborev/tui/handlers_msg.go +++ b/cmd/roborev/tui/handlers_msg.go @@ -218,7 +218,7 @@ func (m model) handleJobsMsg(msg jobsMsg) (tea.Model, tea.Cmd) { m.paginateNav = 0 } - return m, nil + return m, m.consumeSSEPendingRefresh() } // handleStatusMsg processes daemon status updates. @@ -707,17 +707,53 @@ func (m model) handleSavePatchResultMsg(msg savePatchResultMsg) (tea.Model, tea. return m, nil } +// handleSSEEventMsg processes real-time events from the daemon's NDJSON stream. +// Triggers an immediate data refresh and re-subscribes for the next event. +// If a fetch is already in flight, sets a pending flag so the refresh runs +// after the current load completes (avoiding stale data). 
+func (m model) handleSSEEventMsg() (tea.Model, tea.Cmd) { + if m.loadingMore || m.loadingJobs { + m.ssePendingRefresh = true + return m, waitForSSE(m.sseCh, m.sseStop) + } + m.loadingJobs = true + cmds := []tea.Cmd{ + m.fetchJobs(), + m.fetchStatus(), + waitForSSE(m.sseCh, m.sseStop), + } + if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) { + cmds = append(cmds, m.fetchFixJobs()) + } + return m, tea.Batch(cmds...) +} + +// consumeSSEPendingRefresh returns the full SSE refresh command set if +// an event arrived while a fetch was in flight, then clears the flag. +// Returns nil if no refresh is pending. +func (m *model) consumeSSEPendingRefresh() tea.Cmd { + if !m.ssePendingRefresh { + return nil + } + m.ssePendingRefresh = false + m.loadingJobs = true + cmds := []tea.Cmd{m.fetchJobs(), m.fetchStatus()} + if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) { + cmds = append(cmds, m.fetchFixJobs()) + } + return tea.Batch(cmds...) +} + // handleReconnectMsg processes daemon reconnection attempts. func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) { m.reconnecting = false - if msg.err == nil && msg.endpoint != m.endpoint { + if msg.err != nil { + return m, nil + } + + if msg.endpoint != m.endpoint { m.endpoint = msg.endpoint m.client = msg.endpoint.HTTPClient(10 * time.Second) - m.consecutiveErrors = 0 - m.err = nil - if msg.version != "" { - m.daemonVersion = msg.version - } // Update runtime metadata so external tools see the // new daemon address after reconnect. if m.controlSocket != "" { @@ -731,17 +767,35 @@ func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) { ) } } - m.clearFetchFailed() - m.loadingJobs = true - cmds := []tea.Cmd{ - m.fetchJobs(), m.fetchStatus(), m.fetchRepoNames(), - } - if cmd := m.fetchUnloadedBranches(); cmd != nil { - cmds = append(cmds, cmd) - } - return m, tea.Batch(cmds...) 
} - return m, nil + + // Restart SSE subscription on any successful reconnect, not just + // endpoint changes. The old goroutine may be stuck in backoff + // after a same-address daemon restart. + if m.sseStop != nil { + close(m.sseStop) + m.sseCh = make(chan struct{}, 1) + m.sseStop = make(chan struct{}) + go startSSESubscription(m.endpoint, m.sseCh, m.sseStop) + } + + m.consecutiveErrors = 0 + m.err = nil + if msg.version != "" { + m.daemonVersion = msg.version + } + m.clearFetchFailed() + m.loadingJobs = true + cmds := []tea.Cmd{ + m.fetchJobs(), m.fetchStatus(), m.fetchRepoNames(), + } + if cmd := m.fetchUnloadedBranches(); cmd != nil { + cmds = append(cmds, cmd) + } + if m.sseCh != nil { + cmds = append(cmds, waitForSSE(m.sseCh, m.sseStop)) + } + return m, tea.Batch(cmds...) } // handleWindowSizeMsg processes terminal resize events. @@ -826,7 +880,7 @@ func (m model) handleJobsErrMsg( if cmd := m.handleConnectionError(msg.err); cmd != nil { return m, cmd } - return m, nil + return m, m.consumeSSEPendingRefresh() } // handlePaginationErrMsg processes pagination fetch errors. @@ -844,7 +898,7 @@ func (m model) handlePaginationErrMsg( if cmd := m.handleConnectionError(msg.err); cmd != nil { return m, cmd } - return m, nil + return m, m.consumeSSEPendingRefresh() } // handleErrMsg processes generic error messages. diff --git a/cmd/roborev/tui/sse.go b/cmd/roborev/tui/sse.go new file mode 100644 index 00000000..15a496b6 --- /dev/null +++ b/cmd/roborev/tui/sse.go @@ -0,0 +1,123 @@ +package tui + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + tea "github.com/charmbracelet/bubbletea" + "github.com/roborev-dev/roborev/internal/daemon" +) + +// sseEventMsg signals that the daemon broadcast an event. +// The TUI uses this to trigger an immediate data refresh. +type sseEventMsg struct{} + +// startSSESubscription maintains a persistent NDJSON connection to the +// daemon's /api/stream/events endpoint. 
On each received event it sends +// a non-blocking signal to sseCh. The goroutine reconnects with +// exponential backoff on errors and exits when stopCh is closed. +func startSSESubscription( + endpoint daemon.DaemonEndpoint, + sseCh chan<- struct{}, + stopCh <-chan struct{}, +) { + const maxBackoff = 30 * time.Second + backoff := time.Second + + for { + connected, err := sseReadLoop(endpoint, sseCh, stopCh) + if err == nil { + return + } + + // Reset backoff after a connection that successfully read events, + // since the next failure is likely a fresh problem (daemon restart). + if connected { + backoff = time.Second + } + + select { + case <-stopCh: + return + case <-time.After(backoff): + } + + backoff = min(backoff*2, maxBackoff) + } +} + +// sseReadLoop connects to the event stream and reads NDJSON lines until +// the connection drops or stopCh fires. Returns (connected, nil) when stopCh +// is closed, (connected, err) on connection/decode failure. connected is +// true if at least one event was successfully read. 
+func sseReadLoop( + endpoint daemon.DaemonEndpoint, + sseCh chan<- struct{}, + stopCh <-chan struct{}, +) (connected bool, err error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + select { + case <-stopCh: + cancel() + case <-ctx.Done(): + } + }() + + client := endpoint.HTTPClient(0) + req, err := http.NewRequestWithContext( + ctx, http.MethodGet, + endpoint.BaseURL()+"/api/stream/events", nil, + ) + if err != nil { + return false, err + } + resp, err := client.Do(req) + if err != nil { + return false, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return false, fmt.Errorf("stream events: %s", resp.Status) + } + + decoder := json.NewDecoder(resp.Body) + for { + var event daemon.Event + if err := decoder.Decode(&event); err != nil { + select { + case <-stopCh: + return connected, nil + default: + return connected, err + } + } + connected = true + + select { + case sseCh <- struct{}{}: + default: + } + } +} + +// waitForSSE returns a tea.Cmd that blocks until a signal arrives on +// sseCh or stopCh is closed, then delivers an sseEventMsg (or nil on +// stop) to the Bubbletea event loop. Accepting stopCh avoids the need +// to close sseCh on reconnect, which would race with the producer goroutine. 
+func waitForSSE(sseCh <-chan struct{}, stopCh <-chan struct{}) tea.Cmd { + return func() tea.Msg { + select { + case <-sseCh: + return sseEventMsg{} + case <-stopCh: + return nil + } + } +} diff --git a/cmd/roborev/tui/sse_test.go b/cmd/roborev/tui/sse_test.go new file mode 100644 index 00000000..72a0342d --- /dev/null +++ b/cmd/roborev/tui/sse_test.go @@ -0,0 +1,121 @@ +package tui + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + "github.com/roborev-dev/roborev/internal/daemon" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSSESubscription_ReceivesEvents(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "/api/stream/events", r.URL.Path) + + w.Header().Set("Content-Type", "application/x-ndjson") + json.NewEncoder(w).Encode(daemon.Event{Type: "review.closed", JobID: 42}) + w.(http.Flusher).Flush() + + <-r.Context().Done() + })) + defer ts.Close() + + ep := testEndpointFromURL(ts.URL) + sseCh := make(chan struct{}, 1) + stopCh := make(chan struct{}) + defer close(stopCh) + + go startSSESubscription(ep, sseCh, stopCh) + + select { + case <-sseCh: + // Got the signal + case <-time.After(2 * time.Second): + require.FailNow(t, "timed out waiting for SSE event signal") + } +} + +func TestSSESubscription_StopsOnStopChannel(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/x-ndjson") + w.(http.Flusher).Flush() + <-r.Context().Done() + })) + defer ts.Close() + + ep := testEndpointFromURL(ts.URL) + sseCh := make(chan struct{}, 1) + stopCh := make(chan struct{}) + + done := make(chan struct{}) + go func() { + startSSESubscription(ep, sseCh, stopCh) + close(done) + }() + + close(stopCh) + + select { + case <-done: + // Goroutine exited + case <-time.After(2 * time.Second): + require.FailNow(t, "SSE 
goroutine did not exit after stop signal") + } +} + +func TestSSESubscription_ReconnectsOnError(t *testing.T) { + var attempt atomic.Int32 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := attempt.Add(1) + if n == 1 { + return + } + w.Header().Set("Content-Type", "application/x-ndjson") + json.NewEncoder(w).Encode(daemon.Event{Type: "review.closed", JobID: 1}) + w.(http.Flusher).Flush() + <-r.Context().Done() + })) + defer ts.Close() + + ep := testEndpointFromURL(ts.URL) + sseCh := make(chan struct{}, 1) + stopCh := make(chan struct{}) + defer close(stopCh) + + go startSSESubscription(ep, sseCh, stopCh) + + select { + case <-sseCh: + require.GreaterOrEqual(t, int(attempt.Load()), 2, "should have reconnected") + case <-time.After(5 * time.Second): + require.FailNow(t, "timed out waiting for reconnected SSE event") + } +} + +func TestWaitForSSE_ReturnsOnSignal(t *testing.T) { + ch := make(chan struct{}, 1) + stopCh := make(chan struct{}) + ch <- struct{}{} + + cmd := waitForSSE(ch, stopCh) + msg := cmd() + + _, ok := msg.(sseEventMsg) + assert.True(t, ok, "expected sseEventMsg, got %T", msg) +} + +func TestWaitForSSE_ReturnsNilOnStop(t *testing.T) { + ch := make(chan struct{}, 1) + stopCh := make(chan struct{}) + close(stopCh) + + cmd := waitForSSE(ch, stopCh) + msg := cmd() + + assert.Nil(t, msg, "expected nil on stop") +} diff --git a/cmd/roborev/tui/tui.go b/cmd/roborev/tui/tui.go index e4e10e19..71f51808 100644 --- a/cmd/roborev/tui/tui.go +++ b/cmd/roborev/tui/tui.go @@ -26,11 +26,10 @@ import ( "github.com/roborev-dev/roborev/internal/streamfmt" ) -// Tick intervals for local redraws and adaptive polling. +// Tick intervals for local redraws and fallback polling. 
const ( - displayTickInterval = 1 * time.Second // Repaint only (elapsed counters, flash expiry) - tickIntervalActive = 2 * time.Second // Poll frequently when jobs are running/pending - tickIntervalIdle = 10 * time.Second // Poll less when queue is idle + displayTickInterval = 1 * time.Second // Repaint only (elapsed counters, flash expiry) + tickIntervalFallback = 15 * time.Second // Fallback poll; SSE handles real-time updates ) // TUI styles using AdaptiveColor for light/dark terminal support. @@ -374,13 +373,16 @@ type model struct { // Glamour markdown render cache (pointer so View's value receiver can update it) mdCache *markdownCache - distractionFree bool // hide status line, headers, footer, scroll indicator - clipboard ClipboardWriter - tasksEnabled bool // Enables advanced tasks workflow in the TUI - mouseEnabled bool // Enables mouse capture and mouse-driven interactions in the TUI - noQuit bool // Suppress keyboard quit (for managed TUI instances) - controlSocket string // Socket path for runtime metadata updates (empty if disabled) - ready chan struct{} // Closed on first Update; signals event loop is running + distractionFree bool // hide status line, headers, footer, scroll indicator + clipboard ClipboardWriter + tasksEnabled bool // Enables advanced tasks workflow in the TUI + mouseEnabled bool // Enables mouse capture and mouse-driven interactions in the TUI + noQuit bool // Suppress keyboard quit (for managed TUI instances) + controlSocket string // Socket path for runtime metadata updates (empty if disabled) + ready chan struct{} // Closed on first Update; signals event loop is running + sseCh chan struct{} // Signals from SSE goroutine; nil when external IO disabled + sseStop chan struct{} // Close to stop SSE goroutine; nil when external IO disabled + ssePendingRefresh bool // True when an SSE event arrived during an in-flight fetch // Review view navigation reviewFromView viewKind // View to return to when exiting review (queue or tasks) @@ 
-494,6 +496,14 @@ func newModel(ep daemon.DaemonEndpoint, opts ...option) model { } } + var sseCh chan struct{} + var sseStop chan struct{} + if !opt.disableExternalIO { + sseCh = make(chan struct{}, 1) + sseStop = make(chan struct{}) + go startSSESubscription(ep, sseCh, sseStop) + } + // Test overrides for auto-filter simulation if opt.autoFilterRepo { autoFilterRepo = true @@ -573,6 +583,8 @@ func newModel(ep daemon.DaemonEndpoint, opts ...option) model { mouseEnabled: mouseEnabled, noQuit: opt.noQuit, ready: make(chan struct{}), + sseCh: sseCh, + sseStop: sseStop, colBordersOn: columnBorders, hiddenColumns: hiddenCols, columnOrder: colOrder, @@ -583,15 +595,19 @@ func newModel(ep daemon.DaemonEndpoint, opts ...option) model { } func (m model) Init() tea.Cmd { - return tea.Batch( - tea.WindowSize(), // request initial window size + cmds := []tea.Cmd{ + tea.WindowSize(), m.displayTick(), m.tick(), m.fetchJobs(), m.fetchStatus(), m.fetchRepoNames(), m.checkForUpdate(), - ) + } + if m.sseCh != nil { + cmds = append(cmds, waitForSSE(m.sseCh, m.sseStop)) + } + return tea.Batch(cmds...) } func (m model) tasksWorkflowEnabled() bool { @@ -780,6 +796,8 @@ func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { result, cmd = m.handleErrMsg(msg) case reconnectMsg: result, cmd = m.handleReconnectMsg(msg) + case sseEventMsg: + result, cmd = m.handleSSEEventMsg() case fixJobsMsg: result, cmd = m.handleFixJobsMsg(msg) case fixTriggerResultMsg: @@ -987,7 +1005,13 @@ func Run(cfg Config) error { close(cleanupDone) } - _, err := p.Run() + finalModel, err := p.Run() + // Stop SSE subscription goroutine. Use the final model (not the + // initial m) because reconnect may have replaced sseStop — closing + // the original would double-close. 
+ if fm, ok := finalModel.(model); ok && fm.sseStop != nil { + close(fm.sseStop) + } close(programDone) <-cleanupDone return err diff --git a/cmd/roborev/tui/tui_test.go b/cmd/roborev/tui/tui_test.go index a1f0a9a4..1f6682a5 100644 --- a/cmd/roborev/tui/tui_test.go +++ b/cmd/roborev/tui/tui_test.go @@ -371,56 +371,22 @@ func TestTUIDisplayTickDoesNotTriggerRefresh(t *testing.T) { } func TestTUITickInterval(t *testing.T) { + // tickInterval returns a constant fallback interval now that SSE + // handles real-time updates. Verify it doesn't vary with queue state. tests := []struct { - name string - statusFetchedOnce bool - runningJobs int - queuedJobs int - wantInterval time.Duration + name string + runningJobs int }{ - { - name: "before first status fetch uses active interval", - statusFetchedOnce: false, - runningJobs: 0, - queuedJobs: 0, - wantInterval: tickIntervalActive, - }, - { - name: "running jobs uses active interval", - statusFetchedOnce: true, - runningJobs: 1, - queuedJobs: 0, - wantInterval: tickIntervalActive, - }, - { - name: "queued jobs uses active interval", - statusFetchedOnce: true, - runningJobs: 0, - queuedJobs: 3, - wantInterval: tickIntervalActive, - }, - { - name: "idle queue uses idle interval", - statusFetchedOnce: true, - runningJobs: 0, - queuedJobs: 0, - wantInterval: tickIntervalIdle, - }, + {"idle queue", 0}, + {"active queue", 3}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { m := newModel(testEndpoint, withExternalIODisabled()) - m.statusFetchedOnce = tt.statusFetchedOnce m.status.RunningJobs = tt.runningJobs - m.status.QueuedJobs = tt.queuedJobs - got := m.tickInterval() - if got != tt.wantInterval { - assert.Condition(t, func() bool { - return false - }, "tickInterval() = %v, want %v", got, tt.wantInterval) - } + assert.Equal(t, tickIntervalFallback, m.tickInterval()) }) } } @@ -1003,9 +969,10 @@ func TestTUIReconnectOnConsecutiveErrors(t *testing.T) { initialErrors: 3, reconnecting: true, msg: 
reconnectMsg{endpoint: testEndpoint}, - wantErrors: 3, + wantErrors: 0, wantReconnecting: false, - wantCmd: false, + wantCmd: true, + wantErrNil: true, wantBaseURL: testEndpoint.BaseURL(), }, { @@ -2138,3 +2105,73 @@ func TestNewTuiModelOptions(t *testing.T) { }) } } + +func TestSSEEventTriggersRefresh(t *testing.T) { + m := newModel(localhostEndpoint, withExternalIODisabled()) + m.sseCh = make(chan struct{}, 1) + m.loadingJobs = false + + m, cmd := updateModel(t, m, sseEventMsg{}) + + assert.NotNil(t, cmd, "expected commands from SSE event") +} + +func TestSSEEventSkipsRefreshWhileLoading(t *testing.T) { + m := newModel(localhostEndpoint, withExternalIODisabled()) + m.sseCh = make(chan struct{}, 1) + m.loadingJobs = true + + m, cmd := updateModel(t, m, sseEventMsg{}) + + assert.NotNil(t, cmd, "expected re-subscribe command even while loading") +} + +func TestSSEDisabledInTestMode(t *testing.T) { + m := newModel(localhostEndpoint, withExternalIODisabled()) + + assert.Nil(t, m.sseCh, "sseCh should be nil when external IO is disabled") + assert.Nil(t, m.sseStop, "sseStop should be nil when external IO is disabled") +} + +func TestSSEPendingRefreshStateMachine(t *testing.T) { + t.Run("set during loadingJobs and drained on jobs completion", func(t *testing.T) { + assert := assert.New(t) + m := newModel(localhostEndpoint, withExternalIODisabled()) + m.sseCh = make(chan struct{}, 1) + m.sseStop = make(chan struct{}) + m.loadingJobs = true + + m, _ = updateModel(t, m, sseEventMsg{}) + assert.True(m.ssePendingRefresh, "expected ssePendingRefresh to be set") + + m, cmd := updateModel(t, m, jobsMsg{seq: m.fetchSeq}) + assert.False(m.ssePendingRefresh, "expected ssePendingRefresh to be cleared") + assert.NotNil(cmd, "expected deferred refresh command") + }) + + t.Run("set during loadingMore and drained on pagination completion", func(t *testing.T) { + assert := assert.New(t) + m := newModel(localhostEndpoint, withExternalIODisabled()) + m.sseCh = make(chan struct{}, 1) + 
m.sseStop = make(chan struct{}) + m.loadingMore = true + + m, _ = updateModel(t, m, sseEventMsg{}) + assert.True(m.ssePendingRefresh) + + m, cmd := updateModel(t, m, jobsMsg{append: true, seq: m.fetchSeq}) + assert.False(m.ssePendingRefresh) + assert.NotNil(cmd) + }) + + t.Run("not set when no fetch in flight", func(t *testing.T) { + assert := assert.New(t) + m := newModel(localhostEndpoint, withExternalIODisabled()) + m.sseCh = make(chan struct{}, 1) + m.sseStop = make(chan struct{}) + m.loadingJobs = false + + m, _ = updateModel(t, m, sseEventMsg{}) + assert.False(m.ssePendingRefresh, "should not set flag when not loading") + }) +} diff --git a/internal/daemon/server.go b/internal/daemon/server.go index f1d71ca8..b3fbb5a4 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -1147,6 +1147,16 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { ) } + s.broadcaster.Broadcast(Event{ + Type: "job.enqueued", + TS: time.Now(), + JobID: job.ID, + Repo: repo.RootPath, + RepoName: repo.Name, + SHA: job.GitRef, + Agent: agentName, + }) + writeCreatedJSON(w, job) } @@ -2093,6 +2103,23 @@ func (s *Server) handleCloseReview(w http.ResponseWriter, r *http.Request) { return } + eventType := "review.closed" + if !req.Closed { + eventType = "review.reopened" + } + evt := Event{ + Type: eventType, + TS: time.Now(), + JobID: req.JobID, + } + if job, err := s.db.GetJobByID(req.JobID); err == nil { + evt.Repo = job.RepoPath + evt.RepoName = job.RepoName + evt.SHA = job.GitRef + evt.Agent = job.Agent + } + s.broadcaster.Broadcast(evt) + writeJSON(w, map[string]any{"success": true}) } diff --git a/internal/daemon/server_actions_test.go b/internal/daemon/server_actions_test.go index c48e6adb..01204e9e 100644 --- a/internal/daemon/server_actions_test.go +++ b/internal/daemon/server_actions_test.go @@ -632,6 +632,158 @@ func TestHandleAddCommentWithoutReview(t *testing.T) { } } +func TestHandleCloseReview_BroadcastsEvent(t *testing.T) { + assert 
:= assert.New(t) + server, db, tmpDir := newTestServer(t) + + // Create a completed job (which creates a review) + job := createTestJob(t, db, tmpDir, "abc123", "test") + claimed, err := db.ClaimJob("worker-1") + require.NoError(t, err) + require.Equal(t, job.ID, claimed.ID) + require.NoError(t, db.CompleteJob(job.ID, "test", "prompt", "output")) + + // Subscribe to broadcaster before the close call + _, eventCh := server.broadcaster.Subscribe("") + + req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/review/close", CloseReviewRequest{ + JobID: job.ID, + Closed: true, + }) + w := httptest.NewRecorder() + server.handleCloseReview(w, req) + + assert.Equal(http.StatusOK, w.Code) + + // Verify event was broadcast with full metadata + select { + case event := <-eventCh: + assert.Equal("review.closed", event.Type) + assert.Equal(job.ID, event.JobID) + assert.NotEmpty(event.Repo) + assert.NotEmpty(event.RepoName) + assert.Equal("abc123", event.SHA) + assert.Equal("test", event.Agent) + case <-time.After(time.Second): + require.FailNow(t, "timed out waiting for review.closed event") + } +} + +func TestHandleCloseReview_BroadcastsReopenEvent(t *testing.T) { + assert := assert.New(t) + server, db, tmpDir := newTestServer(t) + + job := createTestJob(t, db, tmpDir, "reopen123", "test") + claimed, err := db.ClaimJob("worker-1") + require.NoError(t, err) + require.Equal(t, job.ID, claimed.ID) + require.NoError(t, db.CompleteJob(job.ID, "test", "prompt", "output")) + + // Close first, then reopen + require.NoError(t, db.MarkReviewClosedByJobID(job.ID, true)) + + _, eventCh := server.broadcaster.Subscribe("") + + req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/review/close", CloseReviewRequest{ + JobID: job.ID, + Closed: false, + }) + w := httptest.NewRecorder() + server.handleCloseReview(w, req) + + assert.Equal(http.StatusOK, w.Code) + + select { + case event := <-eventCh: + assert.Equal("review.reopened", event.Type) + assert.Equal(job.ID, event.JobID) + 
assert.NotEmpty(event.Repo) + assert.NotEmpty(event.RepoName) + assert.Equal("reopen123", event.SHA) + assert.Equal("test", event.Agent) + case <-time.After(time.Second): + require.FailNow(t, "timed out waiting for review.reopened event") + } +} + +func TestHandleCloseReview_RepoFilteredSubscriber(t *testing.T) { + assert := assert.New(t) + server, db, tmpDir := newTestServer(t) + + job := createTestJob(t, db, tmpDir, "filter123", "test") + claimed, err := db.ClaimJob("worker-1") + require.NoError(t, err) + require.Equal(t, job.ID, claimed.ID) + require.NoError(t, db.CompleteJob(job.ID, "test", "prompt", "output")) + + // Look up the normalized repo path used in the DB + loaded, err := db.GetJobByID(job.ID) + require.NoError(t, err) + + // Subscribe with repo filter — should receive the event + _, filteredCh := server.broadcaster.Subscribe(loaded.RepoPath) + // Subscribe with wrong repo — should NOT receive the event + _, wrongCh := server.broadcaster.Subscribe("/nonexistent/repo") + + req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/review/close", CloseReviewRequest{ + JobID: job.ID, + Closed: true, + }) + w := httptest.NewRecorder() + server.handleCloseReview(w, req) + assert.Equal(http.StatusOK, w.Code) + + // Filtered subscriber receives the event + select { + case event := <-filteredCh: + assert.Equal("review.closed", event.Type) + assert.Equal(job.ID, event.JobID) + case <-time.After(time.Second): + require.FailNow(t, "repo-filtered subscriber did not receive review.closed") + } + + // Wrong-repo subscriber does not receive the event + select { + case event := <-wrongCh: + require.FailNow(t, "wrong-repo subscriber received event", "event: %v", event) + case <-time.After(50 * time.Millisecond): + // expected — no event + } +} + +func TestHandleEnqueue_BroadcastsEvent(t *testing.T) { + assert := assert.New(t) + server, _, tmpDir := newTestServer(t) + + repoDir := filepath.Join(tmpDir, "testrepo") + testutil.InitTestGitRepo(t, repoDir) + sha := 
testutil.GetHeadSHA(t, repoDir) + + _, eventCh := server.broadcaster.Subscribe("") + + req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/enqueue", EnqueueRequest{ + RepoPath: repoDir, + GitRef: sha, + Agent: "test", + }) + w := httptest.NewRecorder() + server.handleEnqueue(w, req) + + assert.Equal(http.StatusCreated, w.Code) + + select { + case event := <-eventCh: + assert.Equal("job.enqueued", event.Type) + // Repo path is resolved by git (symlinks, short names), + // so compare non-empty rather than exact match. + assert.NotEmpty(event.Repo) + assert.Equal(sha, event.SHA) + assert.Equal("test", event.Agent) + case <-time.After(time.Second): + require.FailNow(t, "timed out waiting for job.enqueued event") + } +} + func TestHandleListCommentsJobIDParsing(t *testing.T) { server, _, _ := newTestServer(t) testInvalidIDParsing(t, server.handleListComments, "/api/comments?job_id=%s")