Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/roborev/tui/control_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (m model) handleCtrlSetView(
m.currentView = v
var cmd tea.Cmd
if v == viewTasks {
cmd = m.fetchFixJobs()
cmd = m.startFetchFixJobs()
}
return m, controlResponse{OK: true}, cmd
}
Expand Down
56 changes: 52 additions & 4 deletions cmd/roborev/tui/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,37 @@ func (m model) fetchMoreJobs() tea.Cmd {
}

func (m model) fetchStatus() tea.Cmd {
gen := m.fetchGen
return func() tea.Msg {
var status storage.DaemonStatus
if err := m.getJSON("/api/status", &status); err != nil {
return errMsg(err)
return statusErrMsg{err: err, gen: gen}
}
return statusMsg(status)
return statusMsg{status: status, gen: gen}
}
}

// startFetchStatus dispatches fetchStatus if no status fetch is already
// in flight, and sets the loadingStatus flag. Returns nil if skipped.
func (m *model) startFetchStatus() tea.Cmd {
if m.loadingStatus {
return nil
}
m.loadingStatus = true
return m.fetchStatus()
}

// requestFetchStatus is like startFetchStatus but for paths triggered by
// daemon state changes (SSE events). If a fetch is already in flight, it
// marks the current data as stale so handleStatusMsg will dispatch a
// follow-up fetch when the in-flight one returns.
func (m *model) requestFetchStatus() tea.Cmd {
if m.loadingStatus {
m.statusStale = true
return nil
}
m.loadingStatus = true
return m.fetchStatus()
}

func (m model) checkForUpdate() tea.Cmd {
Expand Down Expand Up @@ -808,15 +832,39 @@ func (m model) fetchPatch(jobID int64) tea.Cmd {

// fetchFixJobs fetches fix jobs from the daemon.
func (m model) fetchFixJobs() tea.Cmd {
gen := m.fetchGen
return func() tea.Msg {
params := neturl.Values{}
params.Set("job_type", "fix")
params.Set("limit", "200")

result, err := m.loadJobsPage(params)
if err != nil {
return fixJobsMsg{err: err}
return fixJobsMsg{err: err, gen: gen}
}
return fixJobsMsg{jobs: result.Jobs}
return fixJobsMsg{jobs: result.Jobs, gen: gen}
}
}

// startFetchFixJobs dispatches fetchFixJobs if no fix-jobs fetch is already
// in flight, and sets the loadingFixJobs flag. Returns nil if skipped.
func (m *model) startFetchFixJobs() tea.Cmd {
if m.loadingFixJobs {
return nil
}
m.loadingFixJobs = true
return m.fetchFixJobs()
}

// requestFetchFixJobs is like startFetchFixJobs but for handlers that follow
// state-mutating operations (fix enqueue, patch apply). If a fetch is already
// in flight, it marks the current data as stale so handleFixJobsMsg will
// dispatch a follow-up fetch when the in-flight one returns.
func (m *model) requestFetchFixJobs() tea.Cmd {
if m.loadingFixJobs {
m.fixJobsStale = true
return nil
}
m.loadingFixJobs = true
return m.fetchFixJobs()
}
109 changes: 93 additions & 16 deletions cmd/roborev/tui/handlers_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,11 @@ func (m model) handleJobsMsg(msg jobsMsg) (tea.Model, tea.Cmd) {

// handleStatusMsg processes daemon status updates.
func (m model) handleStatusMsg(msg statusMsg) (tea.Model, tea.Cmd) {
m.status = storage.DaemonStatus(msg)
if msg.gen < m.fetchGen {
return m, nil // discard pre-reconnect response
}
m.loadingStatus = false
m.status = msg.status
m.consecutiveErrors = 0
if m.status.Version != "" {
m.daemonVersion = m.status.Version
Expand All @@ -239,6 +243,11 @@ func (m model) handleStatusMsg(msg statusMsg) (tea.Model, tea.Cmd) {
}
m.lastConfigReloadCounter = m.status.ConfigReloadCounter
m.statusFetchedOnce = true
if m.statusStale {
m.statusStale = false
m.loadingStatus = true
return m, m.fetchStatus()
}
return m, nil
}

Expand Down Expand Up @@ -435,6 +444,10 @@ func (m model) handleBranchesMsg(
func (m model) handleFixJobsMsg(
msg fixJobsMsg,
) (tea.Model, tea.Cmd) {
if msg.gen < m.fetchGen {
return m, nil // discard pre-reconnect response
}
m.loadingFixJobs = false
if msg.err != nil {
m.err = msg.err
} else {
Expand All @@ -445,6 +458,14 @@ func (m model) handleFixJobsMsg(
m.fixSelectedIdx = len(m.fixJobs) - 1
}
}
// A state-mutating handler requested a refresh while this fetch was
// in flight. The data we just received predates that mutation, so
// dispatch a follow-up fetch to pick up the latest state.
if m.fixJobsStale {
m.fixJobsStale = false
m.loadingFixJobs = true
return m, m.fetchFixJobs()
}
return m, nil
}

Expand Down Expand Up @@ -724,11 +745,17 @@ func (m model) handleSSEEventMsg() (tea.Model, tea.Cmd) {
m.loadingJobs = true
cmds := []tea.Cmd{
m.fetchJobs(),
m.fetchStatus(),
waitForSSE(m.sseCh, m.sseStop),
}
// SSE events signal daemon state changes — use the stale-aware
// helpers so a skipped fetch gets retried after the in-flight one.
if cmd := m.requestFetchStatus(); cmd != nil {
cmds = append(cmds, cmd)
}
if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) {
cmds = append(cmds, m.fetchFixJobs())
if cmd := m.requestFetchFixJobs(); cmd != nil {
cmds = append(cmds, cmd)
}
}
return m, tea.Batch(cmds...)
}
Expand All @@ -742,9 +769,14 @@ func (m *model) consumeSSEPendingRefresh() tea.Cmd {
}
m.ssePendingRefresh = false
m.loadingJobs = true
cmds := []tea.Cmd{m.fetchJobs(), m.fetchStatus()}
cmds := []tea.Cmd{m.fetchJobs()}
if cmd := m.requestFetchStatus(); cmd != nil {
cmds = append(cmds, cmd)
}
if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) {
cmds = append(cmds, m.fetchFixJobs())
if cmd := m.requestFetchFixJobs(); cmd != nil {
cmds = append(cmds, cmd)
}
}
return tea.Batch(cmds...)
}
Expand Down Expand Up @@ -790,9 +822,21 @@ func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) {
m.daemonVersion = msg.version
}
m.clearFetchFailed()
m.fetchGen++ // invalidate pre-reconnect status/fix-jobs responses
m.fetchSeq++ // invalidate pre-reconnect jobs responses
m.loadingJobs = true
cmds := []tea.Cmd{
m.fetchJobs(), m.fetchStatus(), m.fetchRepoNames(),
m.loadingMore = false
cmds := []tea.Cmd{m.fetchJobs(), m.fetchRepoNames()}
// Force fetches on reconnect — previous in-flight requests
// were against the old connection and will fail or be stale.
m.loadingStatus = true
m.statusStale = false
cmds = append(cmds, m.fetchStatus())
m.loadingFixJobs = false
m.fixJobsStale = false
if m.tasksWorkflowEnabled() {
m.loadingFixJobs = true
cmds = append(cmds, m.fetchFixJobs())
}
if cmd := m.fetchUnloadedBranches(); cmd != nil {
cmds = append(cmds, cmd)
Expand Down Expand Up @@ -843,11 +887,20 @@ func (m model) handleTickMsg(
) (tea.Model, tea.Cmd) {
// Skip job refresh while pagination or another refresh is in flight
if m.loadingMore || m.loadingJobs {
return m, tea.Batch(m.tick(), m.fetchStatus())
cmds := []tea.Cmd{m.tick()}
if cmd := m.startFetchStatus(); cmd != nil {
cmds = append(cmds, cmd)
}
return m, tea.Batch(cmds...)
}
cmds := []tea.Cmd{m.tick(), m.fetchJobs()}
if cmd := m.startFetchStatus(); cmd != nil {
cmds = append(cmds, cmd)
}
cmds := []tea.Cmd{m.tick(), m.fetchJobs(), m.fetchStatus()}
if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) {
cmds = append(cmds, m.fetchFixJobs())
if cmd := m.startFetchFixJobs(); cmd != nil {
cmds = append(cmds, cmd)
}
}
return m, tea.Batch(cmds...)
}
Expand Down Expand Up @@ -907,6 +960,25 @@ func (m model) handlePaginationErrMsg(
}

// handleErrMsg processes generic error messages.
func (m model) handleStatusErrMsg(
msg statusErrMsg,
) (tea.Model, tea.Cmd) {
if msg.gen < m.fetchGen {
return m, nil // discard pre-reconnect error
}
m.loadingStatus = false
m.err = msg.err
if m.statusStale {
m.statusStale = false
m.loadingStatus = true
return m, m.fetchStatus()
}
if cmd := m.handleConnectionError(msg.err); cmd != nil {
return m, cmd
}
return m, nil
}

func (m model) handleErrMsg(
msg errMsg,
) (tea.Model, tea.Cmd) {
Expand All @@ -928,12 +1000,12 @@ func (m model) handleFixTriggerResultMsg(
), 3*time.Second, viewTasks)
} else if msg.warning != "" {
m.setFlash(msg.warning, 5*time.Second, viewTasks)
return m, m.fetchFixJobs()
return m, m.requestFetchFixJobs()
} else {
m.setFlash(fmt.Sprintf(
"Fix job #%d enqueued", msg.job.ID,
), 3*time.Second, viewTasks)
return m, m.fetchFixJobs()
return m, m.requestFetchFixJobs()
}
return m, nil
}
Expand Down Expand Up @@ -970,9 +1042,11 @@ func (m model) handleApplyPatchResultMsg(
"Patch for job #%d doesn't apply cleanly"+
" - triggering rebase", msg.jobID,
), 5*time.Second, viewTasks)
return m, tea.Batch(
m.triggerRebase(msg.jobID), m.fetchFixJobs(),
)
cmds := []tea.Cmd{m.triggerRebase(msg.jobID)}
if cmd := m.requestFetchFixJobs(); cmd != nil {
cmds = append(cmds, cmd)
}
return m, tea.Batch(cmds...)
} else if msg.commitFailed {
detail := fmt.Sprintf(
"Job #%d: %v", msg.jobID, msg.err,
Expand All @@ -992,7 +1066,10 @@ func (m model) handleApplyPatchResultMsg(
"Patch from job #%d applied and committed",
msg.jobID,
), 3*time.Second, viewTasks)
cmds := []tea.Cmd{m.fetchFixJobs()}
cmds := []tea.Cmd{}
if cmd := m.requestFetchFixJobs(); cmd != nil {
cmds = append(cmds, cmd)
}
if msg.parentJobID > 0 {
cmds = append(
cmds,
Expand Down
2 changes: 1 addition & 1 deletion cmd/roborev/tui/handlers_review.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (m model) handleToggleTasksKey() (tea.Model, tea.Cmd) {
}
if m.currentView == viewQueue {
m.currentView = viewTasks
return m, m.fetchFixJobs()
return m, m.startFetchFixJobs()
}
return m, nil
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/roborev/tui/tui.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ type model struct {
hasMore bool // true if there are more jobs to load
loadingMore bool // true if currently loading more jobs (pagination)
loadingJobs bool // true if currently loading jobs (full refresh)
loadingStatus bool // true if currently loading daemon status
loadingFixJobs bool // true if currently loading fix jobs
statusStale bool // true if a state change occurred while status fetch was in flight
fixJobsStale bool // true if a state change occurred while fix-jobs fetch was in flight
fetchGen uint64 // monotonic generation; incremented on reconnect to discard stale responses
heightDetected bool // true after first WindowSizeMsg (real terminal height known)
fetchSeq int // incremented on filter changes; stale fetch responses are discarded
paginateNav viewKind // non-zero: auto-navigate in this view after pagination loads
Expand Down Expand Up @@ -566,6 +571,7 @@ func newModel(ep daemon.DaemonEndpoint, opts ...option) model {
width: 80, // sensible defaults until we get WindowSizeMsg
height: 24,
loadingJobs: true, // Init() calls fetchJobs, so mark as loading
loadingStatus: true, // Init() calls fetchStatus, so mark as loading
hideClosed: hideClosed,
activeRepoFilter: activeRepoFilter,
activeBranchFilter: activeBranchFilter,
Expand Down Expand Up @@ -793,6 +799,8 @@ func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
result, cmd = m.handleJobsErrMsg(msg)
case paginationErrMsg:
result, cmd = m.handlePaginationErrMsg(msg)
case statusErrMsg:
result, cmd = m.handleStatusErrMsg(msg)
case errMsg:
result, cmd = m.handleErrMsg(msg)
case reconnectMsg:
Expand Down
Loading
Loading