diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 0b0587b..3d21e65 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -257,6 +257,37 @@ Both modes fire the same `EventCallback` (`"spawned"`, `"status_changed"`, `"hea | `/api/mc/worker/register` | POST | Pre-register worker metadata before spawn | | `/api/mc/workers` | GET | List active workers from tracker | +## Swarm BFF (Backend for Frontend) + +The Swarm Dashboard provides a unified view across all OpenClaw services. The BFF layer (`orchestrator/api/swarm.go`) implements a fan-out pattern: + +```text +Browser → GET /api/swarm/overview → Orchestrator + ├─→ Warren /admin/health + /admin/agents + ├─→ Chronicle /api/v1/metrics/summary + /api/v1/dlq/stats + ├─→ Dispatch /api/v1/stats + /api/v1/agents + ├─→ PromptForge /api/prompts (array → count) + └─→ Alexandria /api/collections (array → count) +``` + +All 5 services are fetched concurrently with a 4s context deadline and 3s per-request timeout. Partial failures are isolated: if Chronicle is down, its error appears in the `errors` map while other services return normally. + +### Swarm Endpoints + +| Endpoint | Method | Purpose | +|----------|--------|---------| +| `/api/swarm/overview` | GET | Fan-out to all 5 services, return unified JSON | +| `/api/swarm/warren/health` | GET | Proxy Warren `/admin/health` | +| `/api/swarm/warren/events` | GET | Proxy Warren `/admin/events` SSE stream | + +### Frontend Architecture + +The Swarm tab uses a dedicated Zustand store (`useSwarmStore`) that: +- Polls `/api/swarm/overview` every 10s (only when the Live sub-tab is active) +- Connects to Warren SSE events via `EventSource` with exponential backoff +- Derives alerts from cross-service data (service-down detection, DLQ spike detection) +- Provides computed selectors (`useFleetSummary`, `usePipelineSummary`) for dashboard widgets + ## Stack | Component | Language | Purpose | diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bf503d..f75e613 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,33 @@ All notable changes to MissionControl are documented in this file. +## v6.14 — Swarm Dashboard (2026-02-14) + +### Swarm BFF (Backend for Frontend) +- New `/api/swarm/overview` endpoint — fans out to Warren, Chronicle, Dispatch, PromptForge, and Alexandria in parallel, returns unified JSON with per-service error isolation +- `/api/swarm/warren/health` — proxies Warren health endpoint +- `/api/swarm/warren/events` — proxies Warren SSE event stream with buffered passthrough +- Shared HTTP client with 3s timeout; overview uses 4s context deadline +- Service URLs configurable via `WARREN_URL`, `CHRONICLE_URL`, `DISPATCH_URL`, `PROMPTFORGE_URL`, `ALEXANDRIA_URL` environment variables + +### Swarm Dashboard (Frontend) +- New "Swarm" tab in the main dashboard with Live and Schedule sub-tabs +- **FleetOverview** — agent counts, DLQ depth, prompt/collection counts +- **ServiceStatus** — per-service health indicators with error highlighting +- **TaskPipeline** — dispatch stats (pending, in-progress, completed, failed) +- **AgentGrid** — combined warren + dispatch agent roster +- **EventTimeline** — live SSE event stream from Warren +- Zustand store (`useSwarmStore`) with alert derivation (service-down, DLQ spike detection) +- `useSwarmPolling` hook — 10s interval polling, active only on Live tab +- `useWarrenSSE` hook — EventSource with exponential backoff reconnection + +### Testing +- `swarm_test.go` — 6 Go tests: full overview, partial failure, method guard, health proxy, health down, SSE passthrough +- `useSwarmStore.test.ts` — 14 Vitest tests: state actions, alert derivation, DLQ spike detection, fleet/pipeline computations +- `swarm.spec.ts` — 4 Playwright E2E tests: tab visibility, sub-tab navigation, schedule placeholder + +--- + ## v6.13 — Process Purity Phase 5 (2026-02-10) ### Mandatory Task Binding (`mc commit --task`) diff --git a/orchestrator/api/routes.go b/orchestrator/api/routes.go index e96dbbc..be8992e 100644 --- a/orchestrator/api/routes.go +++ b/orchestrator/api/routes.go @@ -93,6 +93,11 @@ func (s *Server) Routes() http.Handler { // Stages mux.HandleFunc("/api/stages/override", s.methodPOST(s.handleStageOverride)) + // Swarm BFF + mux.HandleFunc("/api/swarm/overview", s.methodGET(s.handleSwarmOverview)) + mux.HandleFunc("/api/swarm/warren/health", s.methodGET(s.handleSwarmWarrenHealth)) + mux.HandleFunc("/api/swarm/warren/events", s.methodGET(s.handleSwarmWarrenEvents)) + // Placeholders mux.HandleFunc("/api/openclaw/status", s.methodGET(s.handleOpenClawStatus)) mux.HandleFunc("/api/requirements", s.methodGET(s.handleRequirements)) diff --git a/orchestrator/api/swarm.go b/orchestrator/api/swarm.go new file mode 100644 index 0000000..06e333c --- /dev/null +++ b/orchestrator/api/swarm.go @@ -0,0 +1,274 @@ +package api + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "sync" + "time" +) + +// Service base URLs — configurable via environment variables. +var ( + warrenURL = envOrDefault("WARREN_URL", "http://localhost:9090") + chronicleURL = envOrDefault("CHRONICLE_URL", "http://localhost:8700") + dispatchURL = envOrDefault("DISPATCH_URL", "http://localhost:8600") + promptForgeURL = envOrDefault("PROMPTFORGE_URL", "http://localhost:8400") + alexandriaURL = envOrDefault("ALEXANDRIA_URL", "http://localhost:8500") +) + +func envOrDefault(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +// swarmClient is a shared HTTP client with reasonable timeouts for fan-out. +var swarmClient = &http.Client{ + Timeout: 3 * time.Second, +} + +// SwarmOverview is the aggregated response from all backend services. +type SwarmOverview struct { + Warren *json.RawMessage `json:"warren,omitempty"` + Chronicle *json.RawMessage `json:"chronicle,omitempty"` + Dispatch *json.RawMessage `json:"dispatch,omitempty"` + PromptForge *json.RawMessage `json:"promptforge,omitempty"` + Alexandria *json.RawMessage `json:"alexandria,omitempty"` + Errors map[string]string `json:"errors"` + FetchedAt string `json:"fetched_at"` +} + +// fetchJSON performs a GET request and returns the raw JSON body. +func fetchJSON(ctx context.Context, url string) (json.RawMessage, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + resp, err := swarmClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + return nil, fmt.Errorf("HTTP %d", resp.StatusCode) + } + + body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) // 1MB limit + if err != nil { + return nil, err + } + return json.RawMessage(body), nil +} + +// fetchService fetches multiple endpoints from a single service and merges them +// into a single JSON object with the given keys. +func fetchService(ctx context.Context, baseURL string, endpoints map[string]string) (json.RawMessage, error) { + result := map[string]json.RawMessage{} + var mu sync.Mutex + var wg sync.WaitGroup + var firstErr error + + for key, path := range endpoints { + wg.Add(1) + go func(k, p string) { + defer wg.Done() + data, err := fetchJSON(ctx, baseURL+p) + mu.Lock() + defer mu.Unlock() + if err != nil { + if firstErr == nil { + firstErr = err + } + return + } + result[k] = data + }(key, path) + } + wg.Wait() + + if len(result) == 0 && firstErr != nil { + return nil, firstErr + } + raw, err := json.Marshal(result) + if err != nil { + return nil, err + } + return json.RawMessage(raw), nil +} + +// handleSwarmOverview fans out to all services and returns a unified overview. +func (s *Server) handleSwarmOverview(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), 4*time.Second) + defer cancel() + + overview := SwarmOverview{ + Errors: map[string]string{}, + } + + type serviceResult struct { + name string + data json.RawMessage + err error + } + + ch := make(chan serviceResult, 5) + + // Warren: health + agents + go func() { + data, err := fetchService(ctx, warrenURL, map[string]string{ + "health": "/admin/health", + "agents": "/admin/agents", + }) + ch <- serviceResult{"warren", data, err} + }() + + // Chronicle: metrics + DLQ + go func() { + data, err := fetchService(ctx, chronicleURL, map[string]string{ + "metrics": "/api/v1/metrics/summary", + "dlq": "/api/v1/dlq/stats", + }) + ch <- serviceResult{"chronicle", data, err} + }() + + // Dispatch: stats + agents + go func() { + data, err := fetchService(ctx, dispatchURL, map[string]string{ + "stats": "/api/v1/stats", + "agents": "/api/v1/agents", + }) + ch <- serviceResult{"dispatch", data, err} + }() + + // PromptForge: prompt count + go func() { + raw, err := fetchJSON(ctx, promptForgeURL+"/api/prompts") + if err != nil { + ch <- serviceResult{"promptforge", nil, err} + return + } + // Count array length + var items []json.RawMessage + if jsonErr := json.Unmarshal(raw, &items); jsonErr != nil { + // Maybe it's an object with a count field — pass through as-is + result, _ := json.Marshal(map[string]interface{}{"prompts": raw}) + ch <- serviceResult{"promptforge", json.RawMessage(result), nil} + return + } + result, _ := json.Marshal(map[string]int{"prompt_count": len(items)}) + ch <- serviceResult{"promptforge", json.RawMessage(result), nil} + }() + + // Alexandria: collection count + go func() { + raw, err := fetchJSON(ctx, alexandriaURL+"/api/collections") + if err != nil { + ch <- serviceResult{"alexandria", nil, err} + return + } + var items []json.RawMessage + if jsonErr := json.Unmarshal(raw, &items); jsonErr != nil { + result, _ := json.Marshal(map[string]interface{}{"collections": raw}) + ch <- serviceResult{"alexandria", json.RawMessage(result), nil} + return + } + result, _ := json.Marshal(map[string]int{"collection_count": len(items)}) + ch <- serviceResult{"alexandria", json.RawMessage(result), nil} + }() + + // Collect results + for i := 0; i < 5; i++ { + res := <-ch + if res.err != nil { + overview.Errors[res.name] = res.err.Error() + continue + } + raw := res.data + switch res.name { + case "warren": + overview.Warren = &raw + case "chronicle": + overview.Chronicle = &raw + case "dispatch": + overview.Dispatch = &raw + case "promptforge": + overview.PromptForge = &raw + case "alexandria": + overview.Alexandria = &raw + } + } + + overview.FetchedAt = time.Now().UTC().Format(time.RFC3339) + writeJSON(w, http.StatusOK, overview) +} + +// handleSwarmWarrenHealth proxies Warren /admin/health. +func (s *Server) handleSwarmWarrenHealth(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second) + defer cancel() + + data, err := fetchJSON(ctx, warrenURL+"/admin/health") + if err != nil { + respondError(w, http.StatusBadGateway, fmt.Sprintf("warren: %s", err)) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(data) +} + +// handleSwarmWarrenEvents proxies Warren /admin/events as an SSE stream. +func (s *Server) handleSwarmWarrenEvents(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + respondError(w, http.StatusInternalServerError, "streaming not supported") + return + } + + ctx := r.Context() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, warrenURL+"/admin/events", nil) + if err != nil { + respondError(w, http.StatusInternalServerError, err.Error()) + return + } + req.Header.Set("Accept", "text/event-stream") + + // Use a client without the default timeout for long-lived SSE. + sseClient := &http.Client{Timeout: 0} + resp, err := sseClient.Do(req) + if err != nil { + respondError(w, http.StatusBadGateway, fmt.Sprintf("warren events: %s", err)) + return + } + defer resp.Body.Close() + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + w.WriteHeader(http.StatusOK) + flusher.Flush() + + buf := make([]byte, 4096) + for { + select { + case <-ctx.Done(): + return + default: + n, readErr := resp.Body.Read(buf) + if n > 0 { + w.Write(buf[:n]) + flusher.Flush() + } + if readErr != nil { + return + } + } + } +} diff --git a/orchestrator/api/swarm_test.go b/orchestrator/api/swarm_test.go new file mode 100644 index 0000000..c11ee28 --- /dev/null +++ b/orchestrator/api/swarm_test.go @@ -0,0 +1,330 @@ +package api + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +// mockService spins up a test HTTP server that serves canned JSON responses +// keyed by request path. +func mockService(t *testing.T, responses map[string]string) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, ok := responses[r.URL.Path] + if !ok { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(body)) + })) +} + +func TestSwarmOverviewEndpoint(t *testing.T) { + // Spin up mock HTTP servers for all 5 services. + warren := mockService(t, map[string]string{ + "/admin/health": `{"status":"ok","uptime":12345}`, + "/admin/agents": `[{"id":"a1","name":"Agent1","state":"ready"}]`, + }) + defer warren.Close() + + chronicle := mockService(t, map[string]string{ + "/api/v1/metrics/summary": `{"total_events":100}`, + "/api/v1/dlq/stats": `{"depth":3}`, + }) + defer chronicle.Close() + + dispatch := mockService(t, map[string]string{ + "/api/v1/stats": `{"pending":2,"in_progress":5,"completed":10}`, + "/api/v1/agents": `[{"id":"d1","status":"active"}]`, + }) + defer dispatch.Close() + + promptforge := mockService(t, map[string]string{ + "/api/prompts": `[{"id":"p1"},{"id":"p2"},{"id":"p3"}]`, + }) + defer promptforge.Close() + + alexandria := mockService(t, map[string]string{ + "/api/collections": `[{"id":"c1"}]`, + }) + defer alexandria.Close() + + // Override package-level URL vars to point at our mock servers. + origWarren := warrenURL + origChronicle := chronicleURL + origDispatch := dispatchURL + origPromptForge := promptForgeURL + origAlexandria := alexandriaURL + warrenURL = warren.URL + chronicleURL = chronicle.URL + dispatchURL = dispatch.URL + promptForgeURL = promptforge.URL + alexandriaURL = alexandria.URL + defer func() { + warrenURL = origWarren + chronicleURL = origChronicle + dispatchURL = origDispatch + promptForgeURL = origPromptForge + alexandriaURL = origAlexandria + }() + + s, _ := newTestServer(t) + routes := s.Routes() + + req := httptest.NewRequest("GET", "/api/swarm/overview", nil) + w := httptest.NewRecorder() + routes.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("Expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var overview map[string]json.RawMessage + if err := json.Unmarshal(w.Body.Bytes(), &overview); err != nil { + t.Fatalf("Invalid JSON response: %v", err) + } + + // All 5 service keys should be present. + for _, key := range []string{"warren", "chronicle", "dispatch", "promptforge", "alexandria"} { + if _, ok := overview[key]; !ok { + t.Errorf("Expected key %q in response", key) + } + } + + // Must have fetched_at. + if _, ok := overview["fetched_at"]; !ok { + t.Error("Expected fetched_at in response") + } + + // Errors should be empty. + var errMap map[string]string + if err := json.Unmarshal(overview["errors"], &errMap); err != nil { + t.Fatalf("Failed to parse errors: %v", err) + } + if len(errMap) != 0 { + t.Errorf("Expected empty errors, got %v", errMap) + } + + // PromptForge should have been counted as array. + var pf map[string]int + if err := json.Unmarshal(overview["promptforge"], &pf); err != nil { + t.Fatalf("Failed to parse promptforge: %v", err) + } + if pf["prompt_count"] != 3 { + t.Errorf("Expected prompt_count=3, got %d", pf["prompt_count"]) + } + + // Alexandria should have been counted as array. + var ax map[string]int + if err := json.Unmarshal(overview["alexandria"], &ax); err != nil { + t.Fatalf("Failed to parse alexandria: %v", err) + } + if ax["collection_count"] != 1 { + t.Errorf("Expected collection_count=1, got %d", ax["collection_count"]) + } +} + +func TestSwarmOverviewPartialFailure(t *testing.T) { + // Only mock Warren and PromptForge; the rest point at dead URLs. + warren := mockService(t, map[string]string{ + "/admin/health": `{"status":"ok"}`, + "/admin/agents": `[]`, + }) + defer warren.Close() + + promptforge := mockService(t, map[string]string{ + "/api/prompts": `[]`, + }) + defer promptforge.Close() + + origWarren := warrenURL + origChronicle := chronicleURL + origDispatch := dispatchURL + origPromptForge := promptForgeURL + origAlexandria := alexandriaURL + warrenURL = warren.URL + chronicleURL = "http://127.0.0.1:1" // dead + dispatchURL = "http://127.0.0.1:1" + promptForgeURL = promptforge.URL + alexandriaURL = "http://127.0.0.1:1" + defer func() { + warrenURL = origWarren + chronicleURL = origChronicle + dispatchURL = origDispatch + promptForgeURL = origPromptForge + alexandriaURL = origAlexandria + }() + + s, _ := newTestServer(t) + routes := s.Routes() + + req := httptest.NewRequest("GET", "/api/swarm/overview", nil) + w := httptest.NewRecorder() + routes.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("Expected 200 even with partial failure, got %d", w.Code) + } + + var overview map[string]json.RawMessage + if err := json.Unmarshal(w.Body.Bytes(), &overview); err != nil { + t.Fatalf("Invalid JSON: %v", err) + } + + // Warren and PromptForge should have data. + if overview["warren"] == nil { + t.Error("Expected warren data") + } + if overview["promptforge"] == nil { + t.Error("Expected promptforge data") + } + + // Errors should contain the three dead services. + var errMap map[string]string + if err := json.Unmarshal(overview["errors"], &errMap); err != nil { + t.Fatalf("Failed to parse errors: %v", err) + } + for _, svc := range []string{"chronicle", "dispatch", "alexandria"} { + if _, ok := errMap[svc]; !ok { + t.Errorf("Expected error for %q", svc) + } + } + // Warren and promptforge should NOT be in errors. + for _, svc := range []string{"warren", "promptforge"} { + if _, ok := errMap[svc]; ok { + t.Errorf("Did not expect error for %q", svc) + } + } +} + +func TestSwarmOverviewMethodNotAllowed(t *testing.T) { + s, _ := newTestServer(t) + routes := s.Routes() + + req := httptest.NewRequest("POST", "/api/swarm/overview", nil) + w := httptest.NewRecorder() + routes.ServeHTTP(w, req) + + if w.Code != http.StatusMethodNotAllowed { + t.Errorf("Expected 405, got %d", w.Code) + } +} + +func TestSwarmWarrenHealthProxy(t *testing.T) { + warren := mockService(t, map[string]string{ + "/admin/health": `{"status":"ok","uptime":9999,"version":"1.2.3"}`, + }) + defer warren.Close() + + origWarren := warrenURL + warrenURL = warren.URL + defer func() { warrenURL = origWarren }() + + s, _ := newTestServer(t) + routes := s.Routes() + + req := httptest.NewRequest("GET", "/api/swarm/warren/health", nil) + w := httptest.NewRecorder() + routes.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("Expected 200, got %d: %s", w.Code, w.Body.String()) + } + + ct := w.Header().Get("Content-Type") + if ct != "application/json" { + t.Errorf("Expected Content-Type application/json, got %s", ct) + } + + // Verify the proxied JSON matches. + var health map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &health); err != nil { + t.Fatalf("Invalid JSON: %v", err) + } + if health["status"] != "ok" { + t.Errorf("Expected status ok, got %v", health["status"]) + } + if health["version"] != "1.2.3" { + t.Errorf("Expected version 1.2.3, got %v", health["version"]) + } +} + +func TestSwarmWarrenHealthDown(t *testing.T) { + origWarren := warrenURL + warrenURL = "http://127.0.0.1:1" // dead + defer func() { warrenURL = origWarren }() + + s, _ := newTestServer(t) + routes := s.Routes() + + req := httptest.NewRequest("GET", "/api/swarm/warren/health", nil) + w := httptest.NewRecorder() + routes.ServeHTTP(w, req) + + if w.Code != http.StatusBadGateway { + t.Errorf("Expected 502, got %d", w.Code) + } +} + +func TestSwarmWarrenEventsSSE(t *testing.T) { + // Create a mock SSE stream from Warren. + sseData := "data: {\"type\":\"heartbeat\"}\n\n" + warren := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/admin/events" { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.WriteHeader(http.StatusOK) + flusher, ok := w.(http.Flusher) + if ok { + flusher.Flush() + } + fmt.Fprint(w, sseData) + if ok { + flusher.Flush() + } + // Close after sending one event so the proxy returns. + })) + defer warren.Close() + + origWarren := warrenURL + warrenURL = warren.URL + defer func() { warrenURL = origWarren }() + + s, _ := newTestServer(t) + routes := s.Routes() + + req := httptest.NewRequest("GET", "/api/swarm/warren/events", nil) + w := httptest.NewRecorder() + + // Run in goroutine with a timeout to avoid hanging. + done := make(chan struct{}) + go func() { + routes.ServeHTTP(w, req) + close(done) + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("SSE proxy timed out") + } + + ct := w.Header().Get("Content-Type") + if !strings.HasPrefix(ct, "text/event-stream") { + t.Errorf("Expected Content-Type text/event-stream, got %s", ct) + } + + body := w.Body.String() + if !strings.Contains(body, "heartbeat") { + t.Errorf("Expected SSE data passthrough, got: %s", body) + } +} diff --git a/scripts/e2e-orchestrator.sh b/scripts/e2e-orchestrator.sh index 137b60b..189700c 100755 --- a/scripts/e2e-orchestrator.sh +++ b/scripts/e2e-orchestrator.sh @@ -56,7 +56,7 @@ echo "--- Task CRUD ---" HTTP=$(curl -s -o /tmp/e2e_body -w '%{http_code}' -X POST \ "${AUTH[@]}" \ -H "Content-Type: application/json" \ - -d '{"name":"E2E smoke task","stage":"implement","zone":"core","persona":"engineer"}' \ + -d '{"title":"E2E smoke task","stage":"implement","zone":"core"}' \ "$BASE/tasks") if [ "$HTTP" = "201" ] || [ "$HTTP" = "200" ]; then pass "POST /api/tasks -> $HTTP" diff --git a/web/e2e/swarm.spec.ts b/web/e2e/swarm.spec.ts new file mode 100644 index 0000000..1cf0741 --- /dev/null +++ b/web/e2e/swarm.spec.ts @@ -0,0 +1,58 @@ +import { test, expect } from '@playwright/test' + +test.describe('Swarm Dashboard', () => { + test.beforeEach(async ({ page }) => { + await page.goto('/') + }) + + test('should show swarm tab', async ({ page }) => { + await page.waitForLoadState('networkidle') + + // The Swarm tab button should be visible on page load. + const swarmTab = page.locator('button:has-text("Swarm")') + await expect(swarmTab).toBeVisible({ timeout: 5000 }) + }) + + test('should switch to swarm view', async ({ page }) => { + await page.waitForLoadState('networkidle') + + // Click the Swarm tab. + await page.locator('button:has-text("Swarm")').click() + + // Live and Schedule sub-tabs should appear. + const liveTab = page.locator('button:has-text("Live")') + const scheduleTab = page.locator('button:has-text("Schedule")') + + await expect(liveTab).toBeVisible({ timeout: 5000 }) + await expect(scheduleTab).toBeVisible({ timeout: 5000 }) + }) + + test('should show schedule placeholder', async ({ page }) => { + await page.waitForLoadState('networkidle') + + // Navigate to Swarm > Schedule. + await page.locator('button:has-text("Swarm")').click() + await page.locator('button:has-text("Schedule")').click() + + // The placeholder text should be visible. + const placeholder = page.locator('text=/Phase 2/i') + await expect(placeholder).toBeVisible({ timeout: 5000 }) + }) + + test('should switch back to live', async ({ page }) => { + await page.waitForLoadState('networkidle') + + // Navigate to Swarm > Schedule, then back to Live. + await page.locator('button:has-text("Swarm")').click() + await page.locator('button:has-text("Schedule")').click() + await page.locator('button:has-text("Live")').click() + + // Schedule placeholder should disappear. + const placeholder = page.locator('text=/Phase 2/i') + await expect(placeholder).not.toBeVisible({ timeout: 5000 }) + + // Live or Schedule button with active style should indicate Live is selected. + const liveTab = page.locator('button:has-text("Live")') + await expect(liveTab).toBeVisible() + }) +}) diff --git a/web/src/App.tsx b/web/src/App.tsx index caecf9e..9658df3 100644 --- a/web/src/App.tsx +++ b/web/src/App.tsx @@ -23,9 +23,10 @@ import { StageView } from './domains/workflow/StageView' import { TokenUsage } from './domains/knowledge/TokenUsage' import { GateApproval } from './domains/strategy/GateApproval' import { FindingsViewer } from './components/FindingsViewer' +import { SwarmDashboard } from './domains/swarm/SwarmDashboard' import type { Zone, Agent } from './types' -type ViewMode = 'agents' | 'workflow' | 'tokens' | 'gates' | 'findings' +type ViewMode = 'agents' | 'workflow' | 'tokens' | 'gates' | 'findings' | 'swarm' function App() { // View mode for v4 panels @@ -229,14 +230,14 @@ function App() {
{/* View mode tabs */}
- {(['agents', 'workflow', 'tokens', 'gates', 'findings'] as ViewMode[]).map((mode) => ( + {(['agents', 'workflow', 'tokens', 'gates', 'findings', 'swarm'] as ViewMode[]).map((mode) => ( ))}
@@ -273,6 +275,8 @@ function App() {
+ ) : viewMode === 'swarm' ? ( + ) : null}
diff --git a/web/src/domains/swarm/AgentGrid.tsx b/web/src/domains/swarm/AgentGrid.tsx new file mode 100644 index 0000000..8342fd9 --- /dev/null +++ b/web/src/domains/swarm/AgentGrid.tsx @@ -0,0 +1,70 @@ +import { useSwarmOverview } from '../../stores/useSwarmStore' +import type { WarrenAgent } from '../../types/swarm' + +export function AgentGrid() { + const overview = useSwarmOverview() + const agents = overview?.warren?.agents ?? [] + + return ( +
+

+ Agent Fleet + {agents.length > 0 && ( + ({agents.length}) + )} +

+ + {agents.length === 0 ? ( +
+ No agents connected +
+ ) : ( +
+ {agents.map((agent) => ( + + ))} +
+ )} +
+ ) +} + +function AgentCard({ agent }: { agent: WarrenAgent }) { + const stateColor = getStateColor(agent.state) + + return ( +
+ {/* State indicator */} + + + {/* Agent info */} +
+
+ {agent.name || agent.id} +
+
+ {agent.state} + {agent.policy && {agent.policy}} + {agent.connections !== undefined && ( + {agent.connections} conn + )} +
+
+
+ ) +} + +function getStateColor(state: string): string { + switch (state) { + case 'ready': + return 'bg-green-500' + case 'sleeping': + return 'bg-yellow-500' + case 'starting': + return 'bg-blue-500 animate-pulse' + case 'stopping': + return 'bg-orange-500' + default: + return 'bg-gray-500' + } +} diff --git a/web/src/domains/swarm/EventTimeline.tsx b/web/src/domains/swarm/EventTimeline.tsx new file mode 100644 index 0000000..bfa8bb5 --- /dev/null +++ b/web/src/domains/swarm/EventTimeline.tsx @@ -0,0 +1,89 @@ +import { useSwarmEvents } from '../../stores/useSwarmStore' +import type { WarrenSSEEvent } from '../../types/swarm' + +export function EventTimeline() { + const events = useSwarmEvents() + + return ( +
+

+ Live Events + {events.length > 0 && ( + ({events.length}) + )} +

+ + {events.length === 0 ? ( +
+ Waiting for events from Warren... +
+ ) : ( +
+ {events.map((event, i) => ( + + ))} +
+ )} +
+ ) +} + +function EventRow({ event }: { event: WarrenSSEEvent }) { + const time = new Date(event.timestamp).toLocaleTimeString() + + return ( +
+ {/* Timestamp */} + + {time} + + + {/* Event type badge */} + + {event.type} + + + {/* Agent name */} + {event.agent && ( + + {event.agent} + + )} + + {/* Event summary */} + + {formatEventData(event)} + +
+ ) +} + +function getEventBadgeStyle(type: string): string { + if (type.includes('error') || type.includes('fail')) { + return 'bg-red-500/20 text-red-400' + } + if (type.includes('warn') || type.includes('alert')) { + return 'bg-yellow-500/20 text-yellow-400' + } + if (type.includes('spawn') || type.includes('start') || type.includes('connect')) { + return 'bg-green-500/20 text-green-400' + } + if (type.includes('stop') || type.includes('kill') || type.includes('disconnect')) { + return 'bg-orange-500/20 text-orange-400' + } + return 'bg-gray-700 text-gray-400' +} + +function formatEventData(event: WarrenSSEEvent): string { + if (!event.data) return '' + if (typeof event.data === 'string') return event.data + try { + const str = JSON.stringify(event.data) + return str.length > 80 ? str.slice(0, 80) + '...' : str + } catch { + return '' + } +} diff --git a/web/src/domains/swarm/FleetOverview.tsx b/web/src/domains/swarm/FleetOverview.tsx new file mode 100644 index 0000000..6e863cb --- /dev/null +++ b/web/src/domains/swarm/FleetOverview.tsx @@ -0,0 +1,56 @@ +import { useFleetSummary, useSwarmStore } from '../../stores/useSwarmStore' + +export function FleetOverview() { + const summary = useFleetSummary() + const errorCount = useSwarmStore((s) => Object.keys(s.overview?.errors ?? {}).length) + + if (!summary) return null + + const cards = [ + { + label: 'Agents', + value: summary.totalAgents, + sub: `${summary.readyCount} ready / ${summary.sleepingCount} sleeping`, + color: summary.totalAgents > 0 ? 'text-green-400' : 'text-gray-500' + }, + { + label: 'Active Tasks', + value: summary.activeTasks, + sub: `${summary.dlqDepth} in DLQ`, + color: summary.activeTasks > 0 ? 'text-blue-400' : 'text-gray-500' + }, + { + label: 'Prompts', + value: summary.promptCount, + sub: `${summary.collectionCount} collections`, + color: 'text-purple-400' + }, + { + label: 'Services', + value: `${5 - errorCount}/5`, + sub: errorCount > 0 ? `${errorCount} degraded` : 'All healthy', + color: errorCount === 0 ? 'text-green-400' : errorCount >= 3 ? 'text-red-400' : 'text-yellow-400' + } + ] + + return ( +
+ {cards.map((card) => ( +
+
+ {card.label} +
+
+ {card.value} +
+
+ {card.sub} +
+
+ ))} +
+ ) +} diff --git a/web/src/domains/swarm/ServiceStatus.tsx b/web/src/domains/swarm/ServiceStatus.tsx new file mode 100644 index 0000000..35a5c3f --- /dev/null +++ b/web/src/domains/swarm/ServiceStatus.tsx @@ -0,0 +1,92 @@ +import { useSwarmOverview } from '../../stores/useSwarmStore' + +interface ServiceInfo { + name: string + key: string + metric?: string +} + +const SERVICES: ServiceInfo[] = [ + { name: 'Warren', key: 'warren', metric: 'agents connected' }, + { name: 'Chronicle', key: 'chronicle', metric: 'events tracked' }, + { name: 'Dispatch', key: 'dispatch', metric: 'tasks managed' }, + { name: 'PromptForge', key: 'promptforge', metric: 'prompts' }, + { name: 'Alexandria', key: 'alexandria', metric: 'collections' } +] + +export function ServiceStatus() { + const overview = useSwarmOverview() + + if (!overview) return null + + return ( +
+

+ Service Health +

+
+ {SERVICES.map((svc) => { + const isDown = !!overview.errors[svc.key] + const errorMsg = overview.errors[svc.key] + const keyMetric = getKeyMetric(overview, svc.key) + + return ( +
+ {/* Status dot */} + + + {/* Name */} + + {svc.name} + + + {/* Metric or error */} + + {isDown ? truncate(errorMsg, 30) : keyMetric} + +
+ ) + })} +
+
+ ) +} + +function getKeyMetric(overview: NonNullable>, key: string): string { + switch (key) { + case 'warren': + return overview.warren?.agents + ? `${overview.warren.agents.length} agents` + : 'connected' + case 'chronicle': { + const depth = overview.chronicle?.dlq?.depth + return depth !== undefined ? `DLQ: ${depth}` : 'ok' + } + case 'dispatch': { + const active = overview.dispatch?.stats?.in_progress + return active !== undefined ? `${active} active` : 'ok' + } + case 'promptforge': + return overview.promptforge?.prompt_count !== undefined + ? `${overview.promptforge.prompt_count} prompts` + : 'ok' + case 'alexandria': + return overview.alexandria?.collection_count !== undefined + ? `${overview.alexandria.collection_count} collections` + : 'ok' + default: + return 'ok' + } +} + +function truncate(str: string | undefined, max: number): string { + if (!str) return 'down' + return str.length > max ? str.slice(0, max) + '...' : str +} diff --git a/web/src/domains/swarm/SwarmDashboard.tsx b/web/src/domains/swarm/SwarmDashboard.tsx new file mode 100644 index 0000000..ec41ee3 --- /dev/null +++ b/web/src/domains/swarm/SwarmDashboard.tsx @@ -0,0 +1,115 @@ +import { useState } from 'react' +import { useSwarmPolling } from '../../hooks/useSwarmPolling' +import { useWarrenSSE } from '../../hooks/useWarrenSSE' +import { useSwarmLoading, useSwarmStore } from '../../stores/useSwarmStore' +import { FleetOverview } from './FleetOverview' +import { ServiceStatus } from './ServiceStatus' +import { TaskPipeline } from './TaskPipeline' +import { AgentGrid } from './AgentGrid' +import { EventTimeline } from './EventTimeline' + +type SubTab = 'live' | 'schedule' + +export function SwarmDashboard() { + const [subTab, setSubTab] = useState('live') + const loading = useSwarmLoading() + const lastFetched = useSwarmStore((s) => s.lastFetched) + + // Poll when on the live tab + useSwarmPolling({ enabled: subTab === 'live' }) + + // SSE always active when mounted + useWarrenSSE() + + return ( +
+ {/* Sub-tab bar */} +
+ {(['live', 'schedule'] as SubTab[]).map((tab) => ( + + ))} + {lastFetched && ( + + Updated {new Date(lastFetched).toLocaleTimeString()} + + )} +
+ + {/* Tab content */} +
+ {subTab === 'live' ? ( + loading && !lastFetched ? ( + + ) : ( + + ) + ) : ( + + )} +
+
+ ) +} + +function LiveView() { + return ( +
+ {/* Row 1: Fleet overview (full width) */} + + + {/* Row 2: 3-column grid */} +
+ + + +
+ + {/* Row 3: Event timeline (full width) */} + +
+ ) +} + +function SchedulePlaceholder() { + return ( +
+
+
Schedule View
+
Coming in Phase 2
+
+
+ ) +} + +function LoadingSkeleton() { + return ( +
+ {/* Fleet overview skeleton */} +
+ {Array.from({ length: 4 }).map((_, i) => ( +
+ ))} +
+ {/* 3 column skeleton */} +
+ {Array.from({ length: 3 }).map((_, i) => ( +
+ ))} +
+ {/* Timeline skeleton */} +
+
+ ) +} diff --git a/web/src/domains/swarm/TaskPipeline.tsx b/web/src/domains/swarm/TaskPipeline.tsx new file mode 100644 index 0000000..2c973b5 --- /dev/null +++ b/web/src/domains/swarm/TaskPipeline.tsx @@ -0,0 +1,74 @@ +import { usePipelineSummary } from '../../stores/useSwarmStore' + +export function TaskPipeline() { + const pipeline = usePipelineSummary() + + if (!pipeline) { + return ( +
+

+ Task Pipeline +

+
+ No dispatch data available +
+
+ ) + } + + const stages = [ + { label: 'Pending', value: pipeline.pending, color: 'bg-gray-500' }, + { label: 'In Progress', value: pipeline.inProgress, color: 'bg-blue-500' }, + { label: 'Completed', value: pipeline.completed, color: 'bg-green-500' }, + { label: 'Failed', value: pipeline.failed, color: 'bg-red-500' } + ] + + const total = pipeline.total || 1 // avoid division by zero + + return ( +
+

+ Task Pipeline +

+ + {/* Progress bar */} +
+ {stages.map((stage) => { + const width = (stage.value / total) * 100 + if (width === 0) return null + return ( +
+ ) + })} +
+ + {/* Stage counts */} +
+ {stages.map((stage) => ( +
+ + {stage.label} + + {stage.value} + +
+ ))} +
+ + {/* DLQ */} + {pipeline.dlqDepth > 0 && ( +
+
+ Dead Letter Queue + {pipeline.dlqDepth} +
+
+ )} +
+ ) +} diff --git a/web/src/hooks/useSwarmPolling.ts b/web/src/hooks/useSwarmPolling.ts new file mode 100644 index 0000000..b58f95c --- /dev/null +++ b/web/src/hooks/useSwarmPolling.ts @@ -0,0 +1,60 @@ +import { useEffect, useRef } from 'react' +import { useSwarmStore, fetchSwarmOverview } from '../stores/useSwarmStore' + +const POLL_INTERVAL = 10_000 // 10 seconds + +interface UseSwarmPollingOptions { + enabled: boolean +} + +export function useSwarmPolling({ enabled }: UseSwarmPollingOptions) { + const intervalRef = useRef(null) + const mountedRef = useRef(true) + + useEffect(() => { + mountedRef.current = true + + if (!enabled) { + if (intervalRef.current) { + clearInterval(intervalRef.current) + intervalRef.current = null + } + return + } + + const poll = async () => { + if (!mountedRef.current) return + + const store = useSwarmStore.getState() + // Only set loading on first fetch + if (!store.lastFetched) { + store.setLoading(true) + } + + try { + const data = await fetchSwarmOverview() + if (mountedRef.current) { + store.setOverview(data) + } + } catch (err) { + if (mountedRef.current) { + store.setError(err instanceof Error ? err.message : 'Failed to fetch swarm overview') + } + } + } + + // Fetch immediately + poll() + + // Then poll on interval + intervalRef.current = window.setInterval(poll, POLL_INTERVAL) + + return () => { + mountedRef.current = false + if (intervalRef.current) { + clearInterval(intervalRef.current) + intervalRef.current = null + } + } + }, [enabled]) +} diff --git a/web/src/hooks/useWarrenSSE.ts b/web/src/hooks/useWarrenSSE.ts new file mode 100644 index 0000000..fa0f2bc --- /dev/null +++ b/web/src/hooks/useWarrenSSE.ts @@ -0,0 +1,80 @@ +import { useEffect, useRef } from 'react' +import { useSwarmStore } from '../stores/useSwarmStore' +import type { WarrenSSEEvent } from '../types/swarm' + +const MAX_RECONNECT_DELAY = 30_000 +const INITIAL_RECONNECT_DELAY = 1_000 + +export function useWarrenSSE() { + const esRef = useRef(null) + const reconnectTimeoutRef = useRef(null) + const attemptsRef = useRef(0) + + useEffect(() => { + let mounted = true + + function connect() { + if (!mounted) return + if (esRef.current) { + esRef.current.close() + } + + const es = new EventSource('/api/swarm/warren/events') + esRef.current = es + + es.onopen = () => { + attemptsRef.current = 0 + } + + es.onmessage = (e) => { + if (!mounted) return + try { + const parsed = JSON.parse(e.data) + const event: WarrenSSEEvent = { + id: parsed.id || e.lastEventId || undefined, + type: parsed.type || 'unknown', + agent: parsed.agent || parsed.agent_id, + data: parsed.data ?? parsed, + timestamp: parsed.timestamp || Date.now() + } + useSwarmStore.getState().addEvent(event) + } catch { + // Non-JSON SSE data — wrap as raw event + useSwarmStore.getState().addEvent({ + type: 'raw', + data: e.data, + timestamp: Date.now() + }) + } + } + + es.onerror = () => { + if (!mounted) return + es.close() + esRef.current = null + + // Exponential backoff + const delay = Math.min( + INITIAL_RECONNECT_DELAY * Math.pow(2, attemptsRef.current), + MAX_RECONNECT_DELAY + ) + attemptsRef.current++ + + reconnectTimeoutRef.current = window.setTimeout(connect, delay) + } + } + + connect() + + return () => { + mounted = false + if (reconnectTimeoutRef.current) { + clearTimeout(reconnectTimeoutRef.current) + } + if (esRef.current) { + esRef.current.close() + esRef.current = null + } + } + }, []) +} diff --git a/web/src/stores/useSwarmStore.test.ts b/web/src/stores/useSwarmStore.test.ts new file mode 100644 index 0000000..6a9f149 --- /dev/null +++ b/web/src/stores/useSwarmStore.test.ts @@ -0,0 +1,279 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest' +import { useSwarmStore } from './useSwarmStore' +import type { SwarmOverview, WarrenSSEEvent } from '../types/swarm' + +// Helper to build a minimal valid overview. +function makeOverview(overrides: Partial = {}): SwarmOverview { + return { + errors: {}, + fetched_at: new Date().toISOString(), + ...overrides + } +} + +describe('useSwarmStore', () => { + beforeEach(() => { + useSwarmStore.setState({ + overview: null, + events: [], + alerts: [], + loading: false, + error: null, + lastFetched: null + }) + }) + + describe('setOverview', () => { + it('should set overview and update lastFetched', () => { + const data = makeOverview({ + warren: { health: { status: 'ok' }, agents: [] } + }) + + useSwarmStore.getState().setOverview(data) + + const state = useSwarmStore.getState() + expect(state.overview).toEqual(data) + expect(state.lastFetched).toBeTypeOf('number') + expect(state.loading).toBe(false) + expect(state.error).toBeNull() + }) + }) + + describe('addEvent', () => { + it('should prepend events in newest-first order', () => { + const event1: WarrenSSEEvent = { type: 'heartbeat', timestamp: 1 } + const event2: WarrenSSEEvent = { type: 'agent_ready', timestamp: 2 } + + useSwarmStore.getState().addEvent(event1) + useSwarmStore.getState().addEvent(event2) + + const events = useSwarmStore.getState().events + expect(events).toHaveLength(2) + expect(events[0].type).toBe('agent_ready') + expect(events[1].type).toBe('heartbeat') + }) + + it('should cap events at 100', () => { + for (let i = 0; i < 110; i++) { + useSwarmStore.getState().addEvent({ type: `event-${i}`, timestamp: i }) + } + + expect(useSwarmStore.getState().events).toHaveLength(100) + // Most recent should be first. + expect(useSwarmStore.getState().events[0].type).toBe('event-109') + }) + }) + + describe('Alert derivation from errors', () => { + it('should generate critical alert from errors map', () => { + const data = makeOverview({ + errors: { dispatch: 'connection refused' } + }) + + useSwarmStore.getState().setOverview(data) + + const alerts = useSwarmStore.getState().alerts + expect(alerts).toHaveLength(1) + expect(alerts[0].level).toBe('critical') + expect(alerts[0].service).toBe('dispatch') + expect(alerts[0].message).toContain('dispatch') + expect(alerts[0].message).toContain('unreachable') + }) + + it('should not re-alert on the same error if already seen', () => { + const data = makeOverview({ + errors: { dispatch: 'connection refused' } + }) + + useSwarmStore.getState().setOverview(data) + // Set again with same error. + useSwarmStore.getState().setOverview(data) + + // Should only have 1 alert, not 2. + expect(useSwarmStore.getState().alerts).toHaveLength(1) + }) + }) + + describe('DLQ spike detection', () => { + it('should generate warning alert on DLQ spike', () => { + // First set with low DLQ. + const baseline = makeOverview({ + chronicle: { dlq: { depth: 5 } } + }) + useSwarmStore.getState().setOverview(baseline) + + // Spike: depth > 10 and > 2x previous. + const spike = makeOverview({ + chronicle: { dlq: { depth: 50 } } + }) + useSwarmStore.getState().setOverview(spike) + + const alerts = useSwarmStore.getState().alerts + const dlqAlert = alerts.find((a) => a.id.startsWith('dlq-spike')) + expect(dlqAlert).toBeDefined() + expect(dlqAlert!.level).toBe('warning') + expect(dlqAlert!.message).toContain('50') + }) + + it('should not alert when DLQ is below threshold', () => { + const data = makeOverview({ + chronicle: { dlq: { depth: 5 } } + }) + useSwarmStore.getState().setOverview(data) + + // Small increase, still under 10. + const small = makeOverview({ + chronicle: { dlq: { depth: 8 } } + }) + useSwarmStore.getState().setOverview(small) + + const alerts = useSwarmStore.getState().alerts + const dlqAlert = alerts.find((a) => a.id.startsWith('dlq-spike')) + expect(dlqAlert).toBeUndefined() + }) + }) + + describe('dismissAlert', () => { + it('should remove alert by id', () => { + useSwarmStore.getState().addAlert({ + id: 'test-1', + level: 'warning', + service: 'chronicle', + message: 'Test alert', + timestamp: Date.now() + }) + + expect(useSwarmStore.getState().alerts).toHaveLength(1) + + useSwarmStore.getState().dismissAlert('test-1') + + expect(useSwarmStore.getState().alerts).toHaveLength(0) + }) + + it('should not affect other alerts', () => { + useSwarmStore.getState().addAlert({ + id: 'keep', + level: 'info', + service: 'warren', + message: 'Keep me', + timestamp: Date.now() + }) + useSwarmStore.getState().addAlert({ + id: 'remove', + level: 'critical', + service: 'dispatch', + message: 'Remove me', + timestamp: Date.now() + }) + + useSwarmStore.getState().dismissAlert('remove') + + const alerts = useSwarmStore.getState().alerts + expect(alerts).toHaveLength(1) + expect(alerts[0].id).toBe('keep') + }) + }) + + describe('setLoading / setError', () => { + it('should set loading state', () => { + useSwarmStore.getState().setLoading(true) + expect(useSwarmStore.getState().loading).toBe(true) + + useSwarmStore.getState().setLoading(false) + expect(useSwarmStore.getState().loading).toBe(false) + }) + + it('should set error and clear loading', () => { + useSwarmStore.getState().setLoading(true) + useSwarmStore.getState().setError('Network failure') + + expect(useSwarmStore.getState().error).toBe('Network failure') + expect(useSwarmStore.getState().loading).toBe(false) + }) + + it('should clear error', () => { + useSwarmStore.getState().setError('Something') + useSwarmStore.getState().setError(null) + + expect(useSwarmStore.getState().error).toBeNull() + }) + }) + + describe('useFleetSummary (via getState)', () => { + it('should return null when no overview', () => { + // useFleetSummary is a hook — test the logic directly. + const overview = useSwarmStore.getState().overview + expect(overview).toBeNull() + }) + + it('should compute fleet counts from overview data', () => { + const data = makeOverview({ + warren: { + agents: [ + { id: 'a1', name: 'Agent1', state: 'ready' }, + { id: 'a2', name: 'Agent2', state: 'sleeping' }, + { id: 'a3', name: 'Agent3', state: 'ready' } + ] + }, + dispatch: { + stats: { pending: 3, in_progress: 7, completed: 20, failed: 1, total: 31 }, + agents: [{ id: 'd1', status: 'active' }] + }, + chronicle: { dlq: { depth: 4 } }, + promptforge: { prompt_count: 15 }, + alexandria: { collection_count: 8 }, + errors: { someservice: 'down' } + }) + + useSwarmStore.getState().setOverview(data) + const overview = useSwarmStore.getState().overview! + + // Replicate useFleetSummary logic. + const warrenAgents = overview.warren?.agents ?? [] + const dispatchAgents = overview.dispatch?.agents ?? [] + const totalAgents = warrenAgents.length + dispatchAgents.length + const readyCount = warrenAgents.filter((a) => a.state === 'ready').length + const sleepingCount = warrenAgents.filter((a) => a.state === 'sleeping').length + const degradedCount = Object.keys(overview.errors).length + + expect(totalAgents).toBe(4) // 3 warren + 1 dispatch + expect(readyCount).toBe(2) + expect(sleepingCount).toBe(1) + expect(degradedCount).toBe(1) + expect(overview.dispatch?.stats?.in_progress).toBe(7) + expect(overview.chronicle?.dlq?.depth).toBe(4) + expect(overview.promptforge?.prompt_count).toBe(15) + expect(overview.alexandria?.collection_count).toBe(8) + }) + }) + + describe('usePipelineSummary (via getState)', () => { + it('should compute pipeline values from dispatch stats', () => { + const data = makeOverview({ + dispatch: { + stats: { pending: 5, in_progress: 3, completed: 42, failed: 2, total: 52 } + }, + chronicle: { dlq: { depth: 7 } } + }) + + useSwarmStore.getState().setOverview(data) + const overview = useSwarmStore.getState().overview! + + const stats = overview.dispatch!.stats! + expect(stats.pending).toBe(5) + expect(stats.in_progress).toBe(3) + expect(stats.completed).toBe(42) + expect(stats.failed).toBe(2) + expect(stats.total).toBe(52) + expect(overview.chronicle?.dlq?.depth).toBe(7) + }) + + it('should handle missing dispatch stats', () => { + const data = makeOverview() + useSwarmStore.getState().setOverview(data) + + const overview = useSwarmStore.getState().overview! + expect(overview.dispatch?.stats).toBeUndefined() + }) + }) +}) diff --git a/web/src/stores/useSwarmStore.ts b/web/src/stores/useSwarmStore.ts new file mode 100644 index 0000000..a4895e0 --- /dev/null +++ b/web/src/stores/useSwarmStore.ts @@ -0,0 +1,149 @@ +import { create } from 'zustand' +import type { + SwarmOverview, + WarrenSSEEvent, + SwarmAlert +} from '../types/swarm' + +const MAX_EVENTS = 100 +const MAX_ALERTS = 50 + +interface SwarmState { + // State + overview: SwarmOverview | null + events: WarrenSSEEvent[] + alerts: SwarmAlert[] + loading: boolean + error: string | null + lastFetched: number | null + + // Actions + setOverview: (data: SwarmOverview) => void + addEvent: (event: WarrenSSEEvent) => void + addAlert: (alert: SwarmAlert) => void + dismissAlert: (id: string) => void + setLoading: (loading: boolean) => void + setError: (error: string | null) => void +} + +export const useSwarmStore = create()((set, get) => ({ + overview: null, + events: [], + alerts: [], + loading: false, + error: null, + lastFetched: null, + + setOverview: (data) => { + const prev = get().overview + const newAlerts: SwarmAlert[] = [] + + // Derive alerts from errors map + for (const [service, msg] of Object.entries(data.errors)) { + // Only alert if this is a new error (wasn't in previous) + if (!prev?.errors[service]) { + newAlerts.push({ + id: `down-${service}-${Date.now()}`, + level: 'critical', + service, + message: `${service} is unreachable: ${msg}`, + timestamp: Date.now() + }) + } + } + + // DLQ spike detection + const dlqDepth = data.chronicle?.dlq?.depth ?? 0 + const prevDlqDepth = prev?.chronicle?.dlq?.depth ?? 0 + if (dlqDepth > 10 && dlqDepth > prevDlqDepth * 2) { + newAlerts.push({ + id: `dlq-spike-${Date.now()}`, + level: 'warning', + service: 'chronicle', + message: `DLQ depth spiked to ${dlqDepth}`, + timestamp: Date.now() + }) + } + + set((state) => ({ + overview: data, + loading: false, + error: null, + lastFetched: Date.now(), + alerts: [...newAlerts, ...state.alerts].slice(0, MAX_ALERTS) + })) + }, + + addEvent: (event) => set((state) => ({ + events: [event, ...state.events].slice(0, MAX_EVENTS) + })), + + addAlert: (alert) => set((state) => ({ + alerts: [alert, ...state.alerts].slice(0, MAX_ALERTS) + })), + + dismissAlert: (id) => set((state) => ({ + alerts: state.alerts.filter((a) => a.id !== id) + })), + + setLoading: (loading) => set({ loading }), + + setError: (error) => set({ error, loading: false }) +})) + +// Selectors +export const useSwarmOverview = () => useSwarmStore((s) => s.overview) +export const useSwarmEvents = () => useSwarmStore((s) => s.events) +export const useSwarmAlerts = () => useSwarmStore((s) => s.alerts) +export const useSwarmLoading = () => useSwarmStore((s) => s.loading) + +// Computed selectors +export function useFleetSummary() { + const overview = useSwarmStore((s) => s.overview) + if (!overview) return null + + const warrenAgents = overview.warren?.agents ?? [] + const dispatchAgents = overview.dispatch?.agents ?? [] + + const totalAgents = warrenAgents.length + dispatchAgents.length + const readyCount = warrenAgents.filter((a) => a.state === 'ready').length + const sleepingCount = warrenAgents.filter((a) => a.state === 'sleeping').length + const degradedCount = Object.keys(overview.errors).length + + return { + totalAgents, + readyCount, + sleepingCount, + degradedCount, + activeTasks: overview.dispatch?.stats?.in_progress ?? 0, + dlqDepth: overview.chronicle?.dlq?.depth ?? 0, + promptCount: overview.promptforge?.prompt_count ?? 0, + collectionCount: overview.alexandria?.collection_count ?? 0 + } +} + +export function usePipelineSummary() { + const overview = useSwarmStore((s) => s.overview) + if (!overview?.dispatch?.stats) return null + + const stats = overview.dispatch.stats + return { + pending: stats.pending ?? 0, + inProgress: stats.in_progress ?? 0, + completed: stats.completed ?? 0, + failed: stats.failed ?? 0, + total: stats.total ?? 0, + dlqDepth: overview.chronicle?.dlq?.depth ?? 0 + } +} + +// API +const API_BASE = '/api' + +export async function fetchSwarmOverview(): Promise { + const res = await fetch(`${API_BASE}/swarm/overview`) + if (!res.ok) { + throw new Error(await res.text()) + } + return res.json() +} diff --git a/web/src/types/swarm.ts b/web/src/types/swarm.ts new file mode 100644 index 0000000..a2a463f --- /dev/null +++ b/web/src/types/swarm.ts @@ -0,0 +1,101 @@ +// Swarm BFF response types + +export interface SwarmOverview { + warren?: WarrenData + chronicle?: ChronicleData + dispatch?: DispatchData + promptforge?: PromptForgeData + alexandria?: AlexandriaData + errors: Record + fetched_at: string +} + +export interface WarrenData { + health?: WarrenHealth + agents?: WarrenAgent[] +} + +export interface WarrenHealth { + status: string + uptime?: number + version?: string + agents_connected?: number +} + +export interface WarrenAgent { + id: string + name: string + state: string // ready, sleeping, starting, stopping + connections?: number + policy?: string + started_at?: string +} + +export interface ChronicleData { + metrics?: ChronicleMetrics + dlq?: DLQStats +} + +export interface ChronicleMetrics { + total_events?: number + events_per_minute?: number + error_rate?: number + [key: string]: unknown +} + +export interface DLQStats { + depth?: number + oldest_age_seconds?: number + processing_rate?: number + [key: string]: unknown +} + +export interface DispatchData { + stats?: DispatchStats + agents?: DispatchAgent[] +} + +export interface DispatchStats { + pending?: number + in_progress?: number + completed?: number + failed?: number + total?: number + [key: string]: unknown +} + +export interface DispatchAgent { + id: string + name?: string + status: string + current_task?: string + tasks_completed?: number +} + +export interface PromptForgeData { + prompt_count?: number + prompts?: unknown +} + +export interface AlexandriaData { + collection_count?: number + collections?: unknown +} + +// SSE event from Warren +export interface WarrenSSEEvent { + id?: string + type: string + agent?: string + data?: unknown + timestamp: number +} + +// Derived alert from cross-service data +export interface SwarmAlert { + id: string + level: 'info' | 'warning' | 'critical' + service: string + message: string + timestamp: number +}