Skip to content

Commit 348243a

Browse files
authored
[kernel-1116] CDP monitor core (#214)
Second layer of the CDP monitor split. Adds the full Monitor struct, all lifecycle machinery, and the test infrastructure. dispatchEvent is a no-op stub - event handlers land in #213 <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Adds substantial new WebSocket lifecycle/reconnect and command routing logic with multiple goroutines/locks; regressions could impact capture stability and leak resources. Tests mitigate but concurrency and reconnect edge cases remain moderately risky. > > **Overview** > Implements the core `cdpmonitor.Monitor` lifecycle: establishes a CDP WebSocket connection, routes command responses via `send()`, starts the init sequence (`Target.setAutoAttach` + attach to existing targets), and adds a `Health()` snapshot for operational visibility. > > Adds automatic reconnect on upstream URL changes with backoff, state reset/unblocking of in-flight commands, and lifecycle events (`monitor_disconnected`, `monitor_reconnected`, `monitor_reconnect_failed`, `monitor_init_failed`). > > Introduces screenshot capture/publishing with rate limiting and optional ffmpeg downscaling, plus a comprehensive test harness (fake WS server/upstream, event collector) and new unit tests covering lifecycle, reconnect, screenshot behavior, and pending-command unblocking. Domain enablement/script injection is factored into `domains.go`, and `dispatchEvent` is stubbed pending the follow-up handlers PR. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit bfb6cd7. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent d73793c commit 348243a

File tree

9 files changed

+2292
-83
lines changed

9 files changed

+2292
-83
lines changed

server/cmd/api/api/api.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"log/slog"
78
"os"
89
"os/exec"
910
"sync"
@@ -103,7 +104,7 @@ func New(
103104
return nil, fmt.Errorf("captureSession cannot be nil")
104105
}
105106

106-
mon := cdpmonitor.New(upstreamMgr, captureSession.Publish, displayNum)
107+
mon := cdpmonitor.New(upstreamMgr, captureSession.Publish, displayNum, slog.Default())
107108
ctx, cancel := context.WithCancel(context.Background())
108109

109110
return &ApiService{

server/lib/cdpmonitor/cdp_test.go

Lines changed: 371 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,371 @@
1+
package cdpmonitor
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"image"
8+
"image/color"
9+
"image/png"
10+
"io"
11+
"log/slog"
12+
"net/http"
13+
"net/http/httptest"
14+
"strings"
15+
"sync"
16+
"testing"
17+
"time"
18+
19+
"github.com/coder/websocket"
20+
"github.com/coder/websocket/wsjson"
21+
"github.com/kernel/kernel-images/server/lib/events"
22+
"github.com/stretchr/testify/require"
23+
)
24+
25+
var discardLogger = slog.New(slog.NewTextHandler(io.Discard, nil))
26+
27+
// minimalPNG is a valid 1x1 PNG used as a test fixture for screenshot tests.
28+
var minimalPNG = func() []byte {
29+
img := image.NewRGBA(image.Rect(0, 0, 1, 1))
30+
img.Set(0, 0, color.RGBA{R: 255, G: 0, B: 0, A: 255})
31+
var buf bytes.Buffer
32+
_ = png.Encode(&buf, img)
33+
return buf.Bytes()
34+
}()
35+
36+
// testServer is a minimal WebSocket server that accepts connections and
37+
// lets the test drive scripted message sequences.
38+
type testServer struct {
39+
srv *httptest.Server
40+
conn *websocket.Conn
41+
connMu sync.Mutex
42+
connCh chan struct{} // closed when the first connection is accepted
43+
msgCh chan []byte // inbound messages from Monitor
44+
}
45+
46+
func newTestServer(t *testing.T) *testServer {
47+
t.Helper()
48+
s := &testServer{
49+
msgCh: make(chan []byte, 128),
50+
connCh: make(chan struct{}),
51+
}
52+
var connOnce sync.Once
53+
s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
54+
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{InsecureSkipVerify: true})
55+
if err != nil {
56+
return
57+
}
58+
s.connMu.Lock()
59+
s.conn = c
60+
s.connMu.Unlock()
61+
connOnce.Do(func() { close(s.connCh) })
62+
go func() {
63+
for {
64+
_, b, err := c.Read(context.Background())
65+
if err != nil {
66+
return
67+
}
68+
s.msgCh <- b
69+
}
70+
}()
71+
}))
72+
return s
73+
}
74+
75+
func (s *testServer) wsURL() string {
76+
return "ws" + strings.TrimPrefix(s.srv.URL, "http")
77+
}
78+
79+
func (s *testServer) sendToMonitor(t *testing.T, msg any) {
80+
t.Helper()
81+
s.connMu.Lock()
82+
c := s.conn
83+
s.connMu.Unlock()
84+
require.NotNil(t, c, "no active connection")
85+
require.NoError(t, wsjson.Write(context.Background(), c, msg))
86+
}
87+
88+
func (s *testServer) readFromMonitor(t *testing.T, timeout time.Duration) cdpMessage {
89+
t.Helper()
90+
select {
91+
case b := <-s.msgCh:
92+
var msg cdpMessage
93+
require.NoError(t, json.Unmarshal(b, &msg))
94+
return msg
95+
case <-time.After(timeout):
96+
t.Fatal("timeout waiting for message from Monitor")
97+
return cdpMessage{}
98+
}
99+
}
100+
101+
func (s *testServer) close() {
102+
s.connMu.Lock()
103+
if s.conn != nil {
104+
_ = s.conn.Close(websocket.StatusNormalClosure, "done")
105+
}
106+
s.connMu.Unlock()
107+
s.srv.Close()
108+
}
109+
110+
// testUpstream implements UpstreamProvider for tests.
111+
type testUpstream struct {
112+
mu sync.Mutex
113+
current string
114+
subs []chan string
115+
}
116+
117+
func newTestUpstream(url string) *testUpstream {
118+
return &testUpstream{current: url}
119+
}
120+
121+
func (u *testUpstream) Current() string {
122+
u.mu.Lock()
123+
defer u.mu.Unlock()
124+
return u.current
125+
}
126+
127+
func (u *testUpstream) Subscribe() (<-chan string, func()) {
128+
ch := make(chan string, 1)
129+
u.mu.Lock()
130+
u.subs = append(u.subs, ch)
131+
u.mu.Unlock()
132+
cancel := func() {
133+
u.mu.Lock()
134+
for i, s := range u.subs {
135+
if s == ch {
136+
u.subs = append(u.subs[:i], u.subs[i+1:]...)
137+
break
138+
}
139+
}
140+
u.mu.Unlock()
141+
close(ch)
142+
}
143+
return ch, cancel
144+
}
145+
146+
func (u *testUpstream) notifyRestart(newURL string) {
147+
u.mu.Lock()
148+
u.current = newURL
149+
subs := make([]chan string, len(u.subs))
150+
copy(subs, u.subs)
151+
u.mu.Unlock()
152+
for _, ch := range subs {
153+
select {
154+
case ch <- newURL:
155+
default:
156+
}
157+
}
158+
}
159+
160+
// eventCollector captures published events with channel-based notification.
161+
type eventCollector struct {
162+
mu sync.Mutex
163+
events []events.Event
164+
notify chan struct{} // signaled on every publish
165+
}
166+
167+
func newEventCollector() *eventCollector {
168+
return &eventCollector{notify: make(chan struct{}, 256)}
169+
}
170+
171+
func (c *eventCollector) publishFn() PublishFunc {
172+
return func(ev events.Event) {
173+
c.mu.Lock()
174+
c.events = append(c.events, ev)
175+
c.mu.Unlock()
176+
select {
177+
case c.notify <- struct{}{}:
178+
default:
179+
}
180+
}
181+
}
182+
183+
// waitFor blocks until an event of the given type is published, or fails.
184+
func (c *eventCollector) waitFor(t *testing.T, eventType string, timeout time.Duration) events.Event {
185+
t.Helper()
186+
deadline := time.After(timeout)
187+
for {
188+
c.mu.Lock()
189+
for _, ev := range c.events {
190+
if ev.Type == eventType {
191+
c.mu.Unlock()
192+
return ev
193+
}
194+
}
195+
c.mu.Unlock()
196+
select {
197+
case <-c.notify:
198+
case <-deadline:
199+
t.Fatalf("timeout waiting for event type=%q", eventType)
200+
return events.Event{}
201+
}
202+
}
203+
}
204+
205+
// waitForNew blocks until a NEW event of the given type is published after this
206+
// call, ignoring any events already in the collector.
207+
func (c *eventCollector) waitForNew(t *testing.T, eventType string, timeout time.Duration) events.Event {
208+
t.Helper()
209+
c.mu.Lock()
210+
skip := len(c.events)
211+
c.mu.Unlock()
212+
213+
deadline := time.After(timeout)
214+
for {
215+
c.mu.Lock()
216+
for i := skip; i < len(c.events); i++ {
217+
if c.events[i].Type == eventType {
218+
ev := c.events[i]
219+
c.mu.Unlock()
220+
return ev
221+
}
222+
}
223+
c.mu.Unlock()
224+
select {
225+
case <-c.notify:
226+
case <-deadline:
227+
t.Fatalf("timeout waiting for new event type=%q", eventType)
228+
return events.Event{}
229+
}
230+
}
231+
}
232+
233+
// assertNone verifies that no event of the given type arrives within d.
234+
func (c *eventCollector) assertNone(t *testing.T, eventType string, d time.Duration) {
235+
t.Helper()
236+
deadline := time.After(d)
237+
for {
238+
select {
239+
case <-c.notify:
240+
c.mu.Lock()
241+
for _, ev := range c.events {
242+
if ev.Type == eventType {
243+
c.mu.Unlock()
244+
t.Fatalf("unexpected event %q published", eventType)
245+
return
246+
}
247+
}
248+
c.mu.Unlock()
249+
case <-deadline:
250+
return
251+
}
252+
}
253+
}
254+
255+
// ResponderFunc is called for each CDP command the Monitor sends.
256+
// Return nil to use the default empty result.
257+
type ResponderFunc func(msg cdpMessage) any
258+
259+
// listenAndRespond drains srv.msgCh, calls fn for each command, and sends the
260+
// response. If fn is nil or returns nil, sends {"id": msg.ID, "result": {}}.
261+
func listenAndRespond(srv *testServer, stopCh <-chan struct{}, fn ResponderFunc) {
262+
for {
263+
select {
264+
case b := <-srv.msgCh:
265+
var msg cdpMessage
266+
if json.Unmarshal(b, &msg) != nil || msg.ID == nil {
267+
continue
268+
}
269+
srv.connMu.Lock()
270+
c := srv.conn
271+
srv.connMu.Unlock()
272+
if c == nil {
273+
continue
274+
}
275+
var resp any
276+
if fn != nil {
277+
resp = fn(msg)
278+
}
279+
if resp == nil {
280+
resp = map[string]any{"id": msg.ID, "result": map[string]any{}}
281+
}
282+
_ = wsjson.Write(context.Background(), c, resp)
283+
case <-stopCh:
284+
return
285+
}
286+
}
287+
}
288+
289+
// startMonitor creates a Monitor against srv, starts it, and returns a cleanup func.
290+
// Waits for Target.getTargets (the last command in initSession) before returning.
291+
func startMonitor(t *testing.T, srv *testServer, fn ResponderFunc) (*Monitor, *eventCollector, func()) {
292+
t.Helper()
293+
ec := newEventCollector()
294+
upstream := newTestUpstream(srv.wsURL())
295+
m := New(upstream, ec.publishFn(), 99, discardLogger)
296+
require.NoError(t, m.Start(context.Background()))
297+
298+
// Closed when Target.getTargets is responded to (last command of initSession).
299+
// Tests needing attachExistingTargets to finish should use require.Eventually.
300+
initDone := make(chan struct{})
301+
var initOnce sync.Once
302+
303+
wrappedFn := func(msg cdpMessage) any {
304+
var result any
305+
if fn != nil {
306+
result = fn(msg)
307+
}
308+
if msg.Method == "Target.getTargets" {
309+
initOnce.Do(func() { close(initDone) })
310+
}
311+
return result
312+
}
313+
314+
stopResponder := make(chan struct{})
315+
go listenAndRespond(srv, stopResponder, wrappedFn)
316+
317+
// Wait for the websocket connection to be established.
318+
select {
319+
case <-srv.connCh:
320+
case <-time.After(3 * time.Second):
321+
t.Fatal("fake server never received a connection")
322+
}
323+
// Wait for the init sequence to complete.
324+
select {
325+
case <-initDone:
326+
case <-time.After(5 * time.Second):
327+
t.Fatal("init sequence (Target.getTargets) did not complete")
328+
}
329+
330+
cleanup := func() {
331+
close(stopResponder)
332+
m.Stop()
333+
}
334+
return m, ec, cleanup
335+
}
336+
337+
// newComputedMonitor creates an unconnected Monitor for testing computed state
338+
// (network_idle, layout_settled, navigation_settled) without a real websocket.
339+
func newComputedMonitor(t *testing.T) (*Monitor, *eventCollector) {
340+
t.Helper()
341+
ec := newEventCollector()
342+
upstream := newTestUpstream("ws://127.0.0.1:0")
343+
m := New(upstream, ec.publishFn(), 0, discardLogger)
344+
return m, ec
345+
}
346+
347+
// navigateMonitor sends a Page.frameNavigated to reset computed state.
348+
func navigateMonitor(m *Monitor, url string) {
349+
p, _ := json.Marshal(map[string]any{
350+
"frame": map[string]any{"id": "f1", "url": url},
351+
})
352+
m.handleFrameNavigated(p, "s1")
353+
}
354+
355+
// simulateRequest sends a Network.requestWillBeSent through the handler.
356+
func simulateRequest(m *Monitor, id string) {
357+
p, _ := json.Marshal(map[string]any{
358+
"requestId": id, "resourceType": "Document",
359+
"request": map[string]any{"method": "GET", "url": "https://example.com/" + id},
360+
})
361+
m.handleNetworkRequest(p, "s1")
362+
}
363+
364+
// simulateFinished stores minimal state and sends Network.loadingFinished.
365+
func simulateFinished(m *Monitor, id string) {
366+
m.pendReqMu.Lock()
367+
m.pendingRequests[id] = networkReqState{method: "GET", url: "https://example.com/" + id}
368+
m.pendReqMu.Unlock()
369+
p, _ := json.Marshal(map[string]any{"requestId": id})
370+
m.handleLoadingFinished(p, "s1")
371+
}

0 commit comments

Comments
 (0)