Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8a59058
feat: add cdpmonitor stub with start/stop lifecycle
archandatta Apr 1, 2026
e94fd9f
feat: add CDP protocol message types and internal state structs
archandatta Apr 1, 2026
96dede6
feat: implement CDP monitor with websocket capture, event handlers, a…
archandatta Apr 1, 2026
005d778
test: add CDP monitor test suite with in-process websocket mock
archandatta Apr 1, 2026
12fe9a0
review: create util.go for helper funcs
archandatta Apr 2, 2026
3cdc4b7
review
archandatta Apr 2, 2026
a015c77
review: update test
archandatta Apr 2, 2026
64673a5
review: clean up functions and tests
archandatta Apr 6, 2026
3cebd20
review: fix naming
archandatta Apr 6, 2026
aacf7c2
review: split up tests
archandatta Apr 6, 2026
c5012b5
review: cursor feedback
archandatta Apr 6, 2026
6058b42
review: reduce network logs
archandatta Apr 6, 2026
0e65e19
review: clearState() now calls failPendingCommands()
archandatta Apr 6, 2026
a9eb638
feat: add Pipeline glue type sequencing truncation, file write, and r…
archandatta Mar 20, 2026
dd31b84
review: fix truncateIfNeeded branch split, atomic.Pointer[string], Re…
archandatta Mar 27, 2026
b615377
fix: serialise Pipeline.Publish to guarantee monotonic seq delivery o…
archandatta Mar 27, 2026
fbb97a5
review
archandatta Mar 30, 2026
ab29008
refactor: rename BrowserEvent to Event, DetailDefault to DetailStandard
archandatta Mar 31, 2026
f0aed53
refactor: extract Envelope wrapper, move seq and capture_session_id o…
archandatta Mar 31, 2026
e07c024
refactor: unify seq as universal cursor, add NewReader(afterSeq)
archandatta Mar 31, 2026
3945ce4
refactor: return ReadResult instead of synthetic drop events
archandatta Mar 31, 2026
894e9d0
fix: guard against nil marshal data and oversized non-data envelopes
archandatta Apr 1, 2026
7d38b48
fix: remove duplicate ReadResult type in ringbuffer.go
archandatta Apr 6, 2026
6a67415
feat: add POST /events/publish and GET /events/stream endpoints
archandatta Apr 6, 2026
9425b05
review: update naming
archandatta Apr 6, 2026
c2435ed
review: update tests
archandatta Apr 6, 2026
27856a3
review: add mutex lock/unlock
archandatta Apr 6, 2026
839e1b5
feat: add tests
archandatta Apr 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions server/cmd/api/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type ApiService struct {
upstreamMgr *devtoolsproxy.UpstreamManager
stz scaletozero.Controller

// CDP event pipeline and cdpMonitor.
captureSession *events.CaptureSession
cdpMonitor *cdpmonitor.Monitor
monitorMu sync.Mutex

// inputMu serializes input-related operations (mouse, keyboard, screenshot)
inputMu sync.Mutex

Expand Down Expand Up @@ -70,11 +75,6 @@ type ApiService struct {
// xvfbResizeMu serializes background Xvfb restarts to prevent races
// when multiple CDP fast-path resizes fire in quick succession.
xvfbResizeMu sync.Mutex

// CDP event pipeline and cdpMonitor.
captureSession *events.CaptureSession
cdpMonitor *cdpmonitor.Monitor
monitorMu sync.Mutex
}

var _ oapi.StrictServerInterface = (*ApiService)(nil)
Expand All @@ -101,8 +101,6 @@ func New(
return nil, fmt.Errorf("captureSession cannot be nil")
}

mon := cdpmonitor.New(upstreamMgr, captureSession.Publish, displayNum)

return &ApiService{
recordManager: recordManager,
factory: factory,
Expand All @@ -114,7 +112,7 @@ func New(
nekoAuthClient: nekoAuthClient,
policy: &policy.Policy{},
captureSession: captureSession,
cdpMonitor: mon,
cdpMonitor: cdpmonitor.New(upstreamMgr, captureSession.Publish, displayNum),
}, nil
}

Expand Down Expand Up @@ -335,8 +333,14 @@ func (s *ApiService) ListRecorders(ctx context.Context, _ oapi.ListRecordersRequ

func (s *ApiService) Shutdown(ctx context.Context) error {
s.monitorMu.Lock()
s.cdpMonitor.Stop()
_ = s.captureSession.Close()
if s.cdpMonitor != nil {
s.cdpMonitor.Stop()
}
if s.captureSession != nil {
if err := s.captureSession.Close(); err != nil {
logger.FromContext(ctx).Error("failed to close capture session", "err", err)
}
}
s.monitorMu.Unlock()
return s.recordManager.StopAll(ctx)
}
91 changes: 85 additions & 6 deletions server/cmd/api/api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@ package api

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"

"github.com/google/uuid"
"github.com/onkernel/kernel-images/server/lib/events"
"github.com/onkernel/kernel-images/server/lib/logger"
)

// StartCapture handles POST /events/start.
// Generates a new capture session ID, seeds the pipeline, then starts the
// CDP monitor. If already running, the monitor is stopped and
// restarted with a fresh session ID
// StartCapture handles POST /events/start. Restarts if already running.
func (s *ApiService) StartCapture(w http.ResponseWriter, r *http.Request) {
s.monitorMu.Lock()
defer s.monitorMu.Unlock()

s.captureSession.Start(uuid.New().String())
captureSessionID := uuid.New().String()
s.captureSession.Start(captureSessionID)

if err := s.cdpMonitor.Start(context.Background()); err != nil {
logger.FromContext(r.Context()).Error("failed to start CDP monitor", "err", err)
Expand All @@ -26,10 +29,86 @@ func (s *ApiService) StartCapture(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

// StopCapture handles POST /events/stop
// StopCapture handles POST /events/stop. No-op if not running.
func (s *ApiService) StopCapture(w http.ResponseWriter, r *http.Request) {
s.monitorMu.Lock()
defer s.monitorMu.Unlock()
s.cdpMonitor.Stop()
w.WriteHeader(http.StatusOK)
}

// PublishEvent handles POST /events/publish.
// Defaults Category (via CategoryFor) and Source.Kind (to KindKernelAPI) when omitted.
func (s *ApiService) PublishEvent(w http.ResponseWriter, r *http.Request) {
var ev events.Event
if err := json.NewDecoder(r.Body).Decode(&ev); err != nil {
http.Error(w, "invalid JSON body", http.StatusBadRequest)
return
}

if ev.Type == "" {
http.Error(w, "type is required", http.StatusBadRequest)
return
}

if ev.Category == "" {
ev.Category = events.CategoryFor(ev.Type)
}

if ev.Source.Kind == "" {
ev.Source.Kind = events.KindKernelAPI
}

s.captureSession.Publish(ev)
w.WriteHeader(http.StatusOK)
}

// StreamEvents handles GET /events/stream (SSE).
// Supports Last-Event-ID for reconnection.
func (s *ApiService) StreamEvents(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}

var lastSeq uint64
if v := r.Header.Get("Last-Event-ID"); v != "" {
if n, err := strconv.ParseUint(v, 10, 64); err == nil {
lastSeq = n
}
}

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("X-Accel-Buffering", "no")
w.WriteHeader(http.StatusOK)
flusher.Flush()

reader := s.captureSession.NewReader(lastSeq)
ctx := r.Context()

for {
res, err := reader.Read(ctx)
if err != nil {
return
}
if res.Envelope == nil {
continue
}
if err := writeSSEEnvelope(w, *res.Envelope); err != nil {
return
}
flusher.Flush()
}
}

// writeSSEEnvelope writes a single SSE frame: "id: {seq}\ndata: {json}\n\n".
func writeSSEEnvelope(w io.Writer, env events.Envelope) error {
data, err := json.Marshal(env)
if err != nil {
return err
}
_, err = fmt.Fprintf(w, "id: %d\ndata: %s\n\n", env.Seq, data)
return err
}
Loading