Skip to content

v0.2.0: Add SSE resume, run export/compare, and crash recovery#1

Merged
ajit-zer07 merged 1 commit intomainfrom
sse-resume-recovery-change
Mar 20, 2026
Merged

v0.2.0: Add SSE resume, run export/compare, and crash recovery#1
ajit-zer07 merged 1 commit intomainfrom
sse-resume-recovery-change

Conversation

@ajit-zer07
Copy link
Contributor

PR Description

Summary

  • Resumable SSE streams: The /runs/:id/stream endpoint now supports
    afterSeq / Last-Event-ID for gap-free reconnection. Missed events are
    backfilled from the database in batches, deduplicated against the live hub,
    and each SSE frame carries an id field for browser-native resume. A
    configurable heartbeat (STREAM_SSE_HEARTBEAT_MS, default 15s) keeps
    connections alive through proxies.

  • Run insights: New GET /runs/:id/export bundles the full run state
    (run record, session, projection, metrics, artifacts, canonical + raw events)
    into a single JSON payload. New POST /runs/compare diffs two runs on status,
    duration, confidence, participants, and signals.

  • Crash recovery: RunRecoveryService runs on application bootstrap,
    queries for active runs (running / binding_session), promotes stalled
    binding_session runs, emits a session.stream.opened recovery event,
    and re-attaches stream consumers from lastEventSeq. Controlled via
    RUN_RECOVERY_ENABLED (default true).

  • Progress projection: Adds a progress JSONB column to run_projections
    (migration 0003), wires it through the projection repository and service,
    and bumps PROJECTION_SCHEMA_VERSION to 3. Removes the (row as any) cast.

  • Stream consumer reliability: Resets the retry counter after each
    successful event, fixes a timer leak in withIdleTimeout (clears timeout +
    calls iterator.return()), and unrefs timers to avoid blocking graceful
    shutdown.

New files

File Purpose
drizzle/0003_v2_progress_and_export.sql Migration: progress column + raw event indexes
src/controllers/run-insights.controller.ts Export & compare HTTP endpoints
src/insights/run-insights.service.ts Export bundle assembly, run comparison logic
src/runs/run-recovery.service.ts Startup recovery of active runs
src/dto/compare-runs.dto.ts Validation DTO for compare request
src/dto/export-run-query.dto.ts Validation DTO for export query params
src/dto/stream-run-query.dto.ts Validation DTO for SSE stream query params

Modified files

File Change
src/controllers/runs.controller.ts Rewritten streamRun with resume/backfill/heartbeat
src/runs/stream-consumer.service.ts Retry reset, timer leak fix, unref
src/storage/event.repository.ts listCanonicalRange, listRawByRun
src/storage/projection.repository.ts Persist progress on upsert
src/projection/projection.service.ts Typed progress read, schema v3
src/db/schema.ts progress column on runProjections
src/contracts/control-plane.ts RunExportBundle, RunComparisonResult
src/dto/run-responses.dto.ts RunBundleExportDto, RunComparisonResultDto
src/config/app-config.service.ts streamSseHeartbeatMs, runRecoveryEnabled
src/app.module.ts Register new controller + services
.env.example New env vars documented
package.json Version bump to 0.2.0

Test plan

  • Run npm test — all 17 suites pass (new specs for recovery, insights, event repo, controllers)
  • Verify SSE resume: connect to /runs/:id/stream?afterSeq=5, confirm backfilled events arrive before live ones, no
    duplicates
  • Verify heartbeat frames appear at the configured interval
  • Test GET /runs/:id/export returns a complete bundle with includeRaw=true
  • Test POST /runs/compare with two completed runs, verify participant/signal diffs
  • Restart the control plane with active runs in the DB, confirm RunRecoveryService reconnects streams
  • Run drizzle:migrate to apply 0003, verify progress column exists
  • Confirm graceful shutdown completes without hanging timers

  - Rewrite /runs/:id/stream to support resumable SSE via afterSeq query
    param and Last-Event-ID header, with backfill from DB, dedup, and
    configurable heartbeat
  - Add run export endpoint (GET /runs/:id/export) for full run bundles
    including session, projection, events, artifacts, and metrics
  - Add run comparison endpoint (POST /runs/compare) for side-by-side
    diffing of status, duration, confidence, participants, and signals
  - Add RunRecoveryService to reconnect active runs on startup, with
    binding_session promotion and recovery event emission
  - Add progress column to run_projections, bump projection schema to v3
  - Add listCanonicalRange and listRawByRun to EventRepository
  - Fix stream consumer: reset retry counter on success, fix timer leak
    in withIdleTimeout, unref timers to avoid blocking shutdown
  - Add indexes on run_events_raw for (run_id, ts) and (run_id, seq)
@ajit-zer07 ajit-zer07 merged commit d769d9b into main Mar 20, 2026
5 checks passed
@ajit-zer07 ajit-zer07 deleted the sse-resume-recovery-change branch March 20, 2026 20:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant