diff --git a/cmd/roborev/tui/control_handlers.go b/cmd/roborev/tui/control_handlers.go index 41e3c75d..d50ea104 100644 --- a/cmd/roborev/tui/control_handlers.go +++ b/cmd/roborev/tui/control_handlers.go @@ -394,7 +394,7 @@ func (m model) handleCtrlSetView( m.currentView = v var cmd tea.Cmd if v == viewTasks { - cmd = m.fetchFixJobs() + cmd = m.startFetchFixJobs() } return m, controlResponse{OK: true}, cmd } diff --git a/cmd/roborev/tui/fetch.go b/cmd/roborev/tui/fetch.go index 7dbc5d0d..c24721b0 100644 --- a/cmd/roborev/tui/fetch.go +++ b/cmd/roborev/tui/fetch.go @@ -208,13 +208,37 @@ func (m model) fetchMoreJobs() tea.Cmd { } func (m model) fetchStatus() tea.Cmd { + gen := m.fetchGen return func() tea.Msg { var status storage.DaemonStatus if err := m.getJSON("/api/status", &status); err != nil { - return errMsg(err) + return statusErrMsg{err: err, gen: gen} } - return statusMsg(status) + return statusMsg{status: status, gen: gen} + } +} + +// startFetchStatus dispatches fetchStatus if no status fetch is already +// in flight, and sets the loadingStatus flag. Returns nil if skipped. +func (m *model) startFetchStatus() tea.Cmd { + if m.loadingStatus { + return nil } + m.loadingStatus = true + return m.fetchStatus() +} + +// requestFetchStatus is like startFetchStatus but for paths triggered by +// daemon state changes (SSE events). If a fetch is already in flight, it +// marks the current data as stale so handleStatusMsg will dispatch a +// follow-up fetch when the in-flight one returns. +func (m *model) requestFetchStatus() tea.Cmd { + if m.loadingStatus { + m.statusStale = true + return nil + } + m.loadingStatus = true + return m.fetchStatus() } func (m model) checkForUpdate() tea.Cmd { @@ -808,6 +832,7 @@ func (m model) fetchPatch(jobID int64) tea.Cmd { // fetchFixJobs fetches fix jobs from the daemon. func (m model) fetchFixJobs() tea.Cmd { + gen := m.fetchGen return func() tea.Msg { params := neturl.Values{} params.Set("job_type", "fix") @@ -815,8 +840,31 @@ func (m model) fetchFixJobs() tea.Cmd { result, err := m.loadJobsPage(params) if err != nil { - return fixJobsMsg{err: err} + return fixJobsMsg{err: err, gen: gen} } - return fixJobsMsg{jobs: result.Jobs} + return fixJobsMsg{jobs: result.Jobs, gen: gen} } } + +// startFetchFixJobs dispatches fetchFixJobs if no fix-jobs fetch is already +// in flight, and sets the loadingFixJobs flag. Returns nil if skipped. +func (m *model) startFetchFixJobs() tea.Cmd { + if m.loadingFixJobs { + return nil + } + m.loadingFixJobs = true + return m.fetchFixJobs() +} + +// requestFetchFixJobs is like startFetchFixJobs but for handlers that follow +// state-mutating operations (fix enqueue, patch apply). If a fetch is already +// in flight, it marks the current data as stale so handleFixJobsMsg will +// dispatch a follow-up fetch when the in-flight one returns. +func (m *model) requestFetchFixJobs() tea.Cmd { + if m.loadingFixJobs { + m.fixJobsStale = true + return nil + } + m.loadingFixJobs = true + return m.fetchFixJobs() +} diff --git a/cmd/roborev/tui/handlers_msg.go b/cmd/roborev/tui/handlers_msg.go index ffeb433f..a872cddd 100644 --- a/cmd/roborev/tui/handlers_msg.go +++ b/cmd/roborev/tui/handlers_msg.go @@ -228,7 +228,11 @@ func (m model) handleJobsMsg(msg jobsMsg) (tea.Model, tea.Cmd) { // handleStatusMsg processes daemon status updates. func (m model) handleStatusMsg(msg statusMsg) (tea.Model, tea.Cmd) { - m.status = storage.DaemonStatus(msg) + if msg.gen < m.fetchGen { + return m, nil // discard pre-reconnect response + } + m.loadingStatus = false + m.status = msg.status m.consecutiveErrors = 0 if m.status.Version != "" { m.daemonVersion = m.status.Version @@ -239,6 +243,11 @@ func (m model) handleStatusMsg(msg statusMsg) (tea.Model, tea.Cmd) { } m.lastConfigReloadCounter = m.status.ConfigReloadCounter m.statusFetchedOnce = true + if m.statusStale { + m.statusStale = false + m.loadingStatus = true + return m, m.fetchStatus() + } return m, nil } @@ -435,6 +444,10 @@ func (m model) handleBranchesMsg( func (m model) handleFixJobsMsg( msg fixJobsMsg, ) (tea.Model, tea.Cmd) { + if msg.gen < m.fetchGen { + return m, nil // discard pre-reconnect response + } + m.loadingFixJobs = false if msg.err != nil { m.err = msg.err } else { @@ -445,6 +458,14 @@ func (m model) handleFixJobsMsg( m.fixSelectedIdx = len(m.fixJobs) - 1 } } + // A state-mutating handler requested a refresh while this fetch was + // in flight. The data we just received predates that mutation, so + // dispatch a follow-up fetch to pick up the latest state. + if m.fixJobsStale { + m.fixJobsStale = false + m.loadingFixJobs = true + return m, m.fetchFixJobs() + } return m, nil } @@ -724,11 +745,17 @@ func (m model) handleSSEEventMsg() (tea.Model, tea.Cmd) { m.loadingJobs = true cmds := []tea.Cmd{ m.fetchJobs(), - m.fetchStatus(), waitForSSE(m.sseCh, m.sseStop), } + // SSE events signal daemon state changes — use the stale-aware + // helpers so a skipped fetch gets retried after the in-flight one. + if cmd := m.requestFetchStatus(); cmd != nil { + cmds = append(cmds, cmd) + } if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) { - cmds = append(cmds, m.fetchFixJobs()) + if cmd := m.requestFetchFixJobs(); cmd != nil { + cmds = append(cmds, cmd) + } } return m, tea.Batch(cmds...) } @@ -742,9 +769,14 @@ func (m *model) consumeSSEPendingRefresh() tea.Cmd { } m.ssePendingRefresh = false m.loadingJobs = true - cmds := []tea.Cmd{m.fetchJobs(), m.fetchStatus()} + cmds := []tea.Cmd{m.fetchJobs()} + if cmd := m.requestFetchStatus(); cmd != nil { + cmds = append(cmds, cmd) + } if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) { - cmds = append(cmds, m.fetchFixJobs()) + if cmd := m.requestFetchFixJobs(); cmd != nil { + cmds = append(cmds, cmd) + } } return tea.Batch(cmds...) } @@ -790,9 +822,21 @@ func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) { m.daemonVersion = msg.version } m.clearFetchFailed() + m.fetchGen++ // invalidate pre-reconnect status/fix-jobs responses + m.fetchSeq++ // invalidate pre-reconnect jobs responses m.loadingJobs = true - cmds := []tea.Cmd{ - m.fetchJobs(), m.fetchStatus(), m.fetchRepoNames(), + m.loadingMore = false + cmds := []tea.Cmd{m.fetchJobs(), m.fetchRepoNames()} + // Force fetches on reconnect — previous in-flight requests + // were against the old connection and will fail or be stale. + m.loadingStatus = true + m.statusStale = false + cmds = append(cmds, m.fetchStatus()) + m.loadingFixJobs = false + m.fixJobsStale = false + if m.tasksWorkflowEnabled() { + m.loadingFixJobs = true + cmds = append(cmds, m.fetchFixJobs()) } if cmd := m.fetchUnloadedBranches(); cmd != nil { cmds = append(cmds, cmd) @@ -843,11 +887,20 @@ func (m model) handleTickMsg( ) (tea.Model, tea.Cmd) { // Skip job refresh while pagination or another refresh is in flight if m.loadingMore || m.loadingJobs { - return m, tea.Batch(m.tick(), m.fetchStatus()) + cmds := []tea.Cmd{m.tick()} + if cmd := m.startFetchStatus(); cmd != nil { + cmds = append(cmds, cmd) + } + return m, tea.Batch(cmds...) + } + cmds := []tea.Cmd{m.tick(), m.fetchJobs()} + if cmd := m.startFetchStatus(); cmd != nil { + cmds = append(cmds, cmd) } - cmds := []tea.Cmd{m.tick(), m.fetchJobs(), m.fetchStatus()} if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) { - cmds = append(cmds, m.fetchFixJobs()) + if cmd := m.startFetchFixJobs(); cmd != nil { + cmds = append(cmds, cmd) + } } return m, tea.Batch(cmds...) } @@ -907,6 +960,25 @@ func (m model) handlePaginationErrMsg( } // handleErrMsg processes generic error messages. +func (m model) handleStatusErrMsg( + msg statusErrMsg, +) (tea.Model, tea.Cmd) { + if msg.gen < m.fetchGen { + return m, nil // discard pre-reconnect error + } + m.loadingStatus = false + m.err = msg.err + if m.statusStale { + m.statusStale = false + m.loadingStatus = true + return m, m.fetchStatus() + } + if cmd := m.handleConnectionError(msg.err); cmd != nil { + return m, cmd + } + return m, nil +} + func (m model) handleErrMsg( msg errMsg, ) (tea.Model, tea.Cmd) { @@ -928,12 +1000,12 @@ func (m model) handleFixTriggerResultMsg( ), 3*time.Second, viewTasks) } else if msg.warning != "" { m.setFlash(msg.warning, 5*time.Second, viewTasks) - return m, m.fetchFixJobs() + return m, m.requestFetchFixJobs() } else { m.setFlash(fmt.Sprintf( "Fix job #%d enqueued", msg.job.ID, ), 3*time.Second, viewTasks) - return m, m.fetchFixJobs() + return m, m.requestFetchFixJobs() } return m, nil } @@ -970,9 +1042,11 @@ func (m model) handleApplyPatchResultMsg( "Patch for job #%d doesn't apply cleanly"+ " - triggering rebase", msg.jobID, ), 5*time.Second, viewTasks) - return m, tea.Batch( - m.triggerRebase(msg.jobID), m.fetchFixJobs(), - ) + cmds := []tea.Cmd{m.triggerRebase(msg.jobID)} + if cmd := m.requestFetchFixJobs(); cmd != nil { + cmds = append(cmds, cmd) + } + return m, tea.Batch(cmds...) } else if msg.commitFailed { detail := fmt.Sprintf( "Job #%d: %v", msg.jobID, msg.err, @@ -992,7 +1066,10 @@ func (m model) handleApplyPatchResultMsg( "Patch from job #%d applied and committed", msg.jobID, ), 3*time.Second, viewTasks) - cmds := []tea.Cmd{m.fetchFixJobs()} + cmds := []tea.Cmd{} + if cmd := m.requestFetchFixJobs(); cmd != nil { + cmds = append(cmds, cmd) + } if msg.parentJobID > 0 { cmds = append( cmds, diff --git a/cmd/roborev/tui/handlers_review.go b/cmd/roborev/tui/handlers_review.go index 9ba284d6..29a3324f 100644 --- a/cmd/roborev/tui/handlers_review.go +++ b/cmd/roborev/tui/handlers_review.go @@ -369,7 +369,7 @@ func (m model) handleToggleTasksKey() (tea.Model, tea.Cmd) { } if m.currentView == viewQueue { m.currentView = viewTasks - return m, m.fetchFixJobs() + return m, m.startFetchFixJobs() } return m, nil } diff --git a/cmd/roborev/tui/tui.go b/cmd/roborev/tui/tui.go index 89925fce..8e050067 100644 --- a/cmd/roborev/tui/tui.go +++ b/cmd/roborev/tui/tui.go @@ -285,6 +285,11 @@ type model struct { hasMore bool // true if there are more jobs to load loadingMore bool // true if currently loading more jobs (pagination) loadingJobs bool // true if currently loading jobs (full refresh) + loadingStatus bool // true if currently loading daemon status + loadingFixJobs bool // true if currently loading fix jobs + statusStale bool // true if a state change occurred while status fetch was in flight + fixJobsStale bool // true if a state change occurred while fix-jobs fetch was in flight + fetchGen uint64 // monotonic generation; incremented on reconnect to discard stale responses heightDetected bool // true after first WindowSizeMsg (real terminal height known) fetchSeq int // incremented on filter changes; stale fetch responses are discarded paginateNav viewKind // non-zero: auto-navigate in this view after pagination loads @@ -566,6 +571,7 @@ func newModel(ep daemon.DaemonEndpoint, opts ...option) model { width: 80, // sensible defaults until we get WindowSizeMsg height: 24, loadingJobs: true, // Init() calls fetchJobs, so mark as loading + loadingStatus: true, // Init() calls fetchStatus, so mark as loading hideClosed: hideClosed, activeRepoFilter: activeRepoFilter, activeBranchFilter: activeBranchFilter, @@ -793,6 +799,8 @@ func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { result, cmd = m.handleJobsErrMsg(msg) case paginationErrMsg: result, cmd = m.handlePaginationErrMsg(msg) + case statusErrMsg: + result, cmd = m.handleStatusErrMsg(msg) case errMsg: result, cmd = m.handleErrMsg(msg) case reconnectMsg: diff --git a/cmd/roborev/tui/tui_test.go b/cmd/roborev/tui/tui_test.go index 1f6682a5..6a04a4c4 100644 --- a/cmd/roborev/tui/tui_test.go +++ b/cmd/roborev/tui/tui_test.go @@ -463,6 +463,153 @@ func TestTUIJobsErrMsgClearsLoadingJobs(t *testing.T) { } } +func TestTUITickInFlightGuards(t *testing.T) { + tests := []struct { + name string + loadingStatus bool + loadingFixJobs bool + tasksEnabled bool + view viewKind + wantStatus bool // loadingStatus after tick + wantFixJobs bool // loadingFixJobs after tick + }{ + { + name: "skips status fetch when already loading", + loadingStatus: true, + tasksEnabled: false, + wantStatus: true, // stays true, no new fetch dispatched + wantFixJobs: false, + }, + { + name: "dispatches status fetch when idle", + loadingStatus: false, + tasksEnabled: false, + wantStatus: true, // set to true by dispatch + wantFixJobs: false, + }, + { + name: "skips fix-jobs fetch when already loading", + loadingFixJobs: true, + tasksEnabled: true, + view: viewTasks, + wantStatus: true, + wantFixJobs: true, // stays true, no new fetch dispatched + }, + { + name: "dispatches fix-jobs fetch when idle on tasks view", + loadingFixJobs: false, + tasksEnabled: true, + view: viewTasks, + wantStatus: true, + wantFixJobs: true, // set to true by dispatch + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert := assert.New(t) + m := newModel(testEndpoint, withExternalIODisabled()) + m.loadingJobs = false + m.loadingMore = false + m.loadingStatus = tt.loadingStatus + m.loadingFixJobs = tt.loadingFixJobs + m.tasksEnabled = tt.tasksEnabled + if tt.view != 0 { + m.currentView = tt.view + } + + m2, cmd := updateModel(t, m, tickMsg(time.Now())) + + assert.NotNil(cmd, "tick should always return commands") + assert.Equal(tt.wantStatus, m2.loadingStatus) + assert.Equal(tt.wantFixJobs, m2.loadingFixJobs) + }) + } +} + +func TestTUIStatusResponseClearsLoadingFlag(t *testing.T) { + assert := assert.New(t) + m := newModel(testEndpoint, withExternalIODisabled()) + m.loadingStatus = true + + m2, _ := updateModel(t, m, statusMsg{}) + assert.False(m2.loadingStatus, "statusMsg should clear loadingStatus") + + // Error path should also clear the flag. + m.loadingStatus = true + m3, _ := updateModel(t, m, statusErrMsg{err: fmt.Errorf("connection refused")}) + assert.False(m3.loadingStatus, "statusErrMsg should clear loadingStatus") +} + +func TestTUIFixJobsResponseClearsLoadingFlag(t *testing.T) { + assert := assert.New(t) + m := newModel(testEndpoint, withExternalIODisabled()) + m.loadingFixJobs = true + + m2, _ := updateModel(t, m, fixJobsMsg{jobs: []storage.ReviewJob{makeJob(1)}}) + assert.False(m2.loadingFixJobs, "fixJobsMsg should clear loadingFixJobs") + + // Error path should also clear the flag. + m.loadingFixJobs = true + m3, _ := updateModel(t, m, fixJobsMsg{err: fmt.Errorf("connection refused")}) + assert.False(m3.loadingFixJobs, "fixJobsMsg with error should clear loadingFixJobs") +} + +func TestTUIFixJobsStaleFlagTriggersFollowUp(t *testing.T) { + assert := assert.New(t) + m := newModel(testEndpoint, withExternalIODisabled()) + + // Simulate: a fetch is in flight and a mutation marks data as stale. + m.loadingFixJobs = true + m.fixJobsStale = true + + // When the in-flight fetch returns, it should dispatch a follow-up. + m2, cmd := updateModel(t, m, fixJobsMsg{jobs: []storage.ReviewJob{makeJob(1)}}) + assert.True(m2.loadingFixJobs, "should re-dispatch fetch when stale") + assert.False(m2.fixJobsStale, "stale flag should be cleared") + assert.NotNil(cmd, "should return a follow-up fetch command") + + // Without the stale flag, no follow-up. + m3, cmd := updateModel(t, m2, fixJobsMsg{jobs: []storage.ReviewJob{makeJob(1)}}) + assert.False(m3.loadingFixJobs, "should not re-dispatch without stale flag") + assert.Nil(cmd, "should return no command without stale flag") + + // Error path should also honor the stale flag. + m.loadingFixJobs = true + m.fixJobsStale = true + m4, cmd := updateModel(t, m, fixJobsMsg{err: fmt.Errorf("connection refused")}) + assert.True(m4.loadingFixJobs, "should re-dispatch on error when stale") + assert.False(m4.fixJobsStale, "stale flag should be cleared on error path") + assert.NotNil(cmd, "should return follow-up command on error path") +} + +func TestTUIStatusStaleFlagTriggersFollowUp(t *testing.T) { + assert := assert.New(t) + m := newModel(testEndpoint, withExternalIODisabled()) + + // Simulate: a fetch is in flight and an SSE event marks data as stale. + m.loadingStatus = true + m.statusStale = true + + // When the in-flight fetch returns, it should dispatch a follow-up. + m2, cmd := updateModel(t, m, statusMsg{}) + assert.True(m2.loadingStatus, "should re-dispatch fetch when stale") + assert.False(m2.statusStale, "stale flag should be cleared") + assert.NotNil(cmd, "should return a follow-up fetch command") + + // Without the stale flag, no follow-up. + m3, cmd := updateModel(t, m2, statusMsg{}) + assert.False(m3.loadingStatus, "should not re-dispatch without stale flag") + assert.Nil(cmd, "should return no command without stale flag") + + // Error path should also honor the stale flag. + m.loadingStatus = true + m.statusStale = true + m4, cmd := updateModel(t, m, statusErrMsg{err: fmt.Errorf("connection refused")}) + assert.True(m4.loadingStatus, "should re-dispatch on error when stale") + assert.False(m4.statusStale, "stale flag should be cleared on error path") + assert.NotNil(cmd, "should return follow-up command on error path") +} + func TestTUIHideClosedMalformedConfigNotOverwritten(t *testing.T) { tmpDir := setupTuiTestEnv(t) @@ -630,9 +777,9 @@ func TestTUIVersionMismatchDetection(t *testing.T) { m := newModel(testEndpoint, withExternalIODisabled()) // Simulate receiving status with different version - status := statusMsg(storage.DaemonStatus{ + status := statusMsg{status: storage.DaemonStatus{ Version: "different-version", - }) + }} m2, _ := updateModel(t, m, status) @@ -652,9 +799,9 @@ func TestTUIVersionMismatchDetection(t *testing.T) { m := newModel(testEndpoint, withExternalIODisabled()) // Simulate receiving status with same version as TUI - status := statusMsg(storage.DaemonStatus{ + status := statusMsg{status: storage.DaemonStatus{ Version: version.Version, - }) + }} m2, _ := updateModel(t, m, status) @@ -769,10 +916,10 @@ func TestTUIConfigReloadFlash(t *testing.T) { t.Run("no flash on first status fetch", func(t *testing.T) { // First status fetch with a ConfigReloadCounter should NOT flash - status1 := statusMsg(storage.DaemonStatus{ + status1 := statusMsg{status: storage.DaemonStatus{ Version: "1.0.0", ConfigReloadCounter: 1, - }) + }} m2, _ := updateModel(t, m, status1) @@ -800,10 +947,10 @@ func TestTUIConfigReloadFlash(t *testing.T) { m.lastConfigReloadCounter = 1 // Second status with different ConfigReloadCounter should flash - status2 := statusMsg(storage.DaemonStatus{ + status2 := statusMsg{status: storage.DaemonStatus{ Version: "1.0.0", ConfigReloadCounter: 2, - }) + }} m2, _ := updateModel(t, m, status2) @@ -826,10 +973,10 @@ func TestTUIConfigReloadFlash(t *testing.T) { m.lastConfigReloadCounter = 0 // No reload had occurred // Now config is reloaded - status := statusMsg(storage.DaemonStatus{ + status := statusMsg{status: storage.DaemonStatus{ Version: "1.0.0", ConfigReloadCounter: 1, - }) + }} m2, _ := updateModel(t, m, status) @@ -846,10 +993,10 @@ func TestTUIConfigReloadFlash(t *testing.T) { m.lastConfigReloadCounter = 1 // Same counter - status := statusMsg(storage.DaemonStatus{ + status := statusMsg{status: storage.DaemonStatus{ Version: "1.0.0", ConfigReloadCounter: 1, - }) + }} m2, _ := updateModel(t, m, status) @@ -946,7 +1093,7 @@ func TestTUIReconnectOnConsecutiveErrors(t *testing.T) { { name: "resets error count on successful status fetch", initialErrors: 5, - msg: statusMsg(storage.DaemonStatus{Version: "1.0.0"}), + msg: statusMsg{status: storage.DaemonStatus{Version: "1.0.0"}}, wantErrors: 0, wantReconnecting: false, wantCmd: false, diff --git a/cmd/roborev/tui/types.go b/cmd/roborev/tui/types.go index bc82a5ae..b2932ec7 100644 --- a/cmd/roborev/tui/types.go +++ b/cmd/roborev/tui/types.go @@ -127,7 +127,10 @@ type jobsMsg struct { seq int // fetch sequence number — stale responses (seq < model.fetchSeq) are discarded stats storage.JobStats // aggregate counts from server } -type statusMsg storage.DaemonStatus +type statusMsg struct { + status storage.DaemonStatus + gen uint64 // fetch generation — discard if < model.fetchGen +} type reviewMsg struct { review *storage.Review responses []storage.Response // Responses for this review @@ -167,6 +170,10 @@ type rerunResultMsg struct { err error } type errMsg error +type statusErrMsg struct { + err error + gen uint64 +} type configSaveErrMsg struct{ err error } type jobsErrMsg struct { err error @@ -224,6 +231,7 @@ type reconnectMsg struct { type fixJobsMsg struct { jobs []storage.ReviewJob err error + gen uint64 // fetch generation — discard if < model.fetchGen } type fixTriggerResultMsg struct {