diff --git a/README.md b/README.md index 8b96b24..3eb4881 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ Inspired by [lazygit](https://github.com/jesseduffield/lazygit), [lazyworktree]( - **[`lazyagent compact`](docs/maintenance/compact.md)** — shrink session files in place by truncating bulky tool outputs, thinking blocks, and embedded images — sessions stay resumable with the originating agent. Supports Claude Code, pi, and Codex. - **[`lazyagent search`](docs/maintenance/search.md)** — search transcript-file agents (Claude, Codex, pi, Amp) with highlighted snippets and an incremental local index. - **[`lazyagent limits`](docs/maintenance/limits.md)** — on-demand 5-hour and weekly rate-limit snapshot for Claude Code and Codex, with a pace indicator that flags whether you're under-, on-, or over-utilizing the window. +- **Outbound webhooks on session state transitions** — send a signed JSON payload to Slack, a custom dashboard, or a CI endpoint whenever a session goes idle, waits for input, or changes state. See [Webhooks](docs/reference/webhooks.md). Typical savings on a year of daily use: **80+ MiB reclaimed** across a few commands, with every rewrite validated and backed up by default. diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index 98175db..c72ea9d 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -130,6 +130,26 @@ Default: `"dark"`. Supported values: All TUI colors (panels, activity labels, help bar, overlays) are driven by the theme. +### `webhooks` + +Default: `[]` (empty — no outbound webhooks). A list of HTTP endpoints that receive a POST whenever a session changes activity state. Each entry can filter by event type and agent source, and optionally sign requests with HMAC-SHA256. + +```json +{ + "webhooks": [ + { + "name": "slack-needs-input", + "url": "https://hooks.slack.com/services/T00/B00/XXX", + "secret": "abc123", + "events": ["waiting"], + "agents": ["claude"] + } + ] +} +``` + +See [Outbound Webhooks](webhooks.md) for the full field reference, payload schema, request headers, HMAC verification, delivery semantics, and troubleshooting tips. + ## Where the config file lives | OS | Path | diff --git a/docs/reference/roadmap.md b/docs/reference/roadmap.md index 20a2f7c..14cd002 100644 --- a/docs/reference/roadmap.md +++ b/docs/reference/roadmap.md @@ -152,9 +152,17 @@ sidebar: - ✅ Codex via the latest rollout JSONL under `~/.codex/sessions/` — no network call, fallback to older rollouts when the most recent has no `rate_limits` event yet - ✅ Honest User-Agent (no Claude Code impersonation), graceful failure on 401/429, disclaimer in `--help` and output +## v0.10 — Outbound webhooks + +- ✅ Typed `core.EventBus` for in-process pub-sub of activity transitions +- ✅ `internal/webhook/` dispatcher with async best-effort delivery +- ✅ Event + agent filters per webhook +- ✅ Optional HMAC-SHA256 signing (GitHub-style header) +- ✅ Async fan-out with bounded queue, retry on transient failures, dedup window for duplicate transitions across in-process managers +- ✅ Documentation with payload schema and verification example + ## Future ideas -- ⬜ Outbound webhooks on status changes - ⬜ Multi-machine support via shared config / remote API - ⬜ TUI actions: kill session, attach terminal - ⬜ Session history browser (browse past conversations) diff --git a/docs/reference/webhooks.md b/docs/reference/webhooks.md new file mode 100644 index 0000000..251c071 --- /dev/null +++ b/docs/reference/webhooks.md @@ -0,0 +1,120 @@ +--- +title: "Outbound Webhooks" +description: "Send session state transitions to Slack, dashboards, or CI pipelines via HTTP POST." +sidebar: + order: 3 +--- + +Outbound webhooks let lazyagent push a JSON payload to any HTTP endpoint whenever a session changes activity state. Common uses include posting to a Slack channel when an agent is waiting for input, feeding a custom dashboard, or triggering a CI step when a long-running session goes idle. + +## Configuration + +Add a `webhooks` array to `~/.config/lazyagent/config.json`: + +```json +{ + "webhooks": [ + { + "name": "slack-needs-input", + "url": "https://hooks.slack.com/services/T00/B00/XXX", + "secret": "abc123sharedwithslack", + "events": ["waiting"], + "agents": ["claude", "codex"] + }, + { + "name": "dashboard-everything", + "url": "https://my-dashboard.local/api/lazyagent" + } + ] +} +``` + +The first entry fires only when a Claude Code or Codex session enters the `waiting` state, and signs each request with an HMAC-SHA256 header. The second entry receives every transition from every agent, unsigned. + +## Field reference + +| Field | Type | Required | Description | +|---|---|---|---| +| `name` | string | yes | Human-readable identifier used in log lines. | +| `url` | string | yes | Destination endpoint. `http://` and `https://` are both accepted. | +| `secret` | string | no | When set, each request carries an `X-Lazyagent-Signature` header (see [HMAC verification](#hmac-verification)). | +| `events` | string array | no | Activity kinds to deliver. Empty or absent means all events. Valid values: `idle`, `waiting`, `thinking`, `compacting`, `reading`, `writing`, `running`, `searching`, `browsing`, `spawning`. | +| `agents` | string array | no | Agent sources to deliver. Empty or absent means all agents. Valid values: `claude`, `codex`, `pi`, `cursor`, `amp`, `opencode`. | +| `enabled` | boolean | no | Defaults to `true`. Set to `false` to disable the entry without removing it. | + +## Payload schema + +Every delivery is an HTTP POST with a JSON body: + +```json +{ + "id": "f47ac10b-58cc-4372-a567-0e02b2c3d479", + "event": "state_transition", + "session_id": "abc123", + "agent": "claude", + "from": "idle", + "to": "waiting", + "project_path": "/Users/foo/code/bar", + "timestamp": "2026-05-19T14:30:00Z", + "api": { + "session_url": "http://127.0.0.1:7421/api/sessions/abc123" + } +} +``` + +The `api` object is included on a best-effort basis. It is present when the +webhook dispatcher and the API server run in the same process — typically +`--tui --api` (or `--api` alone). When the GUI tray is involved +(`--gui`, `--gui --api`, `--tui --gui --api`), the tray process owns +webhook delivery while the API server lives in the parent process, so the +two are not linked and `api` is omitted from payloads. Consumers should +treat `api` as optional and not rely on its presence. + +## Request headers + +| Header | Value | +|---|---| +| `Content-Type` | `application/json` | +| `User-Agent` | `lazyagent/` | +| `X-Lazyagent-Event` | `state_transition` | +| `X-Lazyagent-Delivery` | UUID matching the `id` field in the body | +| `X-Lazyagent-Signature` | `sha256=` (only when `secret` is configured) | + +## HMAC verification + +When `secret` is set, the signature is computed over the raw request body using HMAC-SHA256. Verify it on the receiving side before trusting the payload: + +```python +import hmac, hashlib + +secret = b"abc123sharedwithslack" +body = request.get_data() +sig = "sha256=" + hmac.new(secret, body, hashlib.sha256).hexdigest() +if not hmac.compare_digest(sig, request.headers["X-Lazyagent-Signature"]): + abort(401) +``` + +Always use a constant-time comparison (`hmac.compare_digest` or equivalent) to avoid timing attacks. + +## Delivery semantics + +- **Asynchronous, best-effort.** Webhooks are dispatched in the background and never block session monitoring. +- **Bounded queue.** Each dispatcher holds up to 256 pending deliveries. If the queue is full, new events are dropped and a log line is emitted. +- **Retry on transient failures.** HTTP 5xx responses and network errors trigger exponential backoff: 1 s, 5 s, 30 s. Maximum 4 attempts total. +- **No retry on 4xx.** Client errors (wrong URL, bad auth, malformed payload on the consumer side) are logged with the status code and a body snippet, then discarded. +- **Dedup window.** Duplicate transitions within 2 seconds are coalesced. This prevents double-delivery when multiple in-process managers (e.g. `--tui` and `--gui` running together) each observe the same transition. +- **`api.*` URLs.** Present only when `--api` is active, the server is bound, and the dispatcher and API server share the same process; absent otherwise (see note above the payload schema). + +## Troubleshooting + +**`api.session_url` is missing in `--gui --api` mode.** +This is expected: the tray process delivers webhooks while the parent process runs the API server, and the two are not cross-linked. Use `--tui --api` if you need the backlink in the payload. + +**I see no POSTs.** +Verify that the `webhooks` array is non-empty and well-formed JSON. lazyagent logs invalid webhook entries on startup with a line like `config: webhook "name": ...`. Also confirm the `events` and `agents` filters match what you expect. + +**I see duplicate deliveries.** +Check whether you are running more than one lazyagent process simultaneously (e.g. `--tui` in one terminal and `--gui` in the background). Each process has its own dispatcher and can emit independent POSTs for the same transition. The 2-second dedup window covers duplicate detection within a single process only. + +**4xx errors appear in the log.** +The consumer is rejecting the request. lazyagent does not retry 4xx responses by design — fix the consumer endpoint (URL, auth headers, expected payload shape) and the next transition will deliver cleanly. diff --git a/docs/superpowers/plans/2026-05-19-outbound-webhooks.md b/docs/superpowers/plans/2026-05-19-outbound-webhooks.md new file mode 100644 index 0000000..81b67c9 --- /dev/null +++ b/docs/superpowers/plans/2026-05-19-outbound-webhooks.md @@ -0,0 +1,2078 @@ +# Outbound Webhooks Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add outbound HTTP webhooks that fire on session activity state transitions, with per-webhook event/agent filters, optional HMAC-SHA256 signing, and async best-effort delivery. + +**Architecture:** New typed `core.EventBus` published from `ActivityTracker.Update` when previous activity differs from new; new `internal/webhook/` package subscribes to the bus, applies per-webhook filters, and delivers via a bounded-queue worker pool with retry. The dispatcher dedupes duplicate transitions from multiple in-process managers within a short window. + +**Tech Stack:** Go 1.21+, stdlib only (`net/http`, `crypto/hmac`, `crypto/sha256`, `encoding/json`, `context`, `sync`). No new dependencies. + +**Spec:** `docs/superpowers/specs/2026-05-19-outbound-webhooks-design.md` + +--- + +## File Map + +**Create:** +- `internal/core/eventbus.go` — `EventBus`, `SessionEvent` +- `internal/core/eventbus_test.go` +- `internal/webhook/dispatcher.go` — `Dispatcher`, `ConfigSource`, lifecycle +- `internal/webhook/dispatcher_test.go` +- `internal/webhook/payload.go` — `Payload`, marshal helper +- `internal/webhook/payload_test.go` +- `internal/webhook/filter.go` — `Matches(WebhookConfig, SessionEvent) bool` +- `internal/webhook/filter_test.go` +- `internal/webhook/hmac.go` — `Sign(secret, body []byte) string` +- `internal/webhook/hmac_test.go` +- `docs/reference/webhooks.md` — user-facing reference page + +**Modify:** +- `internal/core/activity.go` — `ActivityTracker` gains `bus *EventBus` + `SetEventBus`; `Update` emits transitions +- `internal/core/activity_test.go` — new test cases +- `internal/core/config.go` — `WebhookConfig` type, `Webhooks []WebhookConfig` on `Config`, validation +- `internal/core/config_test.go` — validation tests +- `internal/core/session.go` — `SetEventBus` on `SessionManager`, propagates to tracker +- `internal/ui/app.go` — accept and wire bus +- `internal/tray/service.go` — accept and wire bus (guarded by `!notray`) +- `internal/api/server.go` — accept and wire bus +- `main.go` — construct bus, wire to managers, start dispatcher when `cfg.Webhooks` non-empty +- `docs/reference/configuration.md` — document `webhooks` field +- `docs/reference/roadmap.md` — add `v0.10 — Outbound webhooks` section (or leave to upstream merger) +- `README.md` — one-line mention under features + +--- + +## Task 1: EventBus core type + +**Files:** +- Create: `internal/core/eventbus.go` +- Test: `internal/core/eventbus_test.go` + +- [ ] **Step 1: Write the failing tests** + +```go +// internal/core/eventbus_test.go +package core + +import ( + "sync" + "testing" + "time" +) + +func TestEventBus_PublishSubscribe(t *testing.T) { + bus := NewEventBus() + ch := bus.Subscribe(4) + defer bus.Unsubscribe(ch) + + want := SessionEvent{SessionID: "s1", From: ActivityIdle, To: ActivityThinking, At: time.Unix(0, 0)} + bus.Publish(want) + + select { + case got := <-ch: + if got != want { + t.Fatalf("got %+v, want %+v", got, want) + } + case <-time.After(time.Second): + t.Fatal("subscriber did not receive event") + } +} + +func TestEventBus_DropOnFullSubscriber(t *testing.T) { + bus := NewEventBus() + ch := bus.Subscribe(1) + defer bus.Unsubscribe(ch) + + bus.Publish(SessionEvent{SessionID: "a"}) + bus.Publish(SessionEvent{SessionID: "b"}) // dropped + bus.Publish(SessionEvent{SessionID: "c"}) // dropped + + got := <-ch + if got.SessionID != "a" { + t.Fatalf("got %q, want %q", got.SessionID, "a") + } + select { + case extra := <-ch: + t.Fatalf("unexpected extra event: %+v", extra) + case <-time.After(50 * time.Millisecond): + } +} + +func TestEventBus_UnsubscribeIdempotent(t *testing.T) { + bus := NewEventBus() + ch := bus.Subscribe(1) + bus.Unsubscribe(ch) + bus.Unsubscribe(ch) // must not panic + + // Publish after unsubscribe must not block or send to the closed channel. + done := make(chan struct{}) + go func() { + bus.Publish(SessionEvent{SessionID: "x"}) + close(done) + }() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("Publish after Unsubscribe blocked") + } +} + +func TestEventBus_ConcurrentPublish(t *testing.T) { + bus := NewEventBus() + ch := bus.Subscribe(1024) + defer bus.Unsubscribe(ch) + + var wg sync.WaitGroup + const n = 100 + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + bus.Publish(SessionEvent{SessionID: "x"}) + }(i) + } + wg.Wait() + + count := 0 + for { + select { + case <-ch: + count++ + case <-time.After(50 * time.Millisecond): + if count != n { + t.Fatalf("received %d events, want %d", count, n) + } + return + } + } +} +``` + +- [ ] **Step 2: Run tests, verify they fail** + +Run: `go test ./internal/core/ -run TestEventBus -v` +Expected: compile error — `EventBus`, `SessionEvent`, `NewEventBus` undefined. + +- [ ] **Step 3: Implement the EventBus** + +```go +// internal/core/eventbus.go +package core + +import ( + "sync" + "time" +) + +// SessionEvent is published when a session's resolved activity changes. +type SessionEvent struct { + SessionID string + Agent string + From ActivityKind + To ActivityKind + At time.Time + ProjectPath string +} + +// EventBus is a minimal typed pub-sub for in-process subscribers. +// Publish never blocks; events are dropped for subscribers whose channel is full. +type EventBus struct { + mu sync.RWMutex + subs []chan SessionEvent +} + +// NewEventBus returns a ready-to-use EventBus. +func NewEventBus() *EventBus { return &EventBus{} } + +// Subscribe registers a new subscriber and returns its channel. +// buf is the channel buffer; pick a size matching the subscriber's drain rate. +func (b *EventBus) Subscribe(buf int) <-chan SessionEvent { + if buf < 1 { + buf = 1 + } + ch := make(chan SessionEvent, buf) + b.mu.Lock() + b.subs = append(b.subs, ch) + b.mu.Unlock() + return ch +} + +// Unsubscribe removes the channel from the bus. Safe to call multiple times. +// The caller must not read from the channel after Unsubscribe returns. +func (b *EventBus) Unsubscribe(ch <-chan SessionEvent) { + b.mu.Lock() + defer b.mu.Unlock() + for i, sub := range b.subs { + if sub == ch { + b.subs = append(b.subs[:i], b.subs[i+1:]...) + return + } + } +} + +// Publish sends e to every subscriber. Non-blocking: subscribers whose channel +// is full miss this event. +func (b *EventBus) Publish(e SessionEvent) { + b.mu.RLock() + subs := b.subs + b.mu.RUnlock() + for _, ch := range subs { + select { + case ch <- e: + default: + // dropped; subscribers are responsible for keeping up + } + } +} +``` + +- [ ] **Step 4: Run tests, verify they pass** + +Run: `go test ./internal/core/ -run TestEventBus -race -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add internal/core/eventbus.go internal/core/eventbus_test.go +git commit -m "feat(core): add typed EventBus for in-process pub-sub" +``` + +--- + +## Task 2: ActivityTracker emits transitions + +**Files:** +- Modify: `internal/core/activity.go` +- Modify: `internal/core/activity_test.go` + +- [ ] **Step 1: Write failing tests** + +Append to `internal/core/activity_test.go`: + +```go +func TestActivityTracker_EmitsTransitionOnChange(t *testing.T) { + bus := NewEventBus() + ch := bus.Subscribe(8) + defer bus.Unsubscribe(ch) + + tr := NewActivityTracker() + tr.SetEventBus(bus) + + now := time.Now() + s := &model.Session{SessionID: "s1", Agent: "claude", CWD: "/p", LastActivity: now, Status: model.StatusThinking} + tr.Update([]*model.Session{s}, now) + + // First Update: new session emits Unknown→Thinking. + select { + case ev := <-ch: + if ev.SessionID != "s1" || ev.Agent != "claude" || ev.From != "" || ev.To != ActivityThinking || ev.ProjectPath != "/p" { + t.Fatalf("unexpected event: %+v", ev) + } + case <-time.After(time.Second): + t.Fatal("no event emitted") + } + + // Same activity: no event. + tr.Update([]*model.Session{s}, now) + select { + case ev := <-ch: + t.Fatalf("unexpected event on unchanged state: %+v", ev) + case <-time.After(50 * time.Millisecond): + } + + // Status flips to waiting (after grace). + s.Status = model.StatusWaitingForUser + tr.Update([]*model.Session{s}, now.Add(WaitingGrace+time.Second)) + select { + case ev := <-ch: + if ev.From != ActivityThinking || ev.To != ActivityWaiting { + t.Fatalf("unexpected transition: %+v", ev) + } + case <-time.After(time.Second): + t.Fatal("no event on change") + } +} + +func TestActivityTracker_NilBusSafe(t *testing.T) { + tr := NewActivityTracker() + // No SetEventBus call. Must not panic. + s := &model.Session{SessionID: "s1", Agent: "claude", LastActivity: time.Now(), Status: model.StatusThinking} + tr.Update([]*model.Session{s}, time.Now()) +} +``` + +- [ ] **Step 2: Run tests, verify they fail** + +Run: `go test ./internal/core/ -run TestActivityTracker_ -v` +Expected: compile error — `SetEventBus` undefined, plus the new test cases fail. + +- [ ] **Step 3: Modify ActivityTracker to emit transitions** + +Edit `internal/core/activity.go`: + +Replace the `ActivityTracker` struct and `NewActivityTracker`: + +```go +// ActivityTracker manages sticky activity states with grace period logic. +// When an EventBus is attached, transitions are published on Update. +type ActivityTracker struct { + activities map[string]*ActivityEntry + waitingSince map[string]time.Time + bus *EventBus // optional, nil-safe + agents map[string]string // session_id → agent name (for events) + projects map[string]string // session_id → CWD (for events) +} + +// NewActivityTracker creates a new ActivityTracker. +func NewActivityTracker() *ActivityTracker { + return &ActivityTracker{ + activities: make(map[string]*ActivityEntry), + waitingSince: make(map[string]time.Time), + agents: make(map[string]string), + projects: make(map[string]string), + } +} + +// SetEventBus attaches a bus so Update will publish transition events. +// Passing nil clears any previously attached bus. +func (t *ActivityTracker) SetEventBus(bus *EventBus) { + t.bus = bus +} +``` + +Then replace the `Update` method body — the new logic compares previous vs new and publishes: + +```go +// Update resolves and stores the current activity for each session. +// Applies a grace period before showing ActivityWaiting to avoid false positives. +// If an EventBus is attached, transitions are published. +func (t *ActivityTracker) Update(sessions []*model.Session, now time.Time) { + activeIDs := make(map[string]struct{}, len(sessions)) + for _, s := range sessions { + id := s.SessionID + if id == "" { + continue + } + activeIDs[id] = struct{}{} + activity := ResolveActivity(s, now) + + if activity == ActivityWaiting { + if _, seen := t.waitingSince[id]; !seen { + t.waitingSince[id] = now + } + if now.Sub(t.waitingSince[id]) < WaitingGrace { + continue + } + } else { + delete(t.waitingSince, id) + } + + var prev ActivityKind + if e, ok := t.activities[id]; ok { + prev = e.Kind + } + t.activities[id] = &ActivityEntry{Kind: activity, LastSeen: now} + t.agents[id] = s.Agent + t.projects[id] = s.CWD + + if t.bus != nil && prev != activity { + t.bus.Publish(SessionEvent{ + SessionID: id, + Agent: s.Agent, + From: prev, + To: activity, + At: now, + ProjectPath: s.CWD, + }) + } + } + for id := range t.activities { + if _, ok := activeIDs[id]; !ok { + delete(t.activities, id) + delete(t.waitingSince, id) + delete(t.agents, id) + delete(t.projects, id) + } + } +} +``` + +- [ ] **Step 4: Run tests, verify they pass** + +Run: `go test ./internal/core/ -race -v` +Expected: all PASS (including pre-existing tracker tests). + +- [ ] **Step 5: Commit** + +```bash +git add internal/core/activity.go internal/core/activity_test.go +git commit -m "feat(core): publish SessionEvent on activity transitions" +``` + +--- + +## Task 3: SessionManager wires bus into tracker + +**Files:** +- Modify: `internal/core/session.go` + +- [ ] **Step 1: Write failing test** + +Append to `internal/core/session_test.go` (create if missing): + +```go +package core + +import ( + "testing" + "time" + + "github.com/illegalstudio/lazyagent/internal/model" +) + +type stubProvider struct{ sessions []*model.Session } + +func (p stubProvider) DiscoverSessions() ([]*model.Session, error) { return p.sessions, nil } +func (p stubProvider) UseWatcher() bool { return false } +func (p stubProvider) RefreshInterval() time.Duration { return 0 } +func (p stubProvider) WatchDirs() []string { return nil } + +func TestSessionManager_SetEventBus_PropagatesToTracker(t *testing.T) { + bus := NewEventBus() + ch := bus.Subscribe(4) + defer bus.Unsubscribe(ch) + + now := time.Now() + p := stubProvider{sessions: []*model.Session{{SessionID: "s1", Agent: "claude", LastActivity: now, Status: model.StatusThinking}}} + m := NewSessionManager(60, p) + m.SetEventBus(bus) + + if err := m.Reload(); err != nil { + t.Fatalf("Reload: %v", err) + } + + select { + case ev := <-ch: + if ev.SessionID != "s1" || ev.To != ActivityThinking { + t.Fatalf("unexpected event: %+v", ev) + } + case <-time.After(time.Second): + t.Fatal("no event after Reload") + } +} +``` + +- [ ] **Step 2: Run test, verify it fails** + +Run: `go test ./internal/core/ -run TestSessionManager_SetEventBus -v` +Expected: compile error — `SetEventBus` undefined on `*SessionManager`. + +- [ ] **Step 3: Add SetEventBus to SessionManager** + +In `internal/core/session.go`, add the method (near `SetExcludeCWDSubstrings`): + +```go +// SetEventBus attaches an event bus so activity transitions are published +// to subscribers. Pass nil to detach. +func (m *SessionManager) SetEventBus(bus *EventBus) { + m.mu.Lock() + m.tracker.SetEventBus(bus) + m.mu.Unlock() +} +``` + +- [ ] **Step 4: Run tests, verify they pass** + +Run: `go test ./internal/core/ -race -v` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add internal/core/session.go internal/core/session_test.go +git commit -m "feat(core): SessionManager.SetEventBus propagates bus to tracker" +``` + +--- + +## Task 4: WebhookConfig type and validation + +**Files:** +- Modify: `internal/core/config.go` +- Modify: `internal/core/config_test.go` + +- [ ] **Step 1: Write failing tests** + +Append to `internal/core/config_test.go`: + +```go +func TestWebhookConfig_ValidateOK(t *testing.T) { + tr := true + w := WebhookConfig{ + Name: "slack", + URL: "https://example.com/hook", + Events: []string{"waiting"}, + Agents: []string{"claude"}, + Enabled: &tr, + } + if err := w.Validate(); err != nil { + t.Fatalf("unexpected: %v", err) + } + if !w.IsEnabled() { + t.Fatal("IsEnabled should be true") + } +} + +func TestWebhookConfig_Validate_RejectsMissingFields(t *testing.T) { + cases := []struct { + name string + w WebhookConfig + }{ + {"no name", WebhookConfig{URL: "https://x"}}, + {"no url", WebhookConfig{Name: "x"}}, + {"bad scheme", WebhookConfig{Name: "x", URL: "ftp://x"}}, + {"unparseable", WebhookConfig{Name: "x", URL: "::"}}, + {"unknown event", WebhookConfig{Name: "x", URL: "https://x", Events: []string{"nope"}}}, + {"unknown agent", WebhookConfig{Name: "x", URL: "https://x", Agents: []string{"nope"}}}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + if err := c.w.Validate(); err == nil { + t.Fatal("expected error") + } + }) + } +} + +func TestWebhookConfig_IsEnabled_DefaultTrue(t *testing.T) { + w := WebhookConfig{Name: "x", URL: "https://x"} + if !w.IsEnabled() { + t.Fatal("absent Enabled should default to true") + } +} + +func TestConfig_ValidWebhooks_SkipsInvalid(t *testing.T) { + cfg := Config{Webhooks: []WebhookConfig{ + {Name: "ok", URL: "https://x"}, + {Name: "bad", URL: "ftp://x"}, + }} + got := cfg.ValidWebhooks() + if len(got) != 1 || got[0].Name != "ok" { + t.Fatalf("got %+v, want only 'ok'", got) + } +} +``` + +- [ ] **Step 2: Run tests, verify they fail** + +Run: `go test ./internal/core/ -run "TestWebhookConfig|TestConfig_ValidWebhooks" -v` +Expected: compile error — types/methods undefined. + +- [ ] **Step 3: Add WebhookConfig and validation** + +Add to `internal/core/config.go`: + +```go +// WebhookConfig is a single outbound webhook destination. +type WebhookConfig struct { + Name string `json:"name"` + URL string `json:"url"` + Secret string `json:"secret,omitempty"` + Events []string `json:"events,omitempty"` + Agents []string `json:"agents,omitempty"` + Enabled *bool `json:"enabled,omitempty"` // absent = true +} + +// IsEnabled returns true unless Enabled is explicitly set to false. +func (w WebhookConfig) IsEnabled() bool { + return w.Enabled == nil || *w.Enabled +} + +// knownActivityNames lists the canonical activity names accepted in config. +var knownActivityNames = map[string]ActivityKind{ + "idle": ActivityIdle, + "waiting": ActivityWaiting, + "thinking": ActivityThinking, + "compacting": ActivityCompacting, + "reading": ActivityReading, + "writing": ActivityWriting, + "running": ActivityRunning, + "searching": ActivitySearching, + "browsing": ActivityBrowsing, + "spawning": ActivitySpawning, +} + +// knownAgentNames lists the agent names accepted in config. +var knownAgentNames = map[string]struct{}{ + "claude": {}, "codex": {}, "pi": {}, "cursor": {}, "amp": {}, "opencode": {}, +} + +// Validate returns nil if the webhook is well-formed. +func (w WebhookConfig) Validate() error { + if strings.TrimSpace(w.Name) == "" { + return fmt.Errorf("webhook: name is required") + } + if strings.TrimSpace(w.URL) == "" { + return fmt.Errorf("webhook %q: url is required", w.Name) + } + u, err := url.Parse(w.URL) + if err != nil { + return fmt.Errorf("webhook %q: url parse: %w", w.Name, err) + } + if u.Scheme != "http" && u.Scheme != "https" { + return fmt.Errorf("webhook %q: url scheme must be http or https, got %q", w.Name, u.Scheme) + } + for _, ev := range w.Events { + if _, ok := knownActivityNames[strings.ToLower(ev)]; !ok { + return fmt.Errorf("webhook %q: unknown event %q", w.Name, ev) + } + } + for _, ag := range w.Agents { + if _, ok := knownAgentNames[strings.ToLower(ag)]; !ok { + return fmt.Errorf("webhook %q: unknown agent %q", w.Name, ag) + } + } + return nil +} +``` + +Also add a `Webhooks []WebhookConfig` field to the `Config` struct (alphabetically placed), and the convenience helper: + +```go +// ValidWebhooks returns the subset of webhooks that pass Validate. +// Invalid webhooks are logged once at load time; this method just filters. +func (c Config) ValidWebhooks() []WebhookConfig { + out := make([]WebhookConfig, 0, len(c.Webhooks)) + for _, w := range c.Webhooks { + if err := w.Validate(); err == nil && w.IsEnabled() { + out = append(out, w) + } + } + return out +} +``` + +Add the import `"net/url"` to `internal/core/config.go`. + +The `LoadConfig` function should log each invalid webhook once at load time. Find the section that returns `cfg` after parsing and add (immediately before the return that follows successful JSON unmarshal): + +```go +for _, w := range cfg.Webhooks { + if err := w.Validate(); err != nil { + log.Printf("config: %v (skipped)", err) + } +} +``` + +Add `"log"` to the imports if not present. + +- [ ] **Step 4: Run tests, verify they pass** + +Run: `go test ./internal/core/ -race -v` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add internal/core/config.go internal/core/config_test.go +git commit -m "feat(core): add WebhookConfig type with validation" +``` + +--- + +## Task 5: webhook.Payload + +**Files:** +- Create: `internal/webhook/payload.go` +- Create: `internal/webhook/payload_test.go` + +- [ ] **Step 1: Write failing test** + +```go +// internal/webhook/payload_test.go +package webhook + +import ( + "encoding/json" + "strings" + "testing" + "time" + + "github.com/illegalstudio/lazyagent/internal/core" +) + +func TestPayload_MarshalContainsExpectedFields(t *testing.T) { + p := Payload{ + ID: "f47ac10b-58cc-4372-a567-0e02b2c3d479", + Event: "state_transition", + SessionID: "abc", + Agent: "claude", + From: string(core.ActivityIdle), + To: string(core.ActivityWaiting), + ProjectPath: "/p", + Timestamp: time.Date(2026, 5, 19, 14, 30, 0, 0, time.UTC), + API: &APILinks{ + SessionURL: "http://127.0.0.1:7421/api/sessions/abc", + DetailURL: "http://127.0.0.1:7421/api/sessions/abc/full", + }, + } + b, err := json.Marshal(p) + if err != nil { + t.Fatalf("marshal: %v", err) + } + s := string(b) + for _, want := range []string{ + `"id":"f47ac10b`, `"event":"state_transition"`, `"session_id":"abc"`, + `"agent":"claude"`, `"from":"idle"`, `"to":"waiting"`, + `"project_path":"/p"`, `"timestamp":"2026-05-19T14:30:00Z"`, + `"api":{`, `"session_url":"http://127.0.0.1:7421/api/sessions/abc"`, + } { + if !strings.Contains(s, want) { + t.Errorf("missing %q in %s", want, s) + } + } +} + +func TestPayload_MarshalOmitsAPIWhenNil(t *testing.T) { + p := Payload{ID: "x", Event: "state_transition", SessionID: "s"} + b, _ := json.Marshal(p) + if strings.Contains(string(b), `"api"`) { + t.Fatalf("api field should be omitted: %s", b) + } +} +``` + +- [ ] **Step 2: Run test, verify it fails** + +Run: `go test ./internal/webhook/ -v` +Expected: compile error. + +- [ ] **Step 3: Implement Payload** + +```go +// internal/webhook/payload.go +package webhook + +import "time" + +// Payload is the JSON body sent on every webhook delivery. +type Payload struct { + ID string `json:"id"` + Event string `json:"event"` + SessionID string `json:"session_id"` + Agent string `json:"agent"` + From string `json:"from"` + To string `json:"to"` + ProjectPath string `json:"project_path"` + Timestamp time.Time `json:"timestamp"` + API *APILinks `json:"api,omitempty"` +} + +// APILinks point back to the local lazyagent API server for full details. +// Present only when the API server is running. +type APILinks struct { + SessionURL string `json:"session_url"` + DetailURL string `json:"detail_url"` +} +``` + +- [ ] **Step 4: Run tests, verify they pass** + +Run: `go test ./internal/webhook/ -v` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add internal/webhook/payload.go internal/webhook/payload_test.go +git commit -m "feat(webhook): payload schema with optional API links" +``` + +--- + +## Task 6: webhook.Filter + +**Files:** +- Create: `internal/webhook/filter.go` +- Create: `internal/webhook/filter_test.go` + +- [ ] **Step 1: Write failing test** + +```go +// internal/webhook/filter_test.go +package webhook + +import ( + "testing" + + "github.com/illegalstudio/lazyagent/internal/core" +) + +func TestMatches(t *testing.T) { + ev := core.SessionEvent{Agent: "claude", To: core.ActivityWaiting} + + cases := []struct { + name string + w core.WebhookConfig + matches bool + }{ + {"empty filters match all", core.WebhookConfig{Name: "x", URL: "https://x"}, true}, + {"matching event", core.WebhookConfig{Name: "x", URL: "https://x", Events: []string{"waiting"}}, true}, + {"non-matching event", core.WebhookConfig{Name: "x", URL: "https://x", Events: []string{"thinking"}}, false}, + {"matching agent", core.WebhookConfig{Name: "x", URL: "https://x", Agents: []string{"claude"}}, true}, + {"non-matching agent", core.WebhookConfig{Name: "x", URL: "https://x", Agents: []string{"codex"}}, false}, + {"event AND agent both match", core.WebhookConfig{Name: "x", URL: "https://x", Events: []string{"waiting"}, Agents: []string{"claude"}}, true}, + {"event matches, agent doesn't", core.WebhookConfig{Name: "x", URL: "https://x", Events: []string{"waiting"}, Agents: []string{"codex"}}, false}, + {"case-insensitive event", core.WebhookConfig{Name: "x", URL: "https://x", Events: []string{"WAITING"}}, true}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + if got := Matches(c.w, ev); got != c.matches { + t.Fatalf("got %v, want %v", got, c.matches) + } + }) + } +} +``` + +- [ ] **Step 2: Run test, verify it fails** + +Run: `go test ./internal/webhook/ -run TestMatches -v` +Expected: compile error. + +- [ ] **Step 3: Implement Matches** + +```go +// internal/webhook/filter.go +package webhook + +import ( + "strings" + + "github.com/illegalstudio/lazyagent/internal/core" +) + +// Matches returns true when the event passes the webhook's event/agent filters. +// Empty filter slices match everything. +func Matches(w core.WebhookConfig, ev core.SessionEvent) bool { + if len(w.Events) > 0 { + want := strings.ToLower(string(ev.To)) + ok := false + for _, e := range w.Events { + if strings.ToLower(e) == want { + ok = true + break + } + } + if !ok { + return false + } + } + if len(w.Agents) > 0 { + want := strings.ToLower(ev.Agent) + ok := false + for _, a := range w.Agents { + if strings.ToLower(a) == want { + ok = true + break + } + } + if !ok { + return false + } + } + return true +} +``` + +- [ ] **Step 4: Run tests, verify they pass** + +Run: `go test ./internal/webhook/ -v` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add internal/webhook/filter.go internal/webhook/filter_test.go +git commit -m "feat(webhook): event + agent filter matching" +``` + +--- + +## Task 7: webhook.Sign (HMAC-SHA256) + +**Files:** +- Create: `internal/webhook/hmac.go` +- Create: `internal/webhook/hmac_test.go` + +- [ ] **Step 1: Write failing test with known vector** + +```go +// internal/webhook/hmac_test.go +package webhook + +import "testing" + +func TestSign_KnownVector(t *testing.T) { + // HMAC-SHA256("it's a secret", `{"foo":"bar"}`) hex digest. + // Verified independently: echo -n '{"foo":"bar"}' | openssl dgst -sha256 -hmac "it's a secret" + const want = "sha256=5d1eaa4e0d72b46cef0ecbf3a8ab06d7c3e0c89c0c4d4f10907ba87baa11d97a" + got := Sign("it's a secret", []byte(`{"foo":"bar"}`)) + if got != want { + t.Fatalf("got %q, want %q", got, want) + } +} + +func TestSign_EmptySecret(t *testing.T) { + if Sign("", []byte("x")) == "" { + t.Fatal("Sign with empty secret should still return a valid signature string") + } +} +``` + +> Note for implementer: if the test vector above doesn't match, regenerate it +> with `echo -n '{"foo":"bar"}' | openssl dgst -sha256 -hmac "it's a secret"` +> and update the constant. The point of the test is that the format is fixed. + +- [ ] **Step 2: Run test, verify it fails** + +Run: `go test ./internal/webhook/ -run TestSign -v` +Expected: compile error — `Sign` undefined. + +- [ ] **Step 3: Implement Sign** + +```go +// internal/webhook/hmac.go +package webhook + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/hex" +) + +// Sign returns the HMAC-SHA256 of body keyed by secret, formatted as +// "sha256=" — the same convention used by GitHub webhooks. +func Sign(secret string, body []byte) string { + mac := hmac.New(sha256.New, []byte(secret)) + mac.Write(body) + return "sha256=" + hex.EncodeToString(mac.Sum(nil)) +} +``` + +- [ ] **Step 4: Run tests, verify they pass** + +Run: `go test ./internal/webhook/ -v` +Expected: PASS. If `TestSign_KnownVector` fails because the test vector is wrong, regenerate as noted and re-run. + +- [ ] **Step 5: Commit** + +```bash +git add internal/webhook/hmac.go internal/webhook/hmac_test.go +git commit -m "feat(webhook): HMAC-SHA256 signing of payloads" +``` + +--- + +## Task 8: Dispatcher happy path (POST 2xx) + +**Files:** +- Create: `internal/webhook/dispatcher.go` +- Create: `internal/webhook/dispatcher_test.go` + +- [ ] **Step 1: Write failing test** + +```go +// internal/webhook/dispatcher_test.go +package webhook + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/illegalstudio/lazyagent/internal/core" +) + +type stubCfg struct{ webhooks []core.WebhookConfig } + +func (s stubCfg) Webhooks() []core.WebhookConfig { return s.webhooks } + +func TestDispatcher_HappyPath(t *testing.T) { + var mu sync.Mutex + var bodies []map[string]any + var headers []http.Header + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b, _ := io.ReadAll(r.Body) + var p map[string]any + _ = json.Unmarshal(b, &p) + mu.Lock() + bodies = append(bodies, p) + headers = append(headers, r.Header.Clone()) + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: 2 * time.Second}, func() string { return "" }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + // Give Start a moment to subscribe to the bus. + time.Sleep(20 * time.Millisecond) + + bus.Publish(core.SessionEvent{SessionID: "s1", Agent: "claude", From: core.ActivityIdle, To: core.ActivityWaiting, ProjectPath: "/p", At: time.Now()}) + + deadline := time.Now().Add(2 * time.Second) + for { + mu.Lock() + n := len(bodies) + mu.Unlock() + if n >= 1 || time.Now().After(deadline) { + break + } + time.Sleep(20 * time.Millisecond) + } + + mu.Lock() + defer mu.Unlock() + if len(bodies) != 1 { + t.Fatalf("got %d POSTs, want 1", len(bodies)) + } + if bodies[0]["session_id"] != "s1" || bodies[0]["to"] != "waiting" { + t.Fatalf("unexpected body: %+v", bodies[0]) + } + if h := headers[0].Get("X-Lazyagent-Event"); h != "state_transition" { + t.Errorf("X-Lazyagent-Event = %q", h) + } + if h := headers[0].Get("X-Lazyagent-Delivery"); h == "" { + t.Error("X-Lazyagent-Delivery missing") + } + if h := headers[0].Get("Content-Type"); h != "application/json" { + t.Errorf("Content-Type = %q", h) + } +} +``` + +- [ ] **Step 2: Run test, verify it fails** + +Run: `go test ./internal/webhook/ -run TestDispatcher_HappyPath -v` +Expected: compile error — `New`, `Dispatcher.Start` undefined. + +- [ ] **Step 3: Implement Dispatcher skeleton + happy-path delivery** + +```go +// internal/webhook/dispatcher.go +package webhook + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "time" + + "github.com/illegalstudio/lazyagent/internal/core" + "github.com/illegalstudio/lazyagent/internal/version" +) + +// ConfigSource provides the current set of webhook configurations. +// Implementations may return a different slice on each call (e.g. after +// config reload); the dispatcher reads it once per incoming event. +type ConfigSource interface { + Webhooks() []core.WebhookConfig +} + +// Dispatcher consumes SessionEvents from a bus and delivers them as HTTP +// POSTs to configured webhooks. +type Dispatcher struct { + bus *core.EventBus + cfg ConfigSource + client *http.Client + apiAddr func() string + + queueSize int + workers int + backoffs []time.Duration +} + +// deliveryJob is one POST attempt against a specific webhook. +type deliveryJob struct { + webhook core.WebhookConfig + body []byte + deliveryID string +} + +// New creates a Dispatcher. The HTTP client should have a sensible timeout +// set (e.g. 10s). apiAddr returns the API server base URL (e.g. +// "http://127.0.0.1:7421") or "" if no API server is running. +func New(bus *core.EventBus, cfg ConfigSource, client *http.Client, apiAddr func() string) *Dispatcher { + if client == nil { + client = &http.Client{Timeout: 10 * time.Second} + } + if apiAddr == nil { + apiAddr = func() string { return "" } + } + return &Dispatcher{ + bus: bus, + cfg: cfg, + client: client, + apiAddr: apiAddr, + queueSize: 256, + workers: 4, + backoffs: []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}, + } +} + +// Start subscribes to the bus and runs the fan-out + worker goroutines until +// ctx is cancelled. Returns the context error on exit. +func (d *Dispatcher) Start(ctx context.Context) error { + events := d.bus.Subscribe(256) + defer d.bus.Unsubscribe(events) + + queue := make(chan deliveryJob, d.queueSize) + + // Worker pool + workerDone := make(chan struct{}, d.workers) + for i := 0; i < d.workers; i++ { + go func() { + defer func() { workerDone <- struct{}{} }() + for { + select { + case <-ctx.Done(): + return + case job := <-queue: + d.deliver(ctx, job) + } + } + }() + } + + // Fan-out loop + for { + select { + case <-ctx.Done(): + close(queue) + for i := 0; i < d.workers; i++ { + <-workerDone + } + return ctx.Err() + case ev := <-events: + d.fanout(ev, queue) + } + } +} + +// fanout marshals the payload once, walks the configured webhooks, and +// enqueues one deliveryJob per match. Drops jobs when the queue is full. +func (d *Dispatcher) fanout(ev core.SessionEvent, queue chan<- deliveryJob) { + webhooks := d.cfg.Webhooks() + if len(webhooks) == 0 { + return + } + deliveryID := newDeliveryID() + payload := Payload{ + ID: deliveryID, + Event: "state_transition", + SessionID: ev.SessionID, + Agent: ev.Agent, + From: string(ev.From), + To: string(ev.To), + ProjectPath: ev.ProjectPath, + Timestamp: ev.At.UTC(), + } + if base := d.apiAddr(); base != "" { + payload.API = &APILinks{ + SessionURL: fmt.Sprintf("%s/api/sessions/%s", base, ev.SessionID), + DetailURL: fmt.Sprintf("%s/api/sessions/%s/full", base, ev.SessionID), + } + } + body, err := json.Marshal(payload) + if err != nil { + log.Printf("webhook: marshal payload: %v", err) + return + } + for _, w := range webhooks { + if !w.IsEnabled() || !Matches(w, ev) { + continue + } + select { + case queue <- deliveryJob{webhook: w, body: body, deliveryID: deliveryID}: + default: + log.Printf("webhook: queue full, dropping delivery for %q", w.Name) + } + } +} + +// deliver performs the POST with no retry (retry added in Task 9). +func (d *Dispatcher) deliver(ctx context.Context, job deliveryJob) { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, job.webhook.URL, bytes.NewReader(job.body)) + if err != nil { + log.Printf("webhook %q: build request: %v", job.webhook.Name, err) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "lazyagent/"+version.String()) + req.Header.Set("X-Lazyagent-Event", "state_transition") + req.Header.Set("X-Lazyagent-Delivery", job.deliveryID) + if job.webhook.Secret != "" { + req.Header.Set("X-Lazyagent-Signature", Sign(job.webhook.Secret, job.body)) + } + resp, err := d.client.Do(req) + if err != nil { + log.Printf("webhook %q: POST: %v", job.webhook.Name, err) + return + } + defer resp.Body.Close() + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return + } + snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 200)) + log.Printf("webhook %q: %d %s — %s", job.webhook.Name, resp.StatusCode, resp.Status, string(snippet)) +} + +func newDeliveryID() string { + var b [16]byte + _, _ = rand.Read(b[:]) + // Format as RFC 4122-ish UUIDv4 string. + b[6] = (b[6] & 0x0f) | 0x40 + b[8] = (b[8] & 0x3f) | 0x80 + return fmt.Sprintf("%s-%s-%s-%s-%s", + hex.EncodeToString(b[0:4]), + hex.EncodeToString(b[4:6]), + hex.EncodeToString(b[6:8]), + hex.EncodeToString(b[8:10]), + hex.EncodeToString(b[10:16]), + ) +} +``` + +- [ ] **Step 4: Run tests, verify they pass** + +Run: `go test ./internal/webhook/ -race -v` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add internal/webhook/dispatcher.go internal/webhook/dispatcher_test.go +git commit -m "feat(webhook): dispatcher with fan-out and HTTP POST delivery" +``` + +--- + +## Task 9: Dispatcher retry on 5xx, no retry on 4xx + +**Files:** +- Modify: `internal/webhook/dispatcher.go` +- Modify: `internal/webhook/dispatcher_test.go` + +- [ ] **Step 1: Write failing tests** + +Append to `internal/webhook/dispatcher_test.go`: + +```go +import "sync/atomic" + +func TestDispatcher_Retry500Then200(t *testing.T) { + var attempts int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if atomic.AddInt32(&attempts, 1) == 1 { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + d.backoffs = []time.Duration{10 * time.Millisecond, 10 * time.Millisecond, 10 * time.Millisecond} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + bus.Publish(core.SessionEvent{SessionID: "s1", To: core.ActivityWaiting, At: time.Now()}) + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if atomic.LoadInt32(&attempts) >= 2 { + break + } + time.Sleep(20 * time.Millisecond) + } + if a := atomic.LoadInt32(&attempts); a != 2 { + t.Fatalf("got %d attempts, want 2", a) + } +} + +func TestDispatcher_NoRetryOn400(t *testing.T) { + var attempts int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusBadRequest) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + d.backoffs = []time.Duration{10 * time.Millisecond, 10 * time.Millisecond, 10 * time.Millisecond} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + bus.Publish(core.SessionEvent{SessionID: "s1", To: core.ActivityWaiting, At: time.Now()}) + time.Sleep(200 * time.Millisecond) // wait long enough for any retries to fire + + if a := atomic.LoadInt32(&attempts); a != 1 { + t.Fatalf("got %d attempts, want 1 (no retry on 4xx)", a) + } +} + +func TestDispatcher_AllAttemptsFail(t *testing.T) { + var attempts int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + d.backoffs = []time.Duration{10 * time.Millisecond, 10 * time.Millisecond, 10 * time.Millisecond} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + bus.Publish(core.SessionEvent{SessionID: "s1", To: core.ActivityWaiting, At: time.Now()}) + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if atomic.LoadInt32(&attempts) >= 4 { + break + } + time.Sleep(20 * time.Millisecond) + } + if a := atomic.LoadInt32(&attempts); a != 4 { + t.Fatalf("got %d attempts, want 4 (initial + 3 retries)", a) + } +} +``` + +- [ ] **Step 2: Run tests, verify they fail** + +Run: `go test ./internal/webhook/ -run "TestDispatcher_Retry|TestDispatcher_NoRetry|TestDispatcher_AllAttempts" -v` +Expected: FAIL — only one attempt is made. + +- [ ] **Step 3: Add retry loop in deliver** + +Replace the body of `deliver` in `internal/webhook/dispatcher.go`: + +```go +// deliver performs the POST, retrying on transient failures. +// 4xx is treated as permanent. 5xx, network errors, and timeouts retry +// with backoff up to len(d.backoffs) times (total attempts = 1 + retries). +func (d *Dispatcher) deliver(ctx context.Context, job deliveryJob) { + for attempt := 0; attempt <= len(d.backoffs); attempt++ { + if attempt > 0 { + select { + case <-ctx.Done(): + return + case <-time.After(d.backoffs[attempt-1]): + } + } + status, transient, err := d.doOnce(ctx, job) + if err == nil && !transient { + return + } + if err == nil && status >= 400 && status < 500 { + return // permanent + } + if attempt == len(d.backoffs) { + log.Printf("webhook %q: giving up after %d attempts", job.webhook.Name, attempt+1) + } + } +} + +// doOnce performs a single POST. Returns (status, transient, err). +// - 2xx: status=2xx, transient=false, err=nil → success +// - 4xx: status=4xx, transient=false, err=nil → permanent +// - 5xx: status=5xx, transient=true, err=nil → retry +// - network/timeout: status=0, transient=true, err=non-nil → retry +func (d *Dispatcher) doOnce(ctx context.Context, job deliveryJob) (int, bool, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, job.webhook.URL, bytes.NewReader(job.body)) + if err != nil { + return 0, false, err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "lazyagent/"+version.String()) + req.Header.Set("X-Lazyagent-Event", "state_transition") + req.Header.Set("X-Lazyagent-Delivery", job.deliveryID) + if job.webhook.Secret != "" { + req.Header.Set("X-Lazyagent-Signature", Sign(job.webhook.Secret, job.body)) + } + resp, err := d.client.Do(req) + if err != nil { + return 0, true, err + } + defer resp.Body.Close() + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return resp.StatusCode, false, nil + } + snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 200)) + log.Printf("webhook %q: %d %s — %s", job.webhook.Name, resp.StatusCode, resp.Status, string(snippet)) + if resp.StatusCode >= 500 { + return resp.StatusCode, true, nil + } + return resp.StatusCode, false, nil +} +``` + +- [ ] **Step 4: Run all webhook tests, verify they pass** + +Run: `go test ./internal/webhook/ -race -v` +Expected: all PASS. + +- [ ] **Step 5: Commit** + +```bash +git add internal/webhook/dispatcher.go internal/webhook/dispatcher_test.go +git commit -m "feat(webhook): retry transient failures, no retry on 4xx" +``` + +--- + +## Task 10: Dispatcher HMAC signature header (integration test) + +**Files:** +- Modify: `internal/webhook/dispatcher_test.go` + +The Sign function is already used in `doOnce` when a secret is configured. This task adds an integration test that verifies the header is set correctly end-to-end (covers wire format, not just the Sign function). + +- [ ] **Step 1: Write failing test** + +Append to `internal/webhook/dispatcher_test.go`: + +```go +func TestDispatcher_HMACHeaderWhenSecretSet(t *testing.T) { + var sigHeader string + var body []byte + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sigHeader = r.Header.Get("X-Lazyagent-Signature") + body, _ = io.ReadAll(r.Body) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL, Secret: "hello"}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + bus.Publish(core.SessionEvent{SessionID: "s1", To: core.ActivityWaiting, At: time.Now()}) + time.Sleep(200 * time.Millisecond) + + if sigHeader == "" { + t.Fatal("X-Lazyagent-Signature missing") + } + if want := Sign("hello", body); sigHeader != want { + t.Fatalf("got %q, want %q", sigHeader, want) + } +} + +func TestDispatcher_NoHMACWhenSecretEmpty(t *testing.T) { + var sigHeader string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sigHeader = r.Header.Get("X-Lazyagent-Signature") + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + bus.Publish(core.SessionEvent{SessionID: "s1", To: core.ActivityWaiting, At: time.Now()}) + time.Sleep(200 * time.Millisecond) + + if sigHeader != "" { + t.Fatalf("X-Lazyagent-Signature should be absent, got %q", sigHeader) + } +} +``` + +- [ ] **Step 2: Run tests, verify they pass** + +Run: `go test ./internal/webhook/ -run TestDispatcher_HMAC -race -v` +Expected: PASS (HMAC logic is already implemented in Task 8 + 9). + +- [ ] **Step 3: Commit** + +```bash +git add internal/webhook/dispatcher_test.go +git commit -m "test(webhook): cover HMAC header wire format end-to-end" +``` + +--- + +## Task 11: Dispatcher dedupe duplicate transitions + +When TUI, GUI, and API run in the same process, each builds its own `SessionManager` and `ActivityTracker`. All three publish the same `(session, from, to)` transition. This task adds a small last-seen map to the dispatcher to coalesce duplicates emitted within a short window. + +**Files:** +- Modify: `internal/webhook/dispatcher.go` +- Modify: `internal/webhook/dispatcher_test.go` + +- [ ] **Step 1: Write failing test** + +Append: + +```go +func TestDispatcher_DedupesSameTransitionWithinWindow(t *testing.T) { + var attempts int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + now := time.Now() + ev := core.SessionEvent{SessionID: "s1", From: core.ActivityIdle, To: core.ActivityWaiting, At: now} + bus.Publish(ev) + bus.Publish(ev) // duplicate from a second manager + bus.Publish(ev) // duplicate from a third + + time.Sleep(300 * time.Millisecond) + + if a := atomic.LoadInt32(&attempts); a != 1 { + t.Fatalf("got %d POSTs, want 1 (dedup)", a) + } +} + +func TestDispatcher_DistinctTransitionsNotDeduped(t *testing.T) { + var attempts int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + now := time.Now() + bus.Publish(core.SessionEvent{SessionID: "s1", From: core.ActivityIdle, To: core.ActivityWaiting, At: now}) + bus.Publish(core.SessionEvent{SessionID: "s1", From: core.ActivityWaiting, To: core.ActivityThinking, At: now}) + + time.Sleep(300 * time.Millisecond) + + if a := atomic.LoadInt32(&attempts); a != 2 { + t.Fatalf("got %d POSTs, want 2 (distinct transitions)", a) + } +} +``` + +- [ ] **Step 2: Run tests, verify they fail** + +Run: `go test ./internal/webhook/ -run TestDispatcher_Dedup -race -v` +Expected: FAIL — duplicates produce 3 POSTs. + +- [ ] **Step 3: Add dedup in fanout** + +In `dispatcher.go`, add fields to the struct and a helper: + +```go +type Dispatcher struct { + bus *core.EventBus + cfg ConfigSource + client *http.Client + apiAddr func() string + + queueSize int + workers int + backoffs []time.Duration + + dedupWindow time.Duration + + mu sync.Mutex + lastSeen map[string]lastSeenEntry // key: session_id +} + +type lastSeenEntry struct { + from core.ActivityKind + to core.ActivityKind + at time.Time +} +``` + +In `New`: + +```go +return &Dispatcher{ + bus: bus, + cfg: cfg, + client: client, + apiAddr: apiAddr, + queueSize: 256, + workers: 4, + backoffs: []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}, + dedupWindow: 2 * time.Second, + lastSeen: make(map[string]lastSeenEntry), +} +``` + +At the top of `fanout`, add a dedup check: + +```go +func (d *Dispatcher) fanout(ev core.SessionEvent, queue chan<- deliveryJob) { + if d.shouldDedup(ev) { + return + } + // ... existing logic +} + +func (d *Dispatcher) shouldDedup(ev core.SessionEvent) bool { + d.mu.Lock() + defer d.mu.Unlock() + prev, ok := d.lastSeen[ev.SessionID] + now := time.Now() + if ok && prev.from == ev.From && prev.to == ev.To && now.Sub(prev.at) < d.dedupWindow { + return true + } + d.lastSeen[ev.SessionID] = lastSeenEntry{from: ev.From, to: ev.To, at: now} + return false +} +``` + +Add `"sync"` to the import list of `dispatcher.go`. + +- [ ] **Step 4: Run tests, verify they pass** + +Run: `go test ./internal/webhook/ -race -v` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add internal/webhook/dispatcher.go internal/webhook/dispatcher_test.go +git commit -m "feat(webhook): dedup duplicate transitions across managers" +``` + +--- + +## Task 12: Dispatcher graceful shutdown drains workers + +**Files:** +- Modify: `internal/webhook/dispatcher_test.go` + +The fan-out + worker loop already respects `ctx.Done()`. This task adds a test that exercises the shutdown path under load to guard against future regressions. + +- [ ] **Step 1: Write test** + +Append: + +```go +func TestDispatcher_ContextCancelStopsCleanly(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + _ = d.Start(ctx) + close(done) + }() + time.Sleep(20 * time.Millisecond) + + for i := 0; i < 50; i++ { + bus.Publish(core.SessionEvent{SessionID: fmt.Sprintf("s%d", i), To: core.ActivityWaiting, At: time.Now()}) + } + cancel() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("dispatcher did not stop after context cancel") + } +} +``` + +Add `"fmt"` to the test file imports if missing. + +- [ ] **Step 2: Run, verify pass** + +Run: `go test ./internal/webhook/ -run TestDispatcher_ContextCancel -race -v` +Expected: PASS. + +- [ ] **Step 3: Commit** + +```bash +git add internal/webhook/dispatcher_test.go +git commit -m "test(webhook): cover graceful shutdown under load" +``` + +--- + +## Task 13: Wire bus into TUI manager + +**Files:** +- Modify: `internal/ui/app.go` + +- [ ] **Step 1: Add bus parameter to NewModel** + +In `internal/ui/app.go:115`, change: + +```go +func NewModel(provider core.SessionProvider) Model { +``` + +to: + +```go +func NewModel(provider core.SessionProvider, bus *core.EventBus) Model { +``` + +After `mgr := core.NewSessionManager(cfg.WindowMinutes, provider)` (line 118), insert: + +```go +if bus != nil { + mgr.SetEventBus(bus) +} +``` + +Update callers: `main.go:210` (`ui.NewModel(provider)` → `ui.NewModel(provider, nil)` — `main.go` will pass the real bus in Task 16) and `internal/ui/app_test.go:31` (already uses a local construction pattern; pass `nil`). + +- [ ] **Step 2: Build, verify it compiles** + +Run: `go build ./...` +Expected: success. + +- [ ] **Step 3: Run all tests** + +Run: `go test ./... -race` +Expected: PASS. + +- [ ] **Step 4: Commit** + +```bash +git add internal/ui/app.go +git commit -m "feat(ui): accept optional EventBus for transition publishing" +``` + +--- + +## Task 14: Wire bus into API manager + +**Files:** +- Modify: `internal/api/server.go` + +- [ ] **Step 1: Add bus param to api.New** + +In `internal/api/server.go`, change the signature of `New` to accept a `*core.EventBus` (place after the existing params). Inside `New`, after `manager.SetExcludeCWDSubstrings(...)`: + +```go +if bus != nil { + manager.SetEventBus(bus) +} +``` + +Update all callers (`main.go`, plus any tests in `internal/api/`). Pass `nil` in tests to preserve current behavior. + +- [ ] **Step 2: Build, verify it compiles** + +Run: `go build ./...` +Expected: success. + +- [ ] **Step 3: Run tests** + +Run: `go test ./internal/api/ -race -v` +Expected: PASS. + +- [ ] **Step 4: Commit** + +```bash +git add internal/api/server.go +git commit -m "feat(api): accept optional EventBus for transition publishing" +``` + +--- + +## Task 15: Wire bus into GUI tray service + +**Files:** +- Modify: `internal/tray/service.go` + +- [ ] **Step 1: Inspect existing tray service.go signature** + +Read the file to find where `core.NewSessionManager` is called (around line 50). The tray runs in a separate detached process and uses build tag `!notray`. The bus passed by the parent process is not accessible — tray needs its own bus inside its process if it wants to emit webhooks. + +For MVP simplicity: the tray process will construct its own bus and dispatcher when its config has webhooks. This is the same code as the main-process path but happens in the tray process. + +Add a `bus *core.EventBus` field to the tray's manager wiring. Inside the tray's startup (likely a `Run` function), after constructing the manager: + +```go +cfg := core.LoadConfig() +if len(cfg.ValidWebhooks()) > 0 { + bus := core.NewEventBus() + s.manager.SetEventBus(bus) + httpClient := &http.Client{Timeout: 10 * time.Second} + d := webhook.New(bus, &cfgSource{cfg: cfg}, httpClient, func() string { return "" }) + go func() { _ = d.Start(ctx) }() +} +``` + +where `cfgSource` is a small wrapper implementing `webhook.ConfigSource`: + +```go +type cfgSource struct{ cfg core.Config } +func (c *cfgSource) Webhooks() []core.WebhookConfig { return c.cfg.ValidWebhooks() } +``` + +Place `cfgSource` in `internal/webhook/configsource.go` so both `main.go` and `internal/tray/service.go` can use it. (Alternative: keep it private to each caller; pick the simpler option in the implementation.) + +The tray must guard the import of `internal/webhook` so the `notray` build still compiles — but since this code only exists in `service.go` which is already `//go:build !notray`-only, it is naturally guarded. + +- [ ] **Step 2: Build with and without notray** + +Run: `go build ./...` and `go build -tags notray ./...` +Expected: both succeed. + +- [ ] **Step 3: Commit** + +```bash +git add internal/tray/service.go internal/webhook/configsource.go +git commit -m "feat(tray): start webhook dispatcher when configured" +``` + +--- + +## Task 16: Main process wiring + +**Files:** +- Modify: `main.go` + +- [ ] **Step 1: Construct bus and start dispatcher in main** + +In `main.go`, after `cfg := core.LoadConfig()` and after the provider is built, add: + +```go +// EventBus + webhook dispatcher (started when at least one valid webhook is configured). +var eventBus *core.EventBus +var dispatcherStop context.CancelFunc +if len(cfg.ValidWebhooks()) > 0 { + eventBus = core.NewEventBus() + dispatcherCtx, cancel := context.WithCancel(context.Background()) + dispatcherStop = cancel + httpClient := &http.Client{Timeout: 10 * time.Second} + apiAddr := func() string { + // Returns "http://" or "" if not running. Populated below + // once the API server is started, via an atomic.Value or a small + // helper. For MVP, capture the resolved bind address after srv.Run + // begins, or pass a static empty string until the API server + // exposes an Addr() method. + return "" + } + d := webhook.New(eventBus, &cfgSource{cfg: cfg}, httpClient, apiAddr) + go func() { _ = d.Start(dispatcherCtx) }() +} +``` + +Where `cfgSource` is the type added in Task 15. Add the imports `"net/http"`, `"github.com/illegalstudio/lazyagent/internal/webhook"`, and `"context"` if missing. + +Then pass `eventBus` into: + +- The TUI entry point (where it constructs its manager). +- `api.New(...)` (added in Task 14). +- The tray fork (tray runs in its own process and constructs its own bus — no change here). + +On shutdown, before `defer cancel()` returns, call `dispatcherStop()` if non-nil. + +A clean refactor: extract a small helper `setupWebhooks(cfg)` that returns `(*core.EventBus, func(), apiAddrSetter)`. Decision left to implementation taste. + +For the `apiAddr` capture: when the API server is constructed, store its bind address in an `atomic.Value` (or similar) and have `apiAddr` read from it. If the API is never started, the value stays empty and the `api` object is omitted from payloads — which is the desired behavior. + +- [ ] **Step 2: Build and run smoke test** + +```bash +make tui +./build/lazyagent --help +``` + +Expected: builds cleanly, help output unchanged. + +- [ ] **Step 3: Configure a webhook and verify end-to-end with a local test server** + +```bash +# In one terminal: +go run ./testdata/webhook-sink # if no such tool exists, use python -m http.server or netcat +# Or: +while true; do echo -e "HTTP/1.1 200 OK\n\n" | nc -l 9999; done +``` + +Edit `~/.config/lazyagent/config.json`: + +```json +{ + "webhooks": [ + {"name": "local", "url": "http://127.0.0.1:9999"} + ] +} +``` + +Run `lazyagent --tui` against real Claude sessions (or `--demo`), trigger a state change in any session (e.g. complete a task or send a message in Claude). Verify a POST hits the local server with the expected body. + +If using `--demo`, the demo provider emits synthetic data — verify the dispatcher fires on the synthetic state changes. + +- [ ] **Step 4: Commit** + +```bash +git add main.go +git commit -m "feat: start webhook dispatcher in main when webhooks configured" +``` + +--- + +## Task 17: User-facing documentation + +**Files:** +- Create: `docs/reference/webhooks.md` +- Modify: `docs/reference/configuration.md` +- Modify: `docs/reference/roadmap.md` +- Modify: `README.md` + +- [ ] **Step 1: Write the dedicated webhooks page** + +Create `docs/reference/webhooks.md`. Follow the Astro Starlight frontmatter style used by the rest of `docs/reference/` (see `docs/reference/configuration.md` for the exact format). + +Cover: +- Why webhooks (use cases: Slack notifications, dashboards, CI triggers) +- Configuration example (full JSON, copy-pasteable) +- Payload schema (table of fields, sample body) +- Headers table +- HMAC verification with a 10-line Python snippet: + + ```python + import hmac, hashlib + secret = b"abc123sharedwithslack" + body = request.get_data() + sig = "sha256=" + hmac.new(secret, body, hashlib.sha256).hexdigest() + if not hmac.compare_digest(sig, request.headers["X-Lazyagent-Signature"]): + abort(401) + ``` + +- Delivery semantics (async best-effort, retry, drops, dedup window) +- Troubleshooting: + - "I see no POSTs" → check `webhooks: []` length, check `lazyagent` logs + - "I see duplicate POSTs" → mention the 2 s dedup window + - "4xx in logs" → consumer is rejecting; not retried by design + +- [ ] **Step 2: Update configuration.md** + +In `docs/reference/configuration.md`, add a section documenting the new `webhooks` field. Reference `webhooks.md` for full details. + +- [ ] **Step 3: Update roadmap** + +In `docs/reference/roadmap.md`: +- Remove "Outbound webhooks on status changes" from "Future ideas". +- Add a new `v0.10` section listing the shipped capabilities (event bus, dispatcher, HMAC, dedup). + +- [ ] **Step 4: Update README** + +In `README.md`, under the features list (find the existing one-liner style and match), add: + +``` +- Outbound webhooks on session state transitions (Slack, dashboards, CI) +``` + +- [ ] **Step 5: Commit** + +```bash +git add docs/reference/webhooks.md docs/reference/configuration.md docs/reference/roadmap.md README.md +git commit -m "docs: document outbound webhooks" +``` + +--- + +## Final Verification + +- [ ] Run the full test suite with race detector: + ```bash + go test ./... -race + ``` + Expected: all PASS. + +- [ ] Build all targets: + ```bash + go build ./... + go build -tags notray ./... + ``` + Expected: both succeed. + +- [ ] Run `gofmt -l .` and `go vet ./...` — both must report no issues. + +- [ ] Manual end-to-end test with a real webhook receiver (or `httpbin.org/anything` for a low-stakes check) as described in Task 16, Step 3. + +- [ ] Open a PR against `illegalstudio/lazyagent:main` from a feature branch `feature/outbound-webhooks`. Reference the spec in the PR body. Include a sample config snippet and the HMAC verification example. + +--- + +## Notes for the Implementer + +- **TDD strictness:** Every task uses red→green TDD. If you find a step where you "know" the implementation and the test feels like a formality, write the test anyway — it documents intent and catches future regressions. +- **No retroactive features:** If you find yourself wanting to add structured logging, metrics, persistence, or a CLI subcommand, stop and confirm scope. The spec explicitly excludes them. +- **Two-process reality:** When `--gui` is used, the tray runs as a forked process. Task 15 handles tray webhooks separately. Cross-process webhooks fire independently; the dedup window only covers in-process duplicates. +- **`apiAddr` is best-effort:** If wiring it neatly takes more than 30 minutes, ship Task 16 with `apiAddr: func() string { return "" }` and add the proper hookup as a follow-up commit. The payload's `api` object is optional by design. diff --git a/docs/superpowers/specs/2026-05-19-outbound-webhooks-design.md b/docs/superpowers/specs/2026-05-19-outbound-webhooks-design.md new file mode 100644 index 0000000..5c14b0f --- /dev/null +++ b/docs/superpowers/specs/2026-05-19-outbound-webhooks-design.md @@ -0,0 +1,429 @@ +# Outbound Webhooks for Session State Transitions + +## Summary + +Add outbound HTTP webhooks that fire when a monitored session changes activity +state (e.g., `Idle → WaitingForUser`). Users configure one or more endpoints in +`~/.config/lazyagent/config.json` with filters on event type and agent. Delivery +is asynchronous, best-effort, with optional HMAC-SHA256 signing. + +This is the first lazyagent feature with internal pub-sub. To deliver it we add +a small typed event bus in `internal/core/`, which the existing SSE handler can +later migrate onto. The webhook dispatcher lives in a new `internal/webhook/` +package and depends only on the bus and `*http.Client` — no file I/O. + +## Goals + +- POST a JSON payload to user-configured URLs on session state transitions. +- Filter per webhook by event type and agent. +- Optional HMAC-SHA256 signing so the consumer can verify the sender. +- Asynchronous delivery with bounded queue, retry on transient failures, and + drop on overflow. The main session loop is never blocked by a slow consumer. +- Zero behavior change when no webhook is configured. + +## Non-goals (MVP) + +- Persisting undelivered events across restarts. Lazyagent is read-only on + disk today and this feature does not change that. +- Webhooks for non-transition events (file changes, cost updates, new + sessions). These can be added later as new bus event types. +- CLI subcommand for managing webhooks. Users edit `config.json` directly, + consistent with the rest of the project. +- Per-project / CWD-based filtering. The MVP filter is `events + agents`. +- Replacing the existing SSE "pulse" used by `/api/events`. The new bus is + additive in this PR; an SSE refactor is a follow-up. + +## Data Source + +Activity state is computed by `internal/core/activity.go` and stored in +`ActivityTracker`. Today `ActivityTracker.Update(sessions, now)` overwrites +its map with newly computed activities and discards the previous value — no +transition is ever observed. + +The webhook system needs `(previous, next)` per session. We change `Update` to +emit an event whenever the previous activity differs from the next. + +## Architecture + +``` +core.SessionManager + └─ ActivityTracker.Update() + └─ compare prev vs next + └─ if changed: bus.Publish(SessionEvent{...}) + │ + ▼ + core.EventBus (new) + │ + ▼ Subscribe(buf=256) + webhook.Dispatcher (new) + ├─ filter (event + agent) + ├─ enqueue deliveryJob + ├─ worker pool (4 goroutines) + │ └─ HTTP POST with retry + HMAC + └─ context-aware shutdown +``` + +Boundaries: + +- `core` does not know about HTTP. It only publishes typed events. +- `webhook` does not know about files, JSONL, or SQLite. It consumes events + and speaks HTTP. +- Both components are independently testable: bus with a synthetic subscriber, + dispatcher with a fake bus and `httptest.Server`. + +## Components + +### 1. `internal/core/eventbus.go` (new) + +A minimal typed pub-sub for in-process subscribers. + +```go +type SessionEvent struct { + SessionID string + Agent string // "claude", "codex", "pi", ... + From ActivityKind + To ActivityKind + At time.Time + ProjectPath string +} + +type EventBus struct { + mu sync.RWMutex + subs []chan SessionEvent +} + +func NewEventBus() *EventBus +func (b *EventBus) Subscribe(buf int) <-chan SessionEvent +func (b *EventBus) Unsubscribe(ch <-chan SessionEvent) +func (b *EventBus) Publish(e SessionEvent) // non-blocking, drops on full subscriber +``` + +Invariants: + +- `Publish` never blocks. If a subscriber channel is full, the event is + dropped for that subscriber and a debug log is emitted (rate-limited). +- `Unsubscribe` is idempotent and safe to call from any goroutine. +- Subscribers receive events in publish order; ordering across subscribers is + not guaranteed. + +### 2. `internal/core/activity.go` change + +`ActivityTracker` gains an optional `*EventBus` reference: + +```go +type ActivityTracker struct { + current map[string]ActivityKind + bus *EventBus // optional, nil-safe +} + +func (t *ActivityTracker) SetEventBus(bus *EventBus) +``` + +`Update(sessions, now)` is changed so that, for each session, after computing +the new activity, it compares against the previous value in `current`. If they +differ and `bus != nil`, it calls `bus.Publish` with a `SessionEvent`. + +Transition cases: + +- Session not seen before: emit `From: ActivityUnknown, To: `. +- Session seen before, activity changed: emit `From: prev, To: next`. +- Session seen before, activity unchanged: no event. +- Session disappears from the input slice: no event in MVP. + +`SessionManager` wires the bus at construction time: + +```go +manager := NewSessionManager(...) +manager.SetEventBus(bus) +``` + +`SetEventBus` propagates the reference into the tracker. + +### 3. `internal/core/config.go` change + +Add a `Webhooks []WebhookConfig` field to `Config`, plus the type: + +```go +type WebhookConfig struct { + Name string `json:"name"` // required, used in logs + URL string `json:"url"` // required, https/http + Secret string `json:"secret,omitempty"` // optional HMAC-SHA256 key + Events []string `json:"events,omitempty"` // empty = all activity kinds + Agents []string `json:"agents,omitempty"` // empty = all agents + Enabled *bool `json:"enabled,omitempty"` // default true; pointer so absence = default +} +``` + +Validation during load: + +- `name` non-empty and unique within the slice. +- `url` parses with `net/url.Parse` and scheme is `http` or `https`. +- Each `events[i]` matches a known `ActivityKind` (case-insensitive, e.g. + `waiting_for_user`, `idle`, `thinking`, `executing_tool`, `processing_result`). +- Each `agents[i]` matches a known agent name (`claude`, `codex`, `pi`, + `cursor`, `amp`, `opencode`). +- Invalid webhooks are skipped with a warning at load time. They do not + prevent other webhooks (or the rest of the config) from loading. This + matches the existing "silent error handling" pattern in the providers. + +### 4. `internal/webhook/` (new package) + +``` +internal/webhook/ + dispatcher.go // Dispatcher type, Start/Stop, fan-out + workers + payload.go // Payload struct, marshal helper + filter.go // matches(WebhookConfig, SessionEvent) bool + hmac.go // sign(secret, body []byte) string + dispatcher_test.go + filter_test.go + hmac_test.go + payload_test.go +``` + +Dispatcher shape: + +```go +type Dispatcher struct { + bus *core.EventBus + cfg ConfigSource // small interface, see below + client *http.Client + apiAddr func() string // optional, returns "" if API server not up + queue chan deliveryJob + workers int +} + +type ConfigSource interface { + Webhooks() []core.WebhookConfig +} + +type deliveryJob struct { + webhook core.WebhookConfig + body []byte // pre-marshaled payload + deliveryID string // uuid v4 + attempt int +} + +func New(bus *core.EventBus, cfg ConfigSource, client *http.Client, apiAddr func() string) *Dispatcher +func (d *Dispatcher) Start(ctx context.Context) error +``` + +Lifecycle: + +1. `Start` subscribes to the bus with buffer 256 and spawns one fan-out + goroutine plus `workers` worker goroutines (default 4). +2. Fan-out reads events from the bus channel. For each event, it walks + `cfg.Webhooks()`, applies `filter.matches` per webhook, marshals the + payload once, then pushes a `deliveryJob` per matching webhook onto the + shared queue. If the queue is full, the job is dropped and a counter is + incremented; the counter is logged once per second at warn level. +3. Workers read jobs from the queue and perform a POST with `client.Do` + (timeout 10s set on the `http.Client`). Result handling: + - 2xx → success. + - 4xx → permanent failure, log at warn level with status code and body + snippet (truncated to 200 bytes), do not retry. + - 5xx, network error, or timeout → retry with backoff `[1s, 5s, 30s]` up + to 3 retries (4 attempts total). On final failure, log warn. +4. `ctx` cancellation drains the queue: fan-out stops accepting new events, + workers finish their current job (respecting the per-request timeout), + then return. + +`ConfigSource` is a small interface so the dispatcher does not depend on the +full `core.Config` type and is trivial to fake in tests. + +`apiAddr` returns the API server bind address (e.g. `http://127.0.0.1:7421`) +when the API mode is active, or `""` otherwise. Used to populate the `api.*` +URLs in the payload. + +### 5. `main.go` wiring + +The dispatcher starts when `webhooks` is non-empty in the config, regardless +of which interface (TUI, GUI, API) is active. It runs in the background as a +goroutine of the main process. If only the GUI is active, the dispatcher +still runs because session monitoring is happening in the parent process. + +```go +bus := core.NewEventBus() +manager.SetEventBus(bus) + +if len(cfg.Webhooks) > 0 { + dispatcher := webhook.New(bus, cfg, httpClient, apiAddrFunc) + go dispatcher.Start(rootCtx) +} +``` + +`rootCtx` is the existing process-lifetime context; cancellation on shutdown +flows naturally into the dispatcher. + +## Payload + +The body of every POST: + +```json +{ + "id": "f47ac10b-58cc-4372-a567-0e02b2c3d479", + "event": "state_transition", + "session_id": "abc123", + "agent": "claude", + "from": "Idle", + "to": "WaitingForUser", + "project_path": "/Users/foo/code/bar", + "timestamp": "2026-05-19T14:30:00Z", + "api": { + "session_url": "http://127.0.0.1:7421/api/sessions/abc123", + "detail_url": "http://127.0.0.1:7421/api/sessions/abc123/full" + } +} +``` + +Field semantics: + +- `id` — UUID v4 generated per delivery. Same `id` is reused across retries + of the same job so consumers can deduplicate. +- `event` — currently always `state_transition`. Future event types (e.g. + `session_created`, `cost_threshold_crossed`) would add new values. +- `from` / `to` — string names of `ActivityKind`. The canonical names match + the values accepted in `config.events`. +- `timestamp` — RFC 3339 UTC. +- `api` — present only if the API server is running and its address is + known. Absent (object omitted entirely) otherwise. Consumers that always + need the URL should run lazyagent with `--api`. + +Outgoing HTTP headers: + +| Header | Value | +| --- | --- | +| `Content-Type` | `application/json` | +| `User-Agent` | `lazyagent/` | +| `X-Lazyagent-Event` | `state_transition` | +| `X-Lazyagent-Delivery` | `` (matches body `id`) | +| `X-Lazyagent-Signature` | `sha256=` (only if `secret` configured) | + +The signature is `hex.EncodeToString(hmac.New(sha256, secret).Sum(body))`, +computed over the exact serialized body bytes. This is the same convention +used by GitHub webhooks; consumers can reuse their existing receivers. + +## Configuration Example + +```json +{ + "agents": ["all"], + "exclude_cwd_substrings": [], + "webhooks": [ + { + "name": "slack-needs-input", + "url": "https://hooks.slack.com/services/T00/B00/XXX", + "secret": "abc123sharedwithslack", + "events": ["waiting_for_user"], + "agents": ["claude", "codex"] + }, + { + "name": "dashboard-everything", + "url": "https://my-dashboard.local/api/lazyagent", + "events": [], + "agents": [] + } + ] +} +``` + +The first webhook only fires for `claude` or `codex` sessions transitioning +to `WaitingForUser`. The second fires for every transition of every agent. + +## Error Handling + +| Situation | Behavior | +| --- | --- | +| Subscriber channel full at `Publish` | Drop event for that subscriber; debug log, rate-limited counter | +| Queue full at fan-out | Drop job; warn log once per second with dropped-since-last-log count | +| HTTP 2xx | Success; debug log | +| HTTP 4xx | No retry; warn log with status + body snippet (≤200 bytes) | +| HTTP 5xx / network / timeout | Retry with backoff `[1s, 5s, 30s]`, max 3 retries (4 attempts); warn on final failure | +| Invalid webhook config | Skipped at load time with warning; other webhooks unaffected | +| Config reload mid-flight | In-flight jobs finish under old config; new events use new config | +| Process shutdown | `ctx` cancellation: fan-out stops, workers drain current job within per-request timeout | + +## Observability + +- Standard log lines via `log` package (consistent with the rest of the + codebase), prefixed `webhook:`. +- Counters for delivered / failed / dropped per webhook name, logged once per + minute at info level if non-zero. Kept in-memory only. +- No metrics endpoint and no Prometheus exposition in MVP; that would + expand scope beyond what upstream review would accept in one PR. + +## Testing + +`internal/core/eventbus_test.go` + +- Publish/Subscribe basic delivery order. +- Drop-on-full: subscriber with `buf=1`, publish 3 events, verify exactly the + first is received and no goroutine is blocked. +- Unsubscribe is idempotent and safe under concurrent publish. +- Race detector clean (`go test -race`). + +`internal/core/activity_test.go` additions + +- Update emits transition event on changed activity, with correct + `From`/`To`/`SessionID`/`Agent`/`ProjectPath`. +- Update emits no event when activity is unchanged. +- New session emits `From: ActivityUnknown`. +- Disappearing session emits nothing in MVP. + +`internal/webhook/filter_test.go` + +- Empty `events` matches any event kind. +- Empty `agents` matches any agent. +- Both specified: AND across the two filters. +- Unknown event kind in config is ignored at filter time (defense in depth + even though load-time validation should prevent this). + +`internal/webhook/hmac_test.go` + +- Known test vector: secret `"it's a secret"`, body `{"foo":"bar"}` → + expected signature is a fixed hex string. This guards against accidental + changes to the signing format. + +`internal/webhook/dispatcher_test.go` + +- POST to `httptest.Server` succeeds, payload body and headers match. +- 500 then 200: exactly one retry, success. +- 500 throughout: 4 attempts total (initial + 3 retries), then give up. +- 400: 1 attempt, no retry. +- Queue full: events beyond capacity are dropped, dropped-counter increments. +- Graceful shutdown: `cancel(ctx)`, current job completes, no panic. + +Integration test: configure a single webhook pointing at a test server, push +an event through a real `EventBus` and `ActivityTracker.Update`, assert the +server received the expected POST. + +## Documentation + +- New page `docs/reference/webhooks.md` (Astro Starlight format matching the + rest of `docs/`) with: motivation, config example, payload schema, signing + reference (including a 10-line Python verification snippet), delivery + semantics, troubleshooting (queue drops, 4xx). +- README: one-line mention under "Features". +- `docs/reference/configuration.md`: document the new `webhooks` field. +- `docs/reference/roadmap.md`: move "Outbound webhooks on status changes" + from "Future ideas" to a new `v0.10` section once shipped. + +## Rollout + +- Default `webhooks: []` (or field absent) → no dispatcher started, no + behavior change. +- Existing SSE behavior unchanged; the new bus is additive. +- No config schema version bump needed (the field is additive and absent in + old configs). + +## Open Questions + +None that block implementation. Two design notes for the implementation plan: + +1. Whether the dispatcher logs to `log` or to a dedicated logger is left to + the implementation plan. The codebase currently uses `log` everywhere; we + should match that unless the implementer has a reason to introduce + structured logging in this PR. +2. The `agents` field's "agent name" is currently a `string` in + `model.Session.Agent`. We rely on those values being stable. If the + project later introduces an `AgentKind` enum, the validation list and + webhook filter should use it. This is a future refactor, not a blocker. diff --git a/internal/api/server.go b/internal/api/server.go index 43f1282..54a6419 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -46,7 +46,7 @@ type Server struct { // exempt so clients can derive the token from the user's passphrase. An empty // token disables auth entirely (used only by tests and by callers that // explicitly opt out). -func New(host string, provider core.SessionProvider, bearerToken, authSalt string) (*Server, error) { +func New(host string, provider core.SessionProvider, bearerToken, authSalt string, bus *core.EventBus) (*Server, error) { authSalt = strings.TrimSpace(authSalt) if authSalt == "" { authSalt = apiauth.SaltPrefix @@ -55,6 +55,9 @@ func New(host string, provider core.SessionProvider, bearerToken, authSalt strin cfg := core.LoadConfig() manager := core.NewSessionManager(cfg.WindowMinutes, provider) manager.SetExcludeCWDSubstrings(cfg.ExcludeCWDSubstrings) + if bus != nil { + manager.SetEventBus(bus) + } s := &Server{ manager: manager, diff --git a/internal/api/server_test.go b/internal/api/server_test.go index 3b51727..bc3744b 100644 --- a/internal/api/server_test.go +++ b/internal/api/server_test.go @@ -34,7 +34,7 @@ const testSalt = "lazyagent-api-v1-test" func newTestServer(t *testing.T) (*Server, *httptest.Server) { t.Helper() t.Setenv("XDG_CONFIG_HOME", t.TempDir()) - srv, err := New(":0", testProvider{}, testToken, testSalt) + srv, err := New(":0", testProvider{}, testToken, testSalt, nil) if err != nil { t.Fatalf("New: %v", err) } diff --git a/internal/core/activity.go b/internal/core/activity.go index f76d2cb..86ed30b 100644 --- a/internal/core/activity.go +++ b/internal/core/activity.go @@ -181,9 +181,11 @@ func NextActivityFilter(current ActivityKind) ActivityKind { } // ActivityTracker manages sticky activity states with grace period logic. +// When an EventBus is attached, transitions are published on Update. type ActivityTracker struct { activities map[string]*ActivityEntry waitingSince map[string]time.Time + bus *EventBus } // NewActivityTracker creates a new ActivityTracker. @@ -194,8 +196,15 @@ func NewActivityTracker() *ActivityTracker { } } +// SetEventBus attaches a bus so Update will publish transition events. +// Passing nil clears any previously attached bus. +func (t *ActivityTracker) SetEventBus(bus *EventBus) { + t.bus = bus +} + // Update resolves and stores the current activity for each session. // Applies a grace period before showing ActivityWaiting to avoid false positives. +// If an EventBus is attached, transitions are published. func (t *ActivityTracker) Update(sessions []*model.Session, now time.Time) { activeIDs := make(map[string]struct{}, len(sessions)) for _, s := range sessions { @@ -217,7 +226,22 @@ func (t *ActivityTracker) Update(sessions []*model.Session, now time.Time) { delete(t.waitingSince, id) } + var prev ActivityKind + if e, ok := t.activities[id]; ok { + prev = e.Kind + } t.activities[id] = &ActivityEntry{Kind: activity, LastSeen: now} + + if t.bus != nil && prev != "" && prev != activity { + t.bus.Publish(SessionEvent{ + SessionID: id, + Agent: s.Agent, + From: prev, + To: activity, + At: now, + ProjectPath: s.CWD, + }) + } } for id := range t.activities { if _, ok := activeIDs[id]; !ok { diff --git a/internal/core/activity_test.go b/internal/core/activity_test.go index 20f2da3..d6c778b 100644 --- a/internal/core/activity_test.go +++ b/internal/core/activity_test.go @@ -1,6 +1,11 @@ package core -import "testing" +import ( + "testing" + "time" + + "github.com/illegalstudio/lazyagent/internal/model" +) func TestToolActivity_ClaudeToolNames(t *testing.T) { tests := []struct { @@ -50,3 +55,52 @@ func TestToolActivity_PiToolNames(t *testing.T) { } } } + +func TestActivityTracker_EmitsTransitionOnChange(t *testing.T) { + bus := NewEventBus() + ch := bus.Subscribe(8) + defer bus.Unsubscribe(ch) + + tr := NewActivityTracker() + tr.SetEventBus(bus) + + now := time.Now() + s := &model.Session{SessionID: "s1", Agent: "claude", CWD: "/p", LastActivity: now, Status: model.StatusThinking} + tr.Update([]*model.Session{s}, now) + + // First Update for a session is observation, not a transition: no event. + select { + case ev := <-ch: + t.Fatalf("unexpected event on first observation: %+v", ev) + case <-time.After(50 * time.Millisecond): + } + + // Same activity: still no event. + tr.Update([]*model.Session{s}, now) + select { + case ev := <-ch: + t.Fatalf("unexpected event on unchanged state: %+v", ev) + case <-time.After(50 * time.Millisecond): + } + + // Status flips to executing a Bash tool: should emit Thinking→Running. + s.Status = model.StatusExecutingTool + s.CurrentTool = "Bash" + s.RecentTools = []model.ToolCall{{Name: "Bash", Timestamp: now}} + tr.Update([]*model.Session{s}, now) + select { + case ev := <-ch: + if ev.From != ActivityThinking || ev.To != ActivityRunning { + t.Fatalf("unexpected transition: %+v", ev) + } + case <-time.After(time.Second): + t.Fatal("no event on real transition") + } +} + +func TestActivityTracker_NilBusSafe(t *testing.T) { + tr := NewActivityTracker() + // No SetEventBus call. Must not panic. + s := &model.Session{SessionID: "s1", Agent: "claude", LastActivity: time.Now(), Status: model.StatusThinking} + tr.Update([]*model.Session{s}, time.Now()) +} diff --git a/internal/core/config.go b/internal/core/config.go index 84d712a..129bb1c 100644 --- a/internal/core/config.go +++ b/internal/core/config.go @@ -3,6 +3,8 @@ package core import ( "encoding/json" "fmt" + "log" + "net/url" "os" "path/filepath" "strings" @@ -16,6 +18,82 @@ type TUIConfig struct { Theme string `json:"theme"` // "dark" (default) or "light" } +// WebhookConfig is a single outbound webhook destination. +type WebhookConfig struct { + Name string `json:"name"` + URL string `json:"url"` + Secret string `json:"secret,omitempty"` + Events []string `json:"events,omitempty"` + Agents []string `json:"agents,omitempty"` + Enabled *bool `json:"enabled,omitempty"` // absent = true +} + +// IsEnabled returns true unless Enabled is explicitly set to false. +func (w WebhookConfig) IsEnabled() bool { + return w.Enabled == nil || *w.Enabled +} + +// knownActivityNames lists the canonical activity names accepted in config. +var knownActivityNames = map[string]ActivityKind{ + "idle": ActivityIdle, + "waiting": ActivityWaiting, + "thinking": ActivityThinking, + "compacting": ActivityCompacting, + "reading": ActivityReading, + "writing": ActivityWriting, + "running": ActivityRunning, + "searching": ActivitySearching, + "browsing": ActivityBrowsing, + "spawning": ActivitySpawning, +} + +// knownAgentNames lists the agent names accepted in config. +var knownAgentNames = map[string]struct{}{ + "claude": {}, "codex": {}, "pi": {}, "cursor": {}, "amp": {}, "opencode": {}, +} + +// Validate returns nil if the webhook is well-formed. +func (w WebhookConfig) Validate() error { + if strings.TrimSpace(w.Name) == "" { + return fmt.Errorf("webhook: name is required") + } + if strings.TrimSpace(w.URL) == "" { + return fmt.Errorf("webhook %q: url is required", w.Name) + } + u, err := url.Parse(w.URL) + if err != nil { + return fmt.Errorf("webhook %q: url parse: %w", w.Name, err) + } + if u.Scheme != "http" && u.Scheme != "https" { + return fmt.Errorf("webhook %q: url scheme must be http or https, got %q", w.Name, u.Scheme) + } + if u.Host == "" { + return fmt.Errorf("webhook %q: url is missing host", w.Name) + } + for _, ev := range w.Events { + if _, ok := knownActivityNames[strings.ToLower(ev)]; !ok { + return fmt.Errorf("webhook %q: unknown event %q", w.Name, ev) + } + } + for _, ag := range w.Agents { + if _, ok := knownAgentNames[strings.ToLower(ag)]; !ok { + return fmt.Errorf("webhook %q: unknown agent %q", w.Name, ag) + } + } + return nil +} + +// ValidWebhooks returns the subset of webhooks that pass Validate and are enabled. +func (c Config) ValidWebhooks() []WebhookConfig { + out := make([]WebhookConfig, 0, len(c.Webhooks)) + for _, w := range c.Webhooks { + if err := w.Validate(); err == nil && w.IsEnabled() { + out = append(out, w) + } + } + return out +} + // Config holds application settings shared by TUI and GUI. type Config struct { WindowMinutes int `json:"window_minutes"` @@ -28,6 +106,7 @@ type Config struct { ClaudeDirs []string `json:"claude_dirs,omitempty"` ExcludeCWDSubstrings []string `json:"exclude_cwd_substrings"` TUI TUIConfig `json:"tui"` + Webhooks []WebhookConfig `json:"webhooks,omitempty"` // APIPassphrase is the secret used to derive the bearer token that // protects the HTTP API. Empty means the API has not been configured yet // — `lazyagent --api` will prompt for one on first run. @@ -134,6 +213,12 @@ func LoadConfig() Config { _ = SaveConfig(cfg) } + for _, w := range cfg.Webhooks { + if err := w.Validate(); err != nil { + log.Printf("config: %v (skipped)", err) + } + } + return cfg } diff --git a/internal/core/config_test.go b/internal/core/config_test.go index 972c8d8..7519864 100644 --- a/internal/core/config_test.go +++ b/internal/core/config_test.go @@ -71,3 +71,61 @@ func TestLoadConfig_PreservesExistingExcludeCWDSubstrings(t *testing.T) { t.Errorf("LoadConfig().ExcludeCWDSubstrings = %v, want [/custom/path /another]", cfg.ExcludeCWDSubstrings) } } + +func TestWebhookConfig_ValidateOK(t *testing.T) { + tr := true + w := WebhookConfig{ + Name: "slack", + URL: "https://example.com/hook", + Events: []string{"waiting"}, + Agents: []string{"claude"}, + Enabled: &tr, + } + if err := w.Validate(); err != nil { + t.Fatalf("unexpected: %v", err) + } + if !w.IsEnabled() { + t.Fatal("IsEnabled should be true") + } +} + +func TestWebhookConfig_Validate_RejectsMissingFields(t *testing.T) { + cases := []struct { + name string + w WebhookConfig + }{ + {"no name", WebhookConfig{URL: "https://x"}}, + {"no url", WebhookConfig{Name: "x"}}, + {"bad scheme", WebhookConfig{Name: "x", URL: "ftp://x"}}, + {"unparseable", WebhookConfig{Name: "x", URL: "::"}}, + {"unknown event", WebhookConfig{Name: "x", URL: "https://x", Events: []string{"nope"}}}, + {"unknown agent", WebhookConfig{Name: "x", URL: "https://x", Agents: []string{"nope"}}}, + {"no host (http)", WebhookConfig{Name: "x", URL: "http://"}}, + {"no host (https with path)", WebhookConfig{Name: "x", URL: "https:///path"}}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + if err := c.w.Validate(); err == nil { + t.Fatal("expected error") + } + }) + } +} + +func TestWebhookConfig_IsEnabled_DefaultTrue(t *testing.T) { + w := WebhookConfig{Name: "x", URL: "https://x"} + if !w.IsEnabled() { + t.Fatal("absent Enabled should default to true") + } +} + +func TestConfig_ValidWebhooks_SkipsInvalid(t *testing.T) { + cfg := Config{Webhooks: []WebhookConfig{ + {Name: "ok", URL: "https://x"}, + {Name: "bad", URL: "ftp://x"}, + }} + got := cfg.ValidWebhooks() + if len(got) != 1 || got[0].Name != "ok" { + t.Fatalf("got %+v, want only 'ok'", got) + } +} diff --git a/internal/core/eventbus.go b/internal/core/eventbus.go new file mode 100644 index 0000000..a6c7456 --- /dev/null +++ b/internal/core/eventbus.go @@ -0,0 +1,66 @@ +package core + +import ( + "sync" + "time" +) + +// SessionEvent is published when a session's resolved activity changes. +type SessionEvent struct { + SessionID string + Agent string + From ActivityKind + To ActivityKind + At time.Time + ProjectPath string +} + +// EventBus is a minimal typed pub-sub for in-process subscribers. +// Publish never blocks; events are dropped for subscribers whose channel is full. +type EventBus struct { + mu sync.RWMutex + subs []chan SessionEvent +} + +// NewEventBus returns a ready-to-use EventBus. +func NewEventBus() *EventBus { return &EventBus{} } + +// Subscribe registers a new subscriber and returns its channel. +// buf is the channel buffer; pick a size matching the subscriber's drain rate. +func (b *EventBus) Subscribe(buf int) <-chan SessionEvent { + if buf < 1 { + buf = 1 + } + ch := make(chan SessionEvent, buf) + b.mu.Lock() + b.subs = append(b.subs, ch) + b.mu.Unlock() + return ch +} + +// Unsubscribe removes the channel from the bus. Safe to call multiple times. +// The caller must not read from the channel after Unsubscribe returns. +func (b *EventBus) Unsubscribe(ch <-chan SessionEvent) { + b.mu.Lock() + defer b.mu.Unlock() + for i, sub := range b.subs { + if sub == ch { + b.subs = append(b.subs[:i], b.subs[i+1:]...) + return + } + } +} + +// Publish sends e to every subscriber. Non-blocking: subscribers whose channel +// is full miss this event. +func (b *EventBus) Publish(e SessionEvent) { + b.mu.RLock() + subs := append([]chan SessionEvent(nil), b.subs...) + b.mu.RUnlock() + for _, ch := range subs { + select { + case ch <- e: + default: + } + } +} diff --git a/internal/core/eventbus_test.go b/internal/core/eventbus_test.go new file mode 100644 index 0000000..36e8225 --- /dev/null +++ b/internal/core/eventbus_test.go @@ -0,0 +1,118 @@ +package core + +import ( + "sync" + "testing" + "time" +) + +func TestEventBus_PublishSubscribe(t *testing.T) { + bus := NewEventBus() + ch := bus.Subscribe(4) + defer bus.Unsubscribe(ch) + + want := SessionEvent{SessionID: "s1", From: ActivityIdle, To: ActivityThinking, At: time.Unix(0, 0)} + bus.Publish(want) + + select { + case got := <-ch: + if got != want { + t.Fatalf("got %+v, want %+v", got, want) + } + case <-time.After(time.Second): + t.Fatal("subscriber did not receive event") + } +} + +func TestEventBus_DropOnFullSubscriber(t *testing.T) { + bus := NewEventBus() + ch := bus.Subscribe(1) + defer bus.Unsubscribe(ch) + + bus.Publish(SessionEvent{SessionID: "a"}) + bus.Publish(SessionEvent{SessionID: "b"}) // dropped + bus.Publish(SessionEvent{SessionID: "c"}) // dropped + + got := <-ch + if got.SessionID != "a" { + t.Fatalf("got %q, want %q", got.SessionID, "a") + } + select { + case extra := <-ch: + t.Fatalf("unexpected extra event: %+v", extra) + case <-time.After(50 * time.Millisecond): + } +} + +func TestEventBus_UnsubscribeIdempotent(t *testing.T) { + bus := NewEventBus() + ch := bus.Subscribe(1) + bus.Unsubscribe(ch) + bus.Unsubscribe(ch) // must not panic + + // Publish after unsubscribe must not block or send to the closed channel. + done := make(chan struct{}) + go func() { + bus.Publish(SessionEvent{SessionID: "x"}) + close(done) + }() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("Publish after Unsubscribe blocked") + } +} + +func TestEventBus_PublishConcurrentUnsubscribe(t *testing.T) { + // Exercise the race between Publish iterating the subscriber list and + // Unsubscribe mutating it. Run under `-race`. + bus := NewEventBus() + + var wg sync.WaitGroup + for i := 0; i < 8; i++ { + ch := bus.Subscribe(16) + wg.Add(2) + go func(ch <-chan SessionEvent) { + defer wg.Done() + for j := 0; j < 100; j++ { + bus.Publish(SessionEvent{SessionID: "x"}) + } + }(ch) + go func(ch <-chan SessionEvent) { + defer wg.Done() + time.Sleep(time.Microsecond) + bus.Unsubscribe(ch) + }(ch) + } + wg.Wait() +} + +func TestEventBus_ConcurrentPublish(t *testing.T) { + bus := NewEventBus() + ch := bus.Subscribe(1024) + defer bus.Unsubscribe(ch) + + var wg sync.WaitGroup + const n = 100 + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + bus.Publish(SessionEvent{SessionID: "x"}) + }(i) + } + wg.Wait() + + count := 0 + for { + select { + case <-ch: + count++ + case <-time.After(50 * time.Millisecond): + if count != n { + t.Fatalf("received %d events, want %d", count, n) + } + return + } + } +} diff --git a/internal/core/session.go b/internal/core/session.go index c825cd6..706de19 100644 --- a/internal/core/session.go +++ b/internal/core/session.go @@ -61,6 +61,14 @@ func NewSessionManager(windowMinutes int, provider SessionProvider) *SessionMana } } +// SetEventBus attaches an event bus so activity transitions are published +// to subscribers. Pass nil to detach. +func (m *SessionManager) SetEventBus(bus *EventBus) { + m.mu.Lock() + m.tracker.SetEventBus(bus) + m.mu.Unlock() +} + // SetExcludeCWDSubstrings sets the CWD substring patterns used to exclude // sessions from filtered views (VisibleSessions / QuerySessions). func (m *SessionManager) SetExcludeCWDSubstrings(patterns []string) { diff --git a/internal/core/session_test.go b/internal/core/session_test.go index d75c8b2..f2b9f86 100644 --- a/internal/core/session_test.go +++ b/internal/core/session_test.go @@ -151,3 +151,50 @@ func TestFilterSessionsLocked_MultipleExcludePatterns(t *testing.T) { t.Errorf("expected 'normal' session, got %q", visible[0].SessionID) } } + +func TestSessionManager_SetEventBus_PropagatesToTracker(t *testing.T) { + t.Setenv("XDG_CONFIG_HOME", t.TempDir()) + + bus := NewEventBus() + ch := bus.Subscribe(4) + defer bus.Unsubscribe(ch) + + now := time.Now() + p := &mutableStubProvider{sessions: []*model.Session{{SessionID: "s1", Agent: "claude", LastActivity: now, Status: model.StatusThinking}}} + m := NewSessionManager(60, p) + m.SetEventBus(bus) + + // First Reload is an observation: no event expected. + if err := m.Reload(); err != nil { + t.Fatalf("Reload 1: %v", err) + } + select { + case ev := <-ch: + t.Fatalf("unexpected event on first observation: %+v", ev) + case <-time.After(50 * time.Millisecond): + } + + // Mutate state and reload: now we expect a transition event. + p.sessions[0].Status = model.StatusExecutingTool + p.sessions[0].CurrentTool = "Bash" + p.sessions[0].RecentTools = []model.ToolCall{{Name: "Bash", Timestamp: time.Now()}} + p.sessions[0].LastActivity = time.Now() + if err := m.Reload(); err != nil { + t.Fatalf("Reload 2: %v", err) + } + select { + case ev := <-ch: + if ev.From != ActivityThinking || ev.To != ActivityRunning { + t.Fatalf("unexpected transition: %+v", ev) + } + case <-time.After(time.Second): + t.Fatal("no event after real transition") + } +} + +type mutableStubProvider struct{ sessions []*model.Session } + +func (p *mutableStubProvider) DiscoverSessions() ([]*model.Session, error) { return p.sessions, nil } +func (p *mutableStubProvider) UseWatcher() bool { return false } +func (p *mutableStubProvider) RefreshInterval() time.Duration { return 0 } +func (p *mutableStubProvider) WatchDirs() []string { return nil } diff --git a/internal/tray/service.go b/internal/tray/service.go index 7c05465..ca5f76a 100644 --- a/internal/tray/service.go +++ b/internal/tray/service.go @@ -5,6 +5,7 @@ package tray import ( "context" "fmt" + "net/http" "os" "os/exec" "strings" @@ -15,6 +16,7 @@ import ( "github.com/illegalstudio/lazyagent/internal/demo" "github.com/illegalstudio/lazyagent/internal/model" "github.com/illegalstudio/lazyagent/internal/version" + "github.com/illegalstudio/lazyagent/internal/webhook" "github.com/pkg/browser" "github.com/wailsapp/wails/v3/pkg/application" ) @@ -49,6 +51,13 @@ func (s *SessionService) ServiceStartup(ctx context.Context, options application } s.manager = core.NewSessionManager(cfg.WindowMinutes, provider) s.manager.SetExcludeCWDSubstrings(cfg.ExcludeCWDSubstrings) + if len(cfg.ValidWebhooks()) > 0 { + bus := core.NewEventBus() + s.manager.SetEventBus(bus) + httpClient := &http.Client{Timeout: 10 * time.Second} + d := webhook.New(bus, &webhook.ConfigAdapter{Cfg: cfg}, httpClient, func() string { return "" }) + go func() { _ = d.Start(s.ctx) }() + } if err := s.manager.StartWatcher(); err != nil { return err } diff --git a/internal/ui/app.go b/internal/ui/app.go index 84aacb8..e77c1db 100644 --- a/internal/ui/app.go +++ b/internal/ui/app.go @@ -112,11 +112,14 @@ var keys = keyMap{ Copy: key.NewBinding(key.WithKeys("c")), } -func NewModel(provider core.SessionProvider) Model { +func NewModel(provider core.SessionProvider, bus *core.EventBus) Model { cfg := core.LoadConfig() t := LoadTheme(cfg.TUI.Theme) mgr := core.NewSessionManager(cfg.WindowMinutes, provider) mgr.SetExcludeCWDSubstrings(cfg.ExcludeCWDSubstrings) + if bus != nil { + mgr.SetEventBus(bus) + } _ = mgr.StartWatcher() return Model{ theme: t, diff --git a/internal/webhook/configsource.go b/internal/webhook/configsource.go new file mode 100644 index 0000000..16b3bd5 --- /dev/null +++ b/internal/webhook/configsource.go @@ -0,0 +1,15 @@ +package webhook + +import "github.com/illegalstudio/lazyagent/internal/core" + +// ConfigAdapter wraps a core.Config so it satisfies ConfigSource. +// It returns the valid + enabled webhooks at call time, allowing future +// config reloads to take effect on the next event. +type ConfigAdapter struct { + Cfg core.Config +} + +// Webhooks returns the validated + enabled webhooks from the wrapped config. +func (a *ConfigAdapter) Webhooks() []core.WebhookConfig { + return a.Cfg.ValidWebhooks() +} diff --git a/internal/webhook/dispatcher.go b/internal/webhook/dispatcher.go new file mode 100644 index 0000000..a3d2c87 --- /dev/null +++ b/internal/webhook/dispatcher.go @@ -0,0 +1,252 @@ +package webhook + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "sync" + "time" + + "github.com/illegalstudio/lazyagent/internal/core" + "github.com/illegalstudio/lazyagent/internal/version" +) + +// ConfigSource provides the current set of webhook configurations. +// Implementations may return a different slice on each call. +type ConfigSource interface { + Webhooks() []core.WebhookConfig +} + +type lastSeenEntry struct { + from core.ActivityKind + to core.ActivityKind + at time.Time +} + +// Dispatcher consumes SessionEvents from a bus and delivers them as HTTP +// POSTs to configured webhooks. +type Dispatcher struct { + bus *core.EventBus + cfg ConfigSource + client *http.Client + apiAddr func() string + + queueSize int + workers int + backoffs []time.Duration + + dedupWindow time.Duration + dedupTTL time.Duration + + mu sync.Mutex + lastSeen map[string]lastSeenEntry +} + +type deliveryJob struct { + webhook core.WebhookConfig + body []byte + deliveryID string +} + +// New creates a Dispatcher. +func New(bus *core.EventBus, cfg ConfigSource, client *http.Client, apiAddr func() string) *Dispatcher { + if client == nil { + client = &http.Client{Timeout: 10 * time.Second} + } + if apiAddr == nil { + apiAddr = func() string { return "" } + } + return &Dispatcher{ + bus: bus, + cfg: cfg, + client: client, + apiAddr: apiAddr, + queueSize: 256, + workers: 4, + backoffs: []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}, + dedupWindow: 2 * time.Second, + dedupTTL: 5 * time.Minute, + lastSeen: make(map[string]lastSeenEntry), + } +} + +// Start subscribes to the bus and runs until ctx is cancelled. +func (d *Dispatcher) Start(ctx context.Context) error { + events := d.bus.Subscribe(256) + defer d.bus.Unsubscribe(events) + + queue := make(chan deliveryJob, d.queueSize) + + workerDone := make(chan struct{}, d.workers) + for i := 0; i < d.workers; i++ { + go func() { + defer func() { workerDone <- struct{}{} }() + for { + select { + case <-ctx.Done(): + return + case job, ok := <-queue: + if !ok { + return + } + d.deliver(ctx, job) + } + } + }() + } + + for { + select { + case <-ctx.Done(): + close(queue) + for i := 0; i < d.workers; i++ { + <-workerDone + } + return ctx.Err() + case ev := <-events: + d.fanout(ev, queue) + } + } +} + +func (d *Dispatcher) shouldDedup(ev core.SessionEvent) bool { + d.mu.Lock() + defer d.mu.Unlock() + now := time.Now() + prev, ok := d.lastSeen[ev.SessionID] + if ok && prev.from == ev.From && prev.to == ev.To && now.Sub(prev.at) < d.dedupWindow { + return true + } + d.lastSeen[ev.SessionID] = lastSeenEntry{from: ev.From, to: ev.To, at: now} + + // Opportunistic eviction: drop entries older than dedupTTL so the map + // doesn't grow unbounded on long-running processes. Runs once per + // recorded entry — amortized O(1) per call. + if len(d.lastSeen) > 64 { + for id, e := range d.lastSeen { + if now.Sub(e.at) > d.dedupTTL { + delete(d.lastSeen, id) + } + } + } + return false +} + +func (d *Dispatcher) fanout(ev core.SessionEvent, queue chan<- deliveryJob) { + if d.shouldDedup(ev) { + return + } + webhooks := d.cfg.Webhooks() + if len(webhooks) == 0 { + return + } + deliveryID := newDeliveryID() + payload := Payload{ + ID: deliveryID, + Event: "state_transition", + SessionID: ev.SessionID, + Agent: ev.Agent, + From: string(ev.From), + To: string(ev.To), + ProjectPath: ev.ProjectPath, + Timestamp: ev.At.UTC(), + } + if base := d.apiAddr(); base != "" { + payload.API = &APILinks{ + SessionURL: fmt.Sprintf("%s/api/sessions/%s", base, ev.SessionID), + } + } + body, err := json.Marshal(payload) + if err != nil { + log.Printf("webhook: marshal payload: %v", err) + return + } + for _, w := range webhooks { + if !w.IsEnabled() || !Matches(w, ev) { + continue + } + select { + case queue <- deliveryJob{webhook: w, body: body, deliveryID: deliveryID}: + default: + log.Printf("webhook: queue full, dropping delivery for %q", w.Name) + } + } +} + +// deliver performs the POST, retrying on transient failures. +// 4xx is treated as permanent. 5xx, network errors, and timeouts retry +// with backoff up to len(d.backoffs) times (total attempts = 1 + retries). +func (d *Dispatcher) deliver(ctx context.Context, job deliveryJob) { + for attempt := 0; attempt <= len(d.backoffs); attempt++ { + if attempt > 0 { + select { + case <-ctx.Done(): + return + case <-time.After(d.backoffs[attempt-1]): + } + } + status, transient, err := d.doOnce(ctx, job) + if err == nil && !transient { + return + } + if err == nil && status >= 400 && status < 500 { + return + } + if attempt == len(d.backoffs) { + log.Printf("webhook %q: giving up after %d attempts", job.webhook.Name, attempt+1) + } + } +} + +// doOnce performs a single POST. Returns (status, transient, err). +// - 2xx: status=2xx, transient=false, err=nil → success +// - 4xx: status=4xx, transient=false, err=nil → permanent +// - 5xx: status=5xx, transient=true, err=nil → retry +// - network/timeout: status=0, transient=true, err=non-nil → retry +func (d *Dispatcher) doOnce(ctx context.Context, job deliveryJob) (int, bool, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, job.webhook.URL, bytes.NewReader(job.body)) + if err != nil { + return 0, false, err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "lazyagent/"+version.Version) + req.Header.Set("X-Lazyagent-Event", "state_transition") + req.Header.Set("X-Lazyagent-Delivery", job.deliveryID) + if job.webhook.Secret != "" { + req.Header.Set("X-Lazyagent-Signature", Sign(job.webhook.Secret, job.body)) + } + resp, err := d.client.Do(req) + if err != nil { + return 0, true, err + } + defer resp.Body.Close() + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return resp.StatusCode, false, nil + } + snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 200)) + log.Printf("webhook %q: %d %s — %s", job.webhook.Name, resp.StatusCode, resp.Status, string(snippet)) + if resp.StatusCode >= 500 { + return resp.StatusCode, true, nil + } + return resp.StatusCode, false, nil +} + +func newDeliveryID() string { + var b [16]byte + _, _ = rand.Read(b[:]) + b[6] = (b[6] & 0x0f) | 0x40 + b[8] = (b[8] & 0x3f) | 0x80 + return fmt.Sprintf("%s-%s-%s-%s-%s", + hex.EncodeToString(b[0:4]), + hex.EncodeToString(b[4:6]), + hex.EncodeToString(b[6:8]), + hex.EncodeToString(b[8:10]), + hex.EncodeToString(b[10:16]), + ) +} diff --git a/internal/webhook/dispatcher_test.go b/internal/webhook/dispatcher_test.go new file mode 100644 index 0000000..f4f1750 --- /dev/null +++ b/internal/webhook/dispatcher_test.go @@ -0,0 +1,360 @@ +package webhook + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/illegalstudio/lazyagent/internal/core" +) + +type stubCfg struct{ webhooks []core.WebhookConfig } + +func (s stubCfg) Webhooks() []core.WebhookConfig { return s.webhooks } + +func TestDispatcher_HappyPath(t *testing.T) { + var mu sync.Mutex + var bodies []map[string]any + var headers []http.Header + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b, _ := io.ReadAll(r.Body) + var p map[string]any + _ = json.Unmarshal(b, &p) + mu.Lock() + bodies = append(bodies, p) + headers = append(headers, r.Header.Clone()) + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: 2 * time.Second}, func() string { return "" }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + bus.Publish(core.SessionEvent{SessionID: "s1", Agent: "claude", From: core.ActivityIdle, To: core.ActivityWaiting, ProjectPath: "/p", At: time.Now()}) + + deadline := time.Now().Add(2 * time.Second) + for { + mu.Lock() + n := len(bodies) + mu.Unlock() + if n >= 1 || time.Now().After(deadline) { + break + } + time.Sleep(20 * time.Millisecond) + } + + mu.Lock() + defer mu.Unlock() + if len(bodies) != 1 { + t.Fatalf("got %d POSTs, want 1", len(bodies)) + } + if bodies[0]["session_id"] != "s1" || bodies[0]["to"] != "waiting" { + t.Fatalf("unexpected body: %+v", bodies[0]) + } + if h := headers[0].Get("X-Lazyagent-Event"); h != "state_transition" { + t.Errorf("X-Lazyagent-Event = %q", h) + } + if h := headers[0].Get("X-Lazyagent-Delivery"); h == "" { + t.Error("X-Lazyagent-Delivery missing") + } + if h := headers[0].Get("Content-Type"); h != "application/json" { + t.Errorf("Content-Type = %q", h) + } +} + +func TestDispatcher_Retry500Then200(t *testing.T) { + var attempts int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if atomic.AddInt32(&attempts, 1) == 1 { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + d.backoffs = []time.Duration{10 * time.Millisecond, 10 * time.Millisecond, 10 * time.Millisecond} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + bus.Publish(core.SessionEvent{SessionID: "s1", To: core.ActivityWaiting, At: time.Now()}) + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if atomic.LoadInt32(&attempts) >= 2 { + break + } + time.Sleep(20 * time.Millisecond) + } + if a := atomic.LoadInt32(&attempts); a != 2 { + t.Fatalf("got %d attempts, want 2", a) + } +} + +func TestDispatcher_NoRetryOn400(t *testing.T) { + var attempts int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusBadRequest) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + d.backoffs = []time.Duration{10 * time.Millisecond, 10 * time.Millisecond, 10 * time.Millisecond} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + bus.Publish(core.SessionEvent{SessionID: "s1", To: core.ActivityWaiting, At: time.Now()}) + time.Sleep(200 * time.Millisecond) + + if a := atomic.LoadInt32(&attempts); a != 1 { + t.Fatalf("got %d attempts, want 1 (no retry on 4xx)", a) + } +} + +func TestDispatcher_AllAttemptsFail(t *testing.T) { + var attempts int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + d.backoffs = []time.Duration{10 * time.Millisecond, 10 * time.Millisecond, 10 * time.Millisecond} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + bus.Publish(core.SessionEvent{SessionID: "s1", To: core.ActivityWaiting, At: time.Now()}) + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if atomic.LoadInt32(&attempts) >= 4 { + break + } + time.Sleep(20 * time.Millisecond) + } + if a := atomic.LoadInt32(&attempts); a != 4 { + t.Fatalf("got %d attempts, want 4 (initial + 3 retries)", a) + } +} + +func TestDispatcher_HMACHeaderWhenSecretSet(t *testing.T) { + var mu sync.Mutex + var sigHeader string + var body []byte + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sig := r.Header.Get("X-Lazyagent-Signature") + b, _ := io.ReadAll(r.Body) + mu.Lock() + sigHeader = sig + body = b + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL, Secret: "hello"}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + bus.Publish(core.SessionEvent{SessionID: "s1", To: core.ActivityWaiting, At: time.Now()}) + time.Sleep(200 * time.Millisecond) + + mu.Lock() + sig := sigHeader + b := body + mu.Unlock() + + if sig == "" { + t.Fatal("X-Lazyagent-Signature missing") + } + if want := Sign("hello", b); sig != want { + t.Fatalf("got %q, want %q", sig, want) + } +} + +func TestDispatcher_NoHMACWhenSecretEmpty(t *testing.T) { + var mu sync.Mutex + var sigHeader string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sig := r.Header.Get("X-Lazyagent-Signature") + mu.Lock() + sigHeader = sig + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + bus.Publish(core.SessionEvent{SessionID: "s1", To: core.ActivityWaiting, At: time.Now()}) + time.Sleep(200 * time.Millisecond) + + mu.Lock() + sig := sigHeader + mu.Unlock() + + if sig != "" { + t.Fatalf("X-Lazyagent-Signature should be absent, got %q", sig) + } +} + +func TestDispatcher_DedupesSameTransitionWithinWindow(t *testing.T) { + var attempts int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + now := time.Now() + ev := core.SessionEvent{SessionID: "s1", From: core.ActivityIdle, To: core.ActivityWaiting, At: now} + bus.Publish(ev) + bus.Publish(ev) // duplicate from a second manager + bus.Publish(ev) // duplicate from a third + + time.Sleep(300 * time.Millisecond) + + if a := atomic.LoadInt32(&attempts); a != 1 { + t.Fatalf("got %d POSTs, want 1 (dedup)", a) + } +} + +func TestDispatcher_DistinctTransitionsNotDeduped(t *testing.T) { + var attempts int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = d.Start(ctx) }() + time.Sleep(20 * time.Millisecond) + + now := time.Now() + bus.Publish(core.SessionEvent{SessionID: "s1", From: core.ActivityIdle, To: core.ActivityWaiting, At: now}) + bus.Publish(core.SessionEvent{SessionID: "s1", From: core.ActivityWaiting, To: core.ActivityThinking, At: now}) + + time.Sleep(300 * time.Millisecond) + + if a := atomic.LoadInt32(&attempts); a != 2 { + t.Fatalf("got %d POSTs, want 2 (distinct transitions)", a) + } +} + +func TestDispatcher_LastSeenEvictsOldEntries(t *testing.T) { + bus := core.NewEventBus() + cfg := stubCfg{webhooks: nil} // no webhooks; we only care about lastSeen + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + + // Trigger > 64 entries so eviction kicks in, with old timestamps. + d.mu.Lock() + for i := 0; i < 100; i++ { + d.lastSeen[fmt.Sprintf("old-%d", i)] = lastSeenEntry{ + from: core.ActivityIdle, + to: core.ActivityThinking, + at: time.Now().Add(-1 * time.Hour), + } + } + d.mu.Unlock() + + // Recording a new entry should trigger eviction of all old entries. + d.shouldDedup(core.SessionEvent{SessionID: "fresh", From: core.ActivityIdle, To: core.ActivityWaiting, At: time.Now()}) + + d.mu.Lock() + defer d.mu.Unlock() + for id := range d.lastSeen { + if !strings.HasPrefix(id, "fresh") { + t.Fatalf("old entry %q was not evicted", id) + } + } +} + +func TestDispatcher_ContextCancelStopsCleanly(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + bus := core.NewEventBus() + cfg := stubCfg{webhooks: []core.WebhookConfig{{Name: "test", URL: srv.URL}}} + d := New(bus, cfg, &http.Client{Timeout: time.Second}, nil) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + _ = d.Start(ctx) + close(done) + }() + time.Sleep(20 * time.Millisecond) + + for i := 0; i < 50; i++ { + bus.Publish(core.SessionEvent{SessionID: fmt.Sprintf("s%d", i), To: core.ActivityWaiting, At: time.Now()}) + } + cancel() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("dispatcher did not stop after context cancel") + } +} diff --git a/internal/webhook/filter.go b/internal/webhook/filter.go new file mode 100644 index 0000000..6a1f7b1 --- /dev/null +++ b/internal/webhook/filter.go @@ -0,0 +1,39 @@ +package webhook + +import ( + "strings" + + "github.com/illegalstudio/lazyagent/internal/core" +) + +// Matches returns true when the event passes the webhook's event/agent filters. +// Empty filter slices match everything. +func Matches(w core.WebhookConfig, ev core.SessionEvent) bool { + if len(w.Events) > 0 { + want := strings.ToLower(string(ev.To)) + ok := false + for _, e := range w.Events { + if strings.ToLower(e) == want { + ok = true + break + } + } + if !ok { + return false + } + } + if len(w.Agents) > 0 { + want := strings.ToLower(ev.Agent) + ok := false + for _, a := range w.Agents { + if strings.ToLower(a) == want { + ok = true + break + } + } + if !ok { + return false + } + } + return true +} diff --git a/internal/webhook/filter_test.go b/internal/webhook/filter_test.go new file mode 100644 index 0000000..6139346 --- /dev/null +++ b/internal/webhook/filter_test.go @@ -0,0 +1,33 @@ +package webhook + +import ( + "testing" + + "github.com/illegalstudio/lazyagent/internal/core" +) + +func TestMatches(t *testing.T) { + ev := core.SessionEvent{Agent: "claude", To: core.ActivityWaiting} + + cases := []struct { + name string + w core.WebhookConfig + matches bool + }{ + {"empty filters match all", core.WebhookConfig{Name: "x", URL: "https://x"}, true}, + {"matching event", core.WebhookConfig{Name: "x", URL: "https://x", Events: []string{"waiting"}}, true}, + {"non-matching event", core.WebhookConfig{Name: "x", URL: "https://x", Events: []string{"thinking"}}, false}, + {"matching agent", core.WebhookConfig{Name: "x", URL: "https://x", Agents: []string{"claude"}}, true}, + {"non-matching agent", core.WebhookConfig{Name: "x", URL: "https://x", Agents: []string{"codex"}}, false}, + {"event AND agent both match", core.WebhookConfig{Name: "x", URL: "https://x", Events: []string{"waiting"}, Agents: []string{"claude"}}, true}, + {"event matches, agent doesn't", core.WebhookConfig{Name: "x", URL: "https://x", Events: []string{"waiting"}, Agents: []string{"codex"}}, false}, + {"case-insensitive event", core.WebhookConfig{Name: "x", URL: "https://x", Events: []string{"WAITING"}}, true}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + if got := Matches(c.w, ev); got != c.matches { + t.Fatalf("got %v, want %v", got, c.matches) + } + }) + } +} diff --git a/internal/webhook/hmac.go b/internal/webhook/hmac.go new file mode 100644 index 0000000..b56e3e0 --- /dev/null +++ b/internal/webhook/hmac.go @@ -0,0 +1,15 @@ +package webhook + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/hex" +) + +// Sign returns the HMAC-SHA256 of body keyed by secret, formatted as +// "sha256=" — the same convention used by GitHub webhooks. +func Sign(secret string, body []byte) string { + mac := hmac.New(sha256.New, []byte(secret)) + mac.Write(body) + return "sha256=" + hex.EncodeToString(mac.Sum(nil)) +} diff --git a/internal/webhook/hmac_test.go b/internal/webhook/hmac_test.go new file mode 100644 index 0000000..13c4f9f --- /dev/null +++ b/internal/webhook/hmac_test.go @@ -0,0 +1,19 @@ +package webhook + +import "testing" + +func TestSign_KnownVector(t *testing.T) { + // HMAC-SHA256("it's a secret", `{"foo":"bar"}`) formatted as sha256=. + // Vector verified with: echo -n '{"foo":"bar"}' | openssl dgst -sha256 -hmac "it's a secret" + const want = "sha256=98e87fdc5126c604e0faff20d289f1cefbcc6816ee9ebb60451278d96751ce80" + got := Sign("it's a secret", []byte(`{"foo":"bar"}`)) + if got != want { + t.Fatalf("got %q, want %q", got, want) + } +} + +func TestSign_EmptySecret(t *testing.T) { + if Sign("", []byte("x")) == "" { + t.Fatal("Sign with empty secret should still return a valid signature string") + } +} diff --git a/internal/webhook/payload.go b/internal/webhook/payload.go new file mode 100644 index 0000000..711f56a --- /dev/null +++ b/internal/webhook/payload.go @@ -0,0 +1,22 @@ +package webhook + +import "time" + +// Payload is the JSON body sent on every webhook delivery. +type Payload struct { + ID string `json:"id"` + Event string `json:"event"` + SessionID string `json:"session_id"` + Agent string `json:"agent"` + From string `json:"from"` + To string `json:"to"` + ProjectPath string `json:"project_path"` + Timestamp time.Time `json:"timestamp"` + API *APILinks `json:"api,omitempty"` +} + +// APILinks point back to the local lazyagent API server for full details. +// Present only when the API server is running. +type APILinks struct { + SessionURL string `json:"session_url"` +} diff --git a/internal/webhook/payload_test.go b/internal/webhook/payload_test.go new file mode 100644 index 0000000..55abebc --- /dev/null +++ b/internal/webhook/payload_test.go @@ -0,0 +1,49 @@ +package webhook + +import ( + "encoding/json" + "strings" + "testing" + "time" + + "github.com/illegalstudio/lazyagent/internal/core" +) + +func TestPayload_MarshalContainsExpectedFields(t *testing.T) { + p := Payload{ + ID: "f47ac10b-58cc-4372-a567-0e02b2c3d479", + Event: "state_transition", + SessionID: "abc", + Agent: "claude", + From: string(core.ActivityIdle), + To: string(core.ActivityWaiting), + ProjectPath: "/p", + Timestamp: time.Date(2026, 5, 19, 14, 30, 0, 0, time.UTC), + API: &APILinks{ + SessionURL: "http://127.0.0.1:7421/api/sessions/abc", + }, + } + b, err := json.Marshal(p) + if err != nil { + t.Fatalf("marshal: %v", err) + } + s := string(b) + for _, want := range []string{ + `"id":"f47ac10b`, `"event":"state_transition"`, `"session_id":"abc"`, + `"agent":"claude"`, `"from":"idle"`, `"to":"waiting"`, + `"project_path":"/p"`, `"timestamp":"2026-05-19T14:30:00Z"`, + `"api":{`, `"session_url":"http://127.0.0.1:7421/api/sessions/abc"`, + } { + if !strings.Contains(s, want) { + t.Errorf("missing %q in %s", want, s) + } + } +} + +func TestPayload_MarshalOmitsAPIWhenNil(t *testing.T) { + p := Payload{ID: "x", Event: "state_transition", SessionID: "s"} + b, _ := json.Marshal(p) + if strings.Contains(string(b), `"api"`) { + t.Fatalf("api field should be omitted: %s", b) + } +} diff --git a/main.go b/main.go index b702fce..6794413 100644 --- a/main.go +++ b/main.go @@ -4,12 +4,16 @@ import ( "context" "flag" "fmt" + "net" + "net/http" "os" "os/exec" "os/signal" "strconv" "strings" + "sync/atomic" "syscall" + "time" tea "github.com/charmbracelet/bubbletea" "github.com/illegalstudio/lazyagent/internal/api" @@ -25,6 +29,7 @@ import ( "github.com/illegalstudio/lazyagent/internal/tray" "github.com/illegalstudio/lazyagent/internal/ui" "github.com/illegalstudio/lazyagent/internal/version" + "github.com/illegalstudio/lazyagent/internal/webhook" ) var trayPidFile = os.TempDir() + "/lazyagent-tray.pid" @@ -171,6 +176,23 @@ If you find lazyagent useful, leave a ⭐ → https://github.com/illegalstudio/l ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() + // EventBus + webhook dispatcher for the main process. + // When --gui is set the tray fork owns webhook delivery (avoids + // cross-process duplicate POSTs); otherwise the main process owns it. + var eventBus *core.EventBus + var apiAddrAtomic atomic.Value + apiAddrAtomic.Store("") + if len(cfg.ValidWebhooks()) > 0 && os.Getenv("LAZYAGENT_DETACHED") == "" && !runGUI { + eventBus = core.NewEventBus() + httpClient := &http.Client{Timeout: 10 * time.Second} + apiAddrFunc := func() string { + v, _ := apiAddrAtomic.Load().(string) + return v + } + disp := webhook.New(eventBus, &webhook.ConfigAdapter{Cfg: cfg}, httpClient, apiAddrFunc) + go func() { _ = disp.Start(ctx) }() + } + var apiDone chan struct{} if runAPI { @@ -180,12 +202,16 @@ If you find lazyagent useful, leave a ⭐ → https://github.com/illegalstudio/l os.Exit(1) } - srv, err := api.New(*apiHost, provider, bearerToken, cfg.APISalt) + srv, err := api.New(*apiHost, provider, bearerToken, cfg.APISalt, eventBus) if err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) os.Exit(1) } + // Store the resolved listen address so the webhook dispatcher can + // include a self-link in outbound payloads. + apiAddrAtomic.Store("http://" + normalizeAPIAddr(srv.Addr().String())) + if runTUI { // API in background, TUI in foreground. apiDone = make(chan struct{}) @@ -207,7 +233,7 @@ If you find lazyagent useful, leave a ⭐ → https://github.com/illegalstudio/l if runTUI { p := tea.NewProgram( - ui.NewModel(provider), + ui.NewModel(provider, eventBus), tea.WithAltScreen(), tea.WithMouseCellMotion(), ) @@ -296,6 +322,19 @@ func forkTray(demoMode bool, agentMode string) { } } +// normalizeAPIAddr substitutes wildcard bind hosts with a client-usable +// loopback so payloads carry navigable URLs instead of 0.0.0.0:NNNN. +func normalizeAPIAddr(addr string) string { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return addr + } + if host == "" || host == "0.0.0.0" || host == "::" { + host = "127.0.0.1" + } + return net.JoinHostPort(host, port) +} + // killPreviousTray reads the PID file, kills the old process if still alive, and cleans up. func killPreviousTray() { data, err := os.ReadFile(trayPidFile)