Skip to content

Commit d50e2c5

Browse files
cpcloud, claude, and wesm
authored
feat(tui): subscribe to daemon event stream for instant updates (#593)
## Summary - The TUI now subscribes to the daemon's NDJSON event stream (`/api/stream/events`) via a background goroutine, triggering immediate data refreshes instead of waiting for the 2-10s poll cycle - The daemon broadcasts `review.closed`, `review.reopened`, and `job.enqueued` events from the corresponding API handlers — previously only worker lifecycle events (`review.started/completed/failed/canceled`) were broadcast - Polling is retained as a 15s fallback (was adaptive 2s/10s) for robustness against SSE connection drops, with the SSE goroutine handling reconnection with exponential backoff - The SSE subscription restarts on any successful daemon reconnect (not just endpoint changes), handles in-flight fetch coalescing via a pending-refresh flag, and validates HTTP status before decoding the stream 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: Wes McKinney <wesmckinn+git@gmail.com>
1 parent 6e3a92b commit d50e2c5

8 files changed

Lines changed: 618 additions & 88 deletions

File tree

cmd/roborev/tui/fetch.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,10 @@ func (m model) displayTick() tea.Cmd {
3333
})
3434
}
3535

36-
// tickInterval returns the appropriate polling interval based on queue activity.
37-
// Uses faster polling when jobs are running or pending, slower when idle.
36+
// tickInterval returns the polling interval. Now that SSE handles real-time
37+
// updates, polling is only a fallback for missed events or disconnections.
3838
func (m model) tickInterval() time.Duration {
39-
// Before first status fetch, use active interval to be responsive on startup
40-
if !m.statusFetchedOnce {
41-
return tickIntervalActive
42-
}
43-
// Poll frequently when there's activity
44-
if m.status.RunningJobs > 0 || m.status.QueuedJobs > 0 {
45-
return tickIntervalActive
46-
}
47-
return tickIntervalIdle
39+
return tickIntervalFallback
4840
}
4941

5042
type jobsPageResult struct {

cmd/roborev/tui/handlers_msg.go

Lines changed: 73 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ func (m model) handleJobsMsg(msg jobsMsg) (tea.Model, tea.Cmd) {
218218
m.paginateNav = 0
219219
}
220220

221-
return m, nil
221+
return m, m.consumeSSEPendingRefresh()
222222
}
223223

224224
// handleStatusMsg processes daemon status updates.
@@ -707,17 +707,53 @@ func (m model) handleSavePatchResultMsg(msg savePatchResultMsg) (tea.Model, tea.
707707
return m, nil
708708
}
709709

710+
// handleSSEEventMsg processes real-time events from the daemon's NDJSON stream.
711+
// Triggers an immediate data refresh and re-subscribes for the next event.
712+
// If a fetch is already in flight, sets a pending flag so the refresh runs
713+
// after the current load completes (avoiding stale data).
714+
func (m model) handleSSEEventMsg() (tea.Model, tea.Cmd) {
715+
if m.loadingMore || m.loadingJobs {
716+
m.ssePendingRefresh = true
717+
return m, waitForSSE(m.sseCh, m.sseStop)
718+
}
719+
m.loadingJobs = true
720+
cmds := []tea.Cmd{
721+
m.fetchJobs(),
722+
m.fetchStatus(),
723+
waitForSSE(m.sseCh, m.sseStop),
724+
}
725+
if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) {
726+
cmds = append(cmds, m.fetchFixJobs())
727+
}
728+
return m, tea.Batch(cmds...)
729+
}
730+
731+
// consumeSSEPendingRefresh returns the full SSE refresh command set if
732+
// an event arrived while a fetch was in flight, then clears the flag.
733+
// Returns nil if no refresh is pending.
734+
func (m *model) consumeSSEPendingRefresh() tea.Cmd {
735+
if !m.ssePendingRefresh {
736+
return nil
737+
}
738+
m.ssePendingRefresh = false
739+
m.loadingJobs = true
740+
cmds := []tea.Cmd{m.fetchJobs(), m.fetchStatus()}
741+
if m.tasksWorkflowEnabled() && (m.currentView == viewTasks || m.hasActiveFixJobs()) {
742+
cmds = append(cmds, m.fetchFixJobs())
743+
}
744+
return tea.Batch(cmds...)
745+
}
746+
710747
// handleReconnectMsg processes daemon reconnection attempts.
711748
func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) {
712749
m.reconnecting = false
713-
if msg.err == nil && msg.endpoint != m.endpoint {
750+
if msg.err != nil {
751+
return m, nil
752+
}
753+
754+
if msg.endpoint != m.endpoint {
714755
m.endpoint = msg.endpoint
715756
m.client = msg.endpoint.HTTPClient(10 * time.Second)
716-
m.consecutiveErrors = 0
717-
m.err = nil
718-
if msg.version != "" {
719-
m.daemonVersion = msg.version
720-
}
721757
// Update runtime metadata so external tools see the
722758
// new daemon address after reconnect.
723759
if m.controlSocket != "" {
@@ -731,17 +767,35 @@ func (m model) handleReconnectMsg(msg reconnectMsg) (tea.Model, tea.Cmd) {
731767
)
732768
}
733769
}
734-
m.clearFetchFailed()
735-
m.loadingJobs = true
736-
cmds := []tea.Cmd{
737-
m.fetchJobs(), m.fetchStatus(), m.fetchRepoNames(),
738-
}
739-
if cmd := m.fetchUnloadedBranches(); cmd != nil {
740-
cmds = append(cmds, cmd)
741-
}
742-
return m, tea.Batch(cmds...)
743770
}
744-
return m, nil
771+
772+
// Restart SSE subscription on any successful reconnect, not just
773+
// endpoint changes. The old goroutine may be stuck in backoff
774+
// after a same-address daemon restart.
775+
if m.sseStop != nil {
776+
close(m.sseStop)
777+
m.sseCh = make(chan struct{}, 1)
778+
m.sseStop = make(chan struct{})
779+
go startSSESubscription(m.endpoint, m.sseCh, m.sseStop)
780+
}
781+
782+
m.consecutiveErrors = 0
783+
m.err = nil
784+
if msg.version != "" {
785+
m.daemonVersion = msg.version
786+
}
787+
m.clearFetchFailed()
788+
m.loadingJobs = true
789+
cmds := []tea.Cmd{
790+
m.fetchJobs(), m.fetchStatus(), m.fetchRepoNames(),
791+
}
792+
if cmd := m.fetchUnloadedBranches(); cmd != nil {
793+
cmds = append(cmds, cmd)
794+
}
795+
if m.sseCh != nil {
796+
cmds = append(cmds, waitForSSE(m.sseCh, m.sseStop))
797+
}
798+
return m, tea.Batch(cmds...)
745799
}
746800

747801
// handleWindowSizeMsg processes terminal resize events.
@@ -826,7 +880,7 @@ func (m model) handleJobsErrMsg(
826880
if cmd := m.handleConnectionError(msg.err); cmd != nil {
827881
return m, cmd
828882
}
829-
return m, nil
883+
return m, m.consumeSSEPendingRefresh()
830884
}
831885

832886
// handlePaginationErrMsg processes pagination fetch errors.
@@ -844,7 +898,7 @@ func (m model) handlePaginationErrMsg(
844898
if cmd := m.handleConnectionError(msg.err); cmd != nil {
845899
return m, cmd
846900
}
847-
return m, nil
901+
return m, m.consumeSSEPendingRefresh()
848902
}
849903

850904
// handleErrMsg processes generic error messages.

cmd/roborev/tui/sse.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package tui
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"net/http"
8+
"time"
9+
10+
tea "github.com/charmbracelet/bubbletea"
11+
"github.com/roborev-dev/roborev/internal/daemon"
12+
)
13+
14+
// sseEventMsg signals that the daemon broadcast an event.
15+
// The TUI uses this to trigger an immediate data refresh.
16+
type sseEventMsg struct{}
17+
18+
// startSSESubscription maintains a persistent NDJSON connection to the
19+
// daemon's /api/stream/events endpoint. On each received event it sends
20+
// a non-blocking signal to sseCh. The goroutine reconnects with
21+
// exponential backoff on errors and exits when stopCh is closed.
22+
func startSSESubscription(
23+
endpoint daemon.DaemonEndpoint,
24+
sseCh chan<- struct{},
25+
stopCh <-chan struct{},
26+
) {
27+
const maxBackoff = 30 * time.Second
28+
backoff := time.Second
29+
30+
for {
31+
connected, err := sseReadLoop(endpoint, sseCh, stopCh)
32+
if err == nil {
33+
return
34+
}
35+
36+
// Reset backoff after a connection that successfully read events,
37+
// since the next failure is likely a fresh problem (daemon restart).
38+
if connected {
39+
backoff = time.Second
40+
}
41+
42+
select {
43+
case <-stopCh:
44+
return
45+
case <-time.After(backoff):
46+
}
47+
48+
backoff = min(backoff*2, maxBackoff)
49+
}
50+
}
51+
52+
// sseReadLoop connects to the event stream and reads NDJSON lines until
53+
// the connection drops or stopCh fires. Returns (false, nil) when stopCh
54+
// is closed, (connected, err) on connection/decode failure. connected is
55+
// true if at least one event was successfully read.
56+
func sseReadLoop(
57+
endpoint daemon.DaemonEndpoint,
58+
sseCh chan<- struct{},
59+
stopCh <-chan struct{},
60+
) (connected bool, err error) {
61+
ctx, cancel := context.WithCancel(context.Background())
62+
defer cancel()
63+
64+
go func() {
65+
select {
66+
case <-stopCh:
67+
cancel()
68+
case <-ctx.Done():
69+
}
70+
}()
71+
72+
client := endpoint.HTTPClient(0)
73+
req, err := http.NewRequestWithContext(
74+
ctx, http.MethodGet,
75+
endpoint.BaseURL()+"/api/stream/events", nil,
76+
)
77+
if err != nil {
78+
return false, err
79+
}
80+
resp, err := client.Do(req)
81+
if err != nil {
82+
return false, err
83+
}
84+
defer resp.Body.Close()
85+
86+
if resp.StatusCode != http.StatusOK {
87+
return false, fmt.Errorf("stream events: %s", resp.Status)
88+
}
89+
90+
decoder := json.NewDecoder(resp.Body)
91+
for {
92+
var event daemon.Event
93+
if err := decoder.Decode(&event); err != nil {
94+
select {
95+
case <-stopCh:
96+
return connected, nil
97+
default:
98+
return connected, err
99+
}
100+
}
101+
connected = true
102+
103+
select {
104+
case sseCh <- struct{}{}:
105+
default:
106+
}
107+
}
108+
}
109+
110+
// waitForSSE returns a tea.Cmd that blocks until a signal arrives on
111+
// sseCh or stopCh is closed, then delivers an sseEventMsg (or nil on
112+
// stop) to the Bubbletea event loop. Accepting stopCh avoids the need
113+
// to close sseCh on reconnect, which would race with the producer goroutine.
114+
func waitForSSE(sseCh <-chan struct{}, stopCh <-chan struct{}) tea.Cmd {
115+
return func() tea.Msg {
116+
select {
117+
case <-sseCh:
118+
return sseEventMsg{}
119+
case <-stopCh:
120+
return nil
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)