Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions cmd/roborev/tui/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,10 @@ func (m model) displayTick() tea.Cmd {
})
}

// tickInterval returns the appropriate polling interval based on queue activity.
// Uses faster polling when jobs are running or pending, slower when idle.
// tickInterval returns the polling interval. Now that SSE handles real-time
// updates, polling is only a fallback for missed events or disconnections.
func (m model) tickInterval() time.Duration {
// Before first status fetch, use active interval to be responsive on startup
if !m.statusFetchedOnce {
return tickIntervalActive
}
// Poll frequently when there's activity
if m.status.RunningJobs > 0 || m.status.QueuedJobs > 0 {
return tickIntervalActive
}
return tickIntervalIdle
return tickIntervalFallback
}

type jobsPageResult struct {
Expand Down
92 changes: 73 additions & 19 deletions cmd/roborev/tui/handlers_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (m model) handleJobsMsg(msg jobsMsg) (tea.Model, tea.Cmd) {
m.paginateNav = 0
}

return m, nil
return m, m.consumeSSEPendingRefresh()
}

// handleStatusMsg processes daemon status updates.
Expand Down Expand Up @@ -707,17 +707,53 @@ func (m model) handleSavePatchResultMsg(msg savePatchResultMsg) (tea.Model, tea.
return m, nil
}

// handleSSEEventMsg processes real-time events from the daemon's NDJSON stream.
// Triggers an immediate data refresh and re-subscribes for the next event.
// If a fetch is already in flight, sets a pending flag so the refresh runs
// after the current load completes (avoiding stale data).
func (m model) handleSSEEventMsg() (tea.Model, tea.Cmd) {
if m.loadingMore || m.loadingJobs {
m.ssePendingRefresh = true
return m, waitForSSE(m.sseCh, m.sseStop)
}
m.loadingJobs = true
cmds := []tea.Cmd{
m.fetchJobs(),
m.fetchStatus(),
waitForSSE(m.sseCh, m.sseStop),
}
if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) {
cmds = append(cmds, m.fetchFixJobs())
}
return m, tea.Batch(cmds...)
}

// consumeSSEPendingRefresh returns the full SSE refresh command set if
// an event arrived while a fetch was in flight, then clears the flag.
// Returns nil if no refresh is pending.
func (m *model) consumeSSEPendingRefresh() tea.Cmd {
if !m.ssePendingRefresh {
return nil
}
m.ssePendingRefresh = false
m.loadingJobs = true
cmds := []tea.Cmd{m.fetchJobs(), m.fetchStatus()}
if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) {
cmds = append(cmds, m.fetchFixJobs())
}
return tea.Batch(cmds...)
}

// handleReconnectMsg processes daemon reconnection attempts.
func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) {
m.reconnecting = false
if msg.err == nil && msg.endpoint != m.endpoint {
if msg.err != nil {
return m, nil
}

if msg.endpoint != m.endpoint {
m.endpoint = msg.endpoint
m.client = msg.endpoint.HTTPClient(10 * time.Second)
m.consecutiveErrors = 0
m.err = nil
if msg.version != "" {
m.daemonVersion = msg.version
}
// Update runtime metadata so external tools see the
// new daemon address after reconnect.
if m.controlSocket != "" {
Expand All @@ -731,17 +767,35 @@ func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) {
)
}
}
m.clearFetchFailed()
m.loadingJobs = true
cmds := []tea.Cmd{
m.fetchJobs(), m.fetchStatus(), m.fetchRepoNames(),
}
if cmd := m.fetchUnloadedBranches(); cmd != nil {
cmds = append(cmds, cmd)
}
return m, tea.Batch(cmds...)
}
return m, nil

// Restart SSE subscription on any successful reconnect, not just
// endpoint changes. The old goroutine may be stuck in backoff
// after a same-address daemon restart.
if m.sseStop != nil {
close(m.sseStop)
m.sseCh = make(chan struct{}, 1)
m.sseStop = make(chan struct{})
go startSSESubscription(m.endpoint, m.sseCh, m.sseStop)
}

m.consecutiveErrors = 0
m.err = nil
if msg.version != "" {
m.daemonVersion = msg.version
}
m.clearFetchFailed()
m.loadingJobs = true
cmds := []tea.Cmd{
m.fetchJobs(), m.fetchStatus(), m.fetchRepoNames(),
}
if cmd := m.fetchUnloadedBranches(); cmd != nil {
cmds = append(cmds, cmd)
}
if m.sseCh != nil {
cmds = append(cmds, waitForSSE(m.sseCh, m.sseStop))
}
return m, tea.Batch(cmds...)
}

// handleWindowSizeMsg processes terminal resize events.
Expand Down Expand Up @@ -826,7 +880,7 @@ func (m model) handleJobsErrMsg(
if cmd := m.handleConnectionError(msg.err); cmd != nil {
return m, cmd
}
return m, nil
return m, m.consumeSSEPendingRefresh()
}

// handlePaginationErrMsg processes pagination fetch errors.
Expand All @@ -844,7 +898,7 @@ func (m model) handlePaginationErrMsg(
if cmd := m.handleConnectionError(msg.err); cmd != nil {
return m, cmd
}
return m, nil
return m, m.consumeSSEPendingRefresh()
}

// handleErrMsg processes generic error messages.
Expand Down
123 changes: 123 additions & 0 deletions cmd/roborev/tui/sse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package tui

import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

tea "github.com/charmbracelet/bubbletea"
"github.com/roborev-dev/roborev/internal/daemon"
)

// sseEventMsg signals that the daemon broadcast an event.
// The TUI uses this to trigger an immediate data refresh.
type sseEventMsg struct{}

// startSSESubscription maintains a persistent NDJSON connection to the
// daemon's /api/stream/events endpoint. On each received event it sends
// a non-blocking signal to sseCh. The goroutine reconnects with
// exponential backoff on errors and exits when stopCh is closed.
func startSSESubscription(
endpoint daemon.DaemonEndpoint,
sseCh chan<- struct{},
stopCh <-chan struct{},
) {
const maxBackoff = 30 * time.Second
backoff := time.Second

for {
connected, err := sseReadLoop(endpoint, sseCh, stopCh)
if err == nil {
return
}

// Reset backoff after a connection that successfully read events,
// since the next failure is likely a fresh problem (daemon restart).
if connected {
backoff = time.Second
}

select {
case <-stopCh:
return
case <-time.After(backoff):
}

backoff = min(backoff*2, maxBackoff)
}
}

// sseReadLoop connects to the event stream and reads NDJSON lines until
// the connection drops or stopCh fires. Returns (false, nil) when stopCh
// is closed, (connected, err) on connection/decode failure. connected is
// true if at least one event was successfully read.
func sseReadLoop(
endpoint daemon.DaemonEndpoint,
sseCh chan<- struct{},
stopCh <-chan struct{},
) (connected bool, err error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()

client := endpoint.HTTPClient(0)
req, err := http.NewRequestWithContext(
ctx, http.MethodGet,
endpoint.BaseURL()+"/api/stream/events", nil,
)
if err != nil {
return false, err
}
resp, err := client.Do(req)
if err != nil {
return false, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("stream events: %s", resp.Status)
}

decoder := json.NewDecoder(resp.Body)
for {
var event daemon.Event
if err := decoder.Decode(&event); err != nil {
select {
case <-stopCh:
return connected, nil
default:
return connected, err
}
}
connected = true

select {
case sseCh <- struct{}{}:
default:
}
}
}

// waitForSSE returns a tea.Cmd that blocks until a signal arrives on
// sseCh or stopCh is closed, then delivers an sseEventMsg (or nil on
// stop) to the Bubbletea event loop. Accepting stopCh avoids the need
// to close sseCh on reconnect, which would race with the producer goroutine.
func waitForSSE(sseCh <-chan struct{}, stopCh <-chan struct{}) tea.Cmd {
return func() tea.Msg {
select {
case <-sseCh:
return sseEventMsg{}
case <-stopCh:
return nil
}
}
}
Loading
Loading