From 04f0084d609111f16e7bf1c16724bc2b4ea864c9 Mon Sep 17 00:00:00 2001 From: Aaron Brethorst Date: Sat, 9 May 2026 22:23:54 -0700 Subject: [PATCH 1/4] =?UTF-8?q?docs(subscribe):=20spec=20=E2=80=94=20skip?= =?UTF-8?q?=20stale=20events=20on=20initial=20backfill?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Design for making /subscribe/ drop events older than the source's SkewWindow during the initial replay drain, while leaving live tail and manual replay paths untouched. Aligns the auto-replay behavior with Standard Webhooks consumer timestamp tolerances. --- ...-subscribe-stale-backfill-filter-design.md | 133 ++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md diff --git a/docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md b/docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md new file mode 100644 index 0000000..bd351de --- /dev/null +++ b/docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md @@ -0,0 +1,133 @@ +# Skip stale events on `/subscribe` initial backfill + +**Status:** approved, pending implementation +**Date:** 2026-05-09 +**Affects:** `internal/subscribe`, `internal/server` (handler wiring) + +## Problem + +`/subscribe/` opens an SSE stream that first replays every event since the caller's cursor and then tails live. The replay is byte-for-byte: each event carries the original provider headers (Render's Standard Webhooks set: `webhook-id`, `webhook-timestamp`, `webhook-signature`). + +Standard Webhooks consumers reject messages whose `webhook-timestamp` is older than a tolerance window (5 minutes by default; hardcoded in the `standard-webhooks` Ruby gem and several other implementations). Any event sitting in the relay's store longer than that window will be rejected by a verifying consumer when replayed — verification fails with `"Message timestamp too old"` even though the HMAC is valid. + +The conflict is structural. The relay durably stores webhooks; consumers verify timestamps strictly. As soon as automatic replay crosses the consumer's tolerance, every redelivery 401s. The Rails app behind `hooksctl forward` is exhibiting exactly this: + +``` +Render webhook: signature verification failed: Message timestamp too old +Filter chain halted as :verify_render_signature rendered or redirected +Completed 401 Unauthorized +``` + +## Decision + +Stop automatic catch-up of stale events. Manual replay is unaffected. + +A fresh `/subscribe/` connection's **initial backfill** filters out events whose `provider_timestamp` is older than the source's existing `SkewWindow` (default 5 minutes, configurable per source in `hooks.yaml`). The cursor advances past skipped events so reconnects do not re-evaluate them. **Live tail is unaffected** — drains triggered by the notifier or the keepalive ticker do not filter. + +This is a deliberate inversion of the relay's documented "replay anything missed while disconnected" promise. A consumer offline for longer than `SkewWindow` will silently miss events that landed during the gap. The trade-off is accepted because: + +- Standard Webhooks consumers cannot accept stale messages without weakening their own replay-attack defense. +- The relay still owns the events. Operators can recover any specific delivery via the inspector's "Replay to listeners" action, `hooksctl replay`, or direct DB inspection — those paths are unchanged. + +## Behavior in detail + +### What changes + +`/subscribe/` initial backfill — the first `drain` call before the live select loop — filters events by age. + +| Field used for the age check | `provider_timestamp` (the original webhook timestamp; matches what the consumer will check) | +| Threshold | the source's `SkewWindow` from config (default 5m) | +| Comparison | `now - provider_timestamp > SkewWindow` skips; equal-to-window passes (matches `render.go:102` `delta > skew` semantics) | +| Cursor on skip | advances to the skipped event's sequence so future reconnects with `?since=` start past it | +| Skipped event in the store | unchanged; remains queryable, replayable, and pruneable per existing retention rules | +| Logging | one `slog.Debug` line per skip with `source`, `seq`, `delivery_id`, `age`, `skew_window` | + +### What does not change + +- **Live tail.** Once initial backfill returns, drains triggered by `Notifier.Publish` or the keepalive ticker do **not** filter. Live ingest events are fresh by definition (they just passed the same `SkewWindow` check at ingest), and the inspector "Replay to listeners" button uses `Notifier.Publish` to wake currently-connected SSE subscribers — that path stays open. +- **Push subscriptions.** `internal/push.Manager` workers and `Push.ReplayOne` operate on a separate code path with a separate cursor model. Untouched. +- **Manual inspector replay** (`POST /events/{source}/{sequence}/replay`). Untouched. `Push.ReplayOne` continues to deliver to push subscribers regardless of age. The SSE side of manual replay (which uses `Notifier.Publish`) keeps working for already-connected subscribers because live tail is unfiltered. Edge case: if a fresh SSE subscriber happens to be inside its initial-backfill drain at the moment a stale event is manually replayed, that subscriber will skip the event during backfill (initial backfill filters). This is consistent with the design — initial backfill is the catch-up path the design suppresses — and recovers via reconnect (live-tail) plus the relay's other replay surfaces. +- **`hooksctl forward`.** No client change. The server change is sufficient. +- **Ingest-time skew check.** Existing behavior in `internal/sources/render.go` is correct and unchanged. +- **Retention / pruning.** Stale events live in the store under each source's configured retention; only their automatic redelivery is suppressed. + +## Architecture + +### Plumbing + +`subscribe.Handler` currently holds: + +```go +type Handler struct { + Sources map[string]bool + // ... +} +``` + +It needs the per-source `SkewWindow` to apply the filter. Replace with: + +```go +type Handler struct { + Sources map[string]time.Duration // skew window per allowed source; zero = source not allowed + // ... +} +``` + +`internal/server.Build` constructs the handler and already has the per-source config in scope (it reads `hooks.yaml` into a `config.Config` whose `Source` entries carry `SkewWindow`). Wiring change is local to `Build`. + +`subscribe.New`'s signature changes from `sources []string` to `sources map[string]time.Duration`. The package is internal and the production caller is a single line in `internal/server.Build`; tests are updated alongside the change. No backward-compat shim. + +### Filter location + +The age filter lives in `subscribe.Handler.drain`, gated by a parameter that says "this is the initial backfill" vs "this is a live drain." Implementation sketch: + +```go +// Initial backfill: filter by age. +cursor, err := h.drain(ctx, w, flusher, source, cursor, batchLimit, h.Sources[source]) // pass skew +// Live drains: no filter. +cursor, err := h.drain(ctx, w, flusher, source, cursor, batchLimit, 0) // 0 = no filter +``` + +Inside `drain`, when `maxAge > 0`, events older than `now - maxAge` are skipped and the cursor is advanced past them; otherwise emit unconditionally (current behavior). + +The `now` source is injected for testability — Handler gains a `Now func() time.Time` field defaulting to `time.Now`, mirroring the pattern already used in `internal/sources/render.go`. + +## TDD outline + +Tests live in `internal/subscribe/handler_test.go` alongside the existing suite. + +1. **Initial backfill skips a stale event.** Seed one event with `provider_timestamp = now - 10m`, source `SkewWindow = 5m`. Connect; read SSE stream until live transition. Assert no SSE message was emitted for the stale seq. +2. **Initial backfill delivers a fresh event.** Same shape, `provider_timestamp = now - 1m`. Assert the event is emitted. +3. **Mixed batch, cursor advances past skipped events.** Seed `[stale, fresh, stale, fresh]`. Assert only the two fresh events are emitted. Reconnect with `?since=0`; assert the same two fresh events emit and stale ones remain suppressed (idempotent). +4. **Live tail does not filter.** Connect, finish initial backfill, then ingest a stale-timestamped event directly into the store and call `Notifier.Publish(source, seq)`. Assert it is emitted via the live drain. (Models the manual-replay-of-stale-event-to-live-SSE-subscriber path.) +5. **Boundary case at exactly `SkewWindow`.** Event with `provider_timestamp = now - SkewWindow` (delta == skew): emit. Matches `delta > skew` in `render.go:102`. +6. **Skip is observable.** Capture the handler's logger; assert one debug-level entry per skipped event with `source`, `seq`, `delivery_id`, `age`, `skew_window` keys. Plaintext body / signature do not appear. +7. **Default when source is missing from the map.** Direct construction with an unknown source returns 404 (existing behavior). Reasserted to guard the map shape change. + +Test injection points: + +- `Handler.Now` for clock control (defaults to `time.Now`). +- `Handler.Sources` carrying per-source `SkewWindow` values. +- Existing `httptest`-based handler harness reused. + +## Out of scope + +- A query-parameter override (`?max_age=...`) for client-side tuning. The server policy is uniform. +- Push-side mirror behavior. Push delivery already cursors per-subscription independently and has different replay semantics; revisiting it is its own design. +- A new YAML field, env var, or CLI flag. The change reuses `SkewWindow`. +- Notifying the consumer that events were skipped (e.g. an SSE comment with the skipped count). Operators have the inspector and `hooksctl replay`; the SSE protocol stays minimal. + +## Risks and mitigations + +| Risk | Mitigation | +|------|------------| +| Operator surprise — events present in the store but never replayed. | Debug log on every skip; `/audit` and inspector remain authoritative views; CLAUDE.md and `docs/quickstart.md` updated to describe the new behavior. (Doc updates are scoped into the implementation plan, not this design.) | +| Source with `SkewWindow = 0` (operator opted out of skew enforcement at ingest). | Treat `0` as "no filter" on the backfill side too. Consistent with the ingest-side meaning of zero. | +| Test clock divergence — `time.Now` vs injected `Now`. | Single source of truth via `Handler.Now`; production wiring leaves it nil and falls back to `time.Now`. | + +## Done when + +- All seven tests above pass. +- Existing `internal/subscribe` tests remain green. +- `make lint && make test` clean. +- A short note added to `internal/subscribe/handler.go` package doc and `CLAUDE.md` `internal/subscribe` bullet describing the new policy. (Implementation-plan item; not part of this design.) From 20b89ac9f66a1d2c7e8c88aad5cd50726792aeff Mon Sep 17 00:00:00 2001 From: Aaron Brethorst Date: Sat, 9 May 2026 22:52:45 -0700 Subject: [PATCH 2/4] docs(subscribe): revise stale-backfill spec after architect review Resolves ambiguities surfaced in audit: - Sources map semantics: presence-checked, value is the resolved effective_skew (zero meaning "use verifier default" handled in server.Build, not in the handler). - Drain split into initialDrain (filters) + liveDrain (unfiltered) so the type system enforces the live-tail-not-filtered invariant. - One-sided filter: future and zero-value provider_timestamps emit. - Cursor-advance-on-skip elevated to a load-bearing invariant with its own TDD case (all-stale batch + live emit). - Manual-replay-on-SSE caveat tightened (only reaches subscribers whose cursor < replayed.seq). - /audit correctly described as operator-action volume; not part of the recovery surface. - Logging types pinned (slog.Int64 for seq, slog.Duration for age and skew_window). - Doc strings to update named explicitly (README.md:3, README.md:107, docs/quickstart.md:138, plus handler.go package doc and CLAUDE.md). --- ...-subscribe-stale-backfill-filter-design.md | 126 ++++++++++++------ 1 file changed, 86 insertions(+), 40 deletions(-) diff --git a/docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md b/docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md index bd351de..b89b38a 100644 --- a/docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md +++ b/docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md @@ -22,33 +22,46 @@ Completed 401 Unauthorized Stop automatic catch-up of stale events. Manual replay is unaffected. -A fresh `/subscribe/` connection's **initial backfill** filters out events whose `provider_timestamp` is older than the source's existing `SkewWindow` (default 5 minutes, configurable per source in `hooks.yaml`). The cursor advances past skipped events so reconnects do not re-evaluate them. **Live tail is unaffected** — drains triggered by the notifier or the keepalive ticker do not filter. +A fresh `/subscribe/` connection's **initial backfill** filters out events whose `provider_timestamp` is older than the source's effective `SkewWindow` (the per-source value from `hooks.yaml`, falling back to the verifier's 5-minute default — the same `effective_skew` ingest already enforces). The cursor advances past skipped events so reconnects do not re-evaluate them. **Live tail is unaffected** — drains triggered by the notifier or the keepalive ticker do not filter. -This is a deliberate inversion of the relay's documented "replay anything missed while disconnected" promise. A consumer offline for longer than `SkewWindow` will silently miss events that landed during the gap. The trade-off is accepted because: +This is a deliberate inversion of the relay's documented "replay anything missed while disconnected" promise. A consumer offline for longer than `effective_skew` will silently miss events that landed during the gap. The trade-off is accepted because: - Standard Webhooks consumers cannot accept stale messages without weakening their own replay-attack defense. -- The relay still owns the events. Operators can recover any specific delivery via the inspector's "Replay to listeners" action, `hooksctl replay`, or direct DB inspection — those paths are unchanged. +- The relay still owns the events. Operators recover any specific delivery via the inspector's "Replay to listeners" action, `hooksctl replay`, or direct DB inspection — those paths are unchanged. + +(Note: `/audit` is **not** part of this recovery surface. `internal/audit` records operator-action volume, not webhook volume; skipped events do not produce audit rows.) ## Behavior in detail ### What changes -`/subscribe/` initial backfill — the first `drain` call before the live select loop — filters events by age. +`/subscribe/` initial backfill — the first drain call before the live select loop — filters events by age. -| Field used for the age check | `provider_timestamp` (the original webhook timestamp; matches what the consumer will check) | -| Threshold | the source's `SkewWindow` from config (default 5m) | -| Comparison | `now - provider_timestamp > SkewWindow` skips; equal-to-window passes (matches `render.go:102` `delta > skew` semantics) | -| Cursor on skip | advances to the skipped event's sequence so future reconnects with `?since=` start past it | -| Skipped event in the store | unchanged; remains queryable, replayable, and pruneable per existing retention rules | -| Logging | one `slog.Debug` line per skip with `source`, `seq`, `delivery_id`, `age`, `skew_window` | +| Aspect | Behavior | +|---|---| +| Age field | `provider_timestamp` (the original webhook timestamp; matches what the consumer's signature-verifier will check). Filter is one-sided. | +| Threshold | `effective_skew(source)` — the same value ingest uses. If `hooks.yaml` sets a non-zero `skew_window`, that value; otherwise the verifier's 5-minute default. **Zero is not a disable switch** — it propagates to the same default ingest already applies. | +| Comparison | `now - provider_timestamp > effective_skew` skips; equal-to-window passes (matches `delta > skew` at `internal/sources/render.go:102`). | +| Future timestamps | Always pass. A producer-clock-fast event that admitted at ingest (delta inside `[-skew, +skew]`) yields a negative `now - provider_timestamp` on backfill and emits unconditionally. | +| Zero `provider_timestamp` | Always emit. Failing open: a future verifier that omits the field shouldn't silently swallow events. Render's verifier always populates it today (`internal/sources/render.go:84`), so this is a forward-compat guard. | +| Cursor on skip | Advances to the skipped event's sequence so future reconnects with `?since=` start past it, and so a follow-up live drain does not re-emit the skipped events. **Load-bearing** — see TDD case 4. | +| Skipped event in store | Unchanged. Remains queryable, replayable, and pruneable per existing retention rules. | +| Logging | One `slog.Debug` line per skip: `slog.String("source", ...)`, `slog.Int64("seq", ...)`, `slog.String("delivery_id", ...)`, `slog.Duration("age", ...)`, `slog.Duration("skew_window", ...)`. No body, signature, or token bytes. | ### What does not change -- **Live tail.** Once initial backfill returns, drains triggered by `Notifier.Publish` or the keepalive ticker do **not** filter. Live ingest events are fresh by definition (they just passed the same `SkewWindow` check at ingest), and the inspector "Replay to listeners" button uses `Notifier.Publish` to wake currently-connected SSE subscribers — that path stays open. +- **Live tail.** Once initial backfill returns, drains triggered by `Notifier.Publish` or the keepalive ticker do **not** filter. Live ingest events are fresh by definition (they just passed the same `effective_skew` check at ingest), and the inspector "Replay to listeners" button uses `Notifier.Publish` to wake currently-connected SSE subscribers — that path stays open. The keepalive ticker is started after the initial drain returns (`internal/subscribe/handler.go:138` is after the initial drain call), so the ticker cannot fire mid-initial-drain. + - **Push subscriptions.** `internal/push.Manager` workers and `Push.ReplayOne` operate on a separate code path with a separate cursor model. Untouched. -- **Manual inspector replay** (`POST /events/{source}/{sequence}/replay`). Untouched. `Push.ReplayOne` continues to deliver to push subscribers regardless of age. The SSE side of manual replay (which uses `Notifier.Publish`) keeps working for already-connected subscribers because live tail is unfiltered. Edge case: if a fresh SSE subscriber happens to be inside its initial-backfill drain at the moment a stale event is manually replayed, that subscriber will skip the event during backfill (initial backfill filters). This is consistent with the design — initial backfill is the catch-up path the design suppresses — and recovers via reconnect (live-tail) plus the relay's other replay surfaces. -- **`hooksctl forward`.** No client change. The server change is sufficient. + +- **Manual inspector replay** (`POST /events/{source}/{sequence}/replay`). Untouched. `Push.ReplayOne` continues to deliver to push subscribers regardless of age. The SSE side of manual replay relies on `Notifier.Publish(source, seq)` waking any current subscriber's live drain, which then `ReadSince(cursor, ...)`. That delivers the replayed event only when `subscriber.cursor < replayed.seq` — i.e. the subscriber hadn't seen it yet. If `subscriber.cursor >= replayed.seq` (the common case after a successful initial backfill), `ReadSince` returns nothing and the replay is a no-op for that SSE subscriber. This is existing behavior, not a regression. Edge case: a fresh SSE subscriber whose initial backfill is in flight when a stale event is manually replayed will skip it during backfill, since initial backfill filters. Operator recovers via reconnect (live tail) or `hooksctl replay`. + +- **`hooksctl forward`.** No client change. The server change is sufficient. No new CLI flags, no parsed log messages, no new wire fields. + +- **`?since=latest`.** Cursor jumps to `LatestSequence(source)`; initial drain reads zero events; the filter never fires. No behavior change. + - **Ingest-time skew check.** Existing behavior in `internal/sources/render.go` is correct and unchanged. + - **Retention / pruning.** Stale events live in the store under each source's configured retention; only their automatic redelivery is suppressed. ## Architecture @@ -64,70 +77,103 @@ type Handler struct { } ``` -It needs the per-source `SkewWindow` to apply the filter. Replace with: +It needs the per-source `effective_skew` to apply the filter. Replace with: ```go type Handler struct { - Sources map[string]time.Duration // skew window per allowed source; zero = source not allowed + // Allowed sources mapped to the effective skew window for that source + // (= configured SkewWindow, or verifier default if zero/unset). Source + // membership is determined by key presence (`d, ok := h.Sources[s]`), + // not by value — the value is a real duration that may legitimately + // be any non-negative number including zero (zero treated as "use + // verifier default" upstream of this map; the map only ever sees the + // resolved effective value). + Sources map[string]time.Duration + Now func() time.Time // injected for tests; defaults to time.Now // ... } ``` -`internal/server.Build` constructs the handler and already has the per-source config in scope (it reads `hooks.yaml` into a `config.Config` whose `Source` entries carry `SkewWindow`). Wiring change is local to `Build`. +`internal/server.Build` constructs the handler and reads `hooks.yaml` into a `config.Config` whose `Source` entries carry `SkewWindow`. Build is responsible for resolving zero/unset to the verifier default before populating the map; `subscribe.Handler` itself never sees zero. (Resolving at the seam keeps the verifier-default constant in `internal/sources` where it already lives.) `subscribe.New`'s signature changes from `sources []string` to `sources map[string]time.Duration`. The package is internal and the production caller is a single line in `internal/server.Build`; tests are updated alongside the change. No backward-compat shim. -### Filter location +### Filter location: split into `initialDrain` + `liveDrain` -The age filter lives in `subscribe.Handler.drain`, gated by a parameter that says "this is the initial backfill" vs "this is a live drain." Implementation sketch: +The current single `drain` becomes two functions. The type system, not a comment, enforces "live tail does not filter." ```go -// Initial backfill: filter by age. -cursor, err := h.drain(ctx, w, flusher, source, cursor, batchLimit, h.Sources[source]) // pass skew -// Live drains: no filter. -cursor, err := h.drain(ctx, w, flusher, source, cursor, batchLimit, 0) // 0 = no filter +// stream(): +cursor, err := h.initialDrain(ctx, w, flusher, source, cursor) // filters by age +... +for { + select { + case <-ctx.Done(): ... + case <-ch: + cursor, err = h.liveDrain(ctx, w, flusher, source, cursor) // unfiltered + case <-ticker.C: + if _, err := io.WriteString(w, ": keepalive\n\n"); err != nil { ... } + flusher.Flush() + cursor, err = h.liveDrain(ctx, w, flusher, source, cursor) // unfiltered + } +} ``` -Inside `drain`, when `maxAge > 0`, events older than `now - maxAge` are skipped and the cursor is advanced past them; otherwise emit unconditionally (current behavior). +Both call a shared inner `readBatchAndEmit(...)` helper that does the SQL + SSE write loop; only `initialDrain` consults `h.Sources[source]` and `h.Now()` to compute `cutoff = h.Now().Add(-effectiveSkew)` per drain pass and skip events whose `provider_timestamp.Before(cutoff)`. `cutoff` is computed once per drain pass, not per event, so a single drain doesn't shift its decision midstream. -The `now` source is injected for testability — Handler gains a `Now func() time.Time` field defaulting to `time.Now`, mirroring the pattern already used in `internal/sources/render.go`. +When `initialDrain` skips an event, it advances `cursor = ev.Sequence` *without* writing to the wire. This is the load-bearing invariant: if cursor isn't advanced, subsequent live drains call `ReadSince(oldCursor)` and re-emit the skipped events on the *unfiltered* path. ## TDD outline -Tests live in `internal/subscribe/handler_test.go` alongside the existing suite. +Tests live in `internal/subscribe/handler_test.go` alongside the existing suite. `Handler.Now` is the clock-control seam. + +1. **Initial backfill skips a stale event.** Seed one event with `provider_timestamp = now - 10m`, source effective skew = 5m. Connect; read SSE stream until live transition (e.g. observe a keepalive comment, or assert no payload after a short read). Assert no SSE message was emitted for the stale seq. -1. **Initial backfill skips a stale event.** Seed one event with `provider_timestamp = now - 10m`, source `SkewWindow = 5m`. Connect; read SSE stream until live transition. Assert no SSE message was emitted for the stale seq. 2. **Initial backfill delivers a fresh event.** Same shape, `provider_timestamp = now - 1m`. Assert the event is emitted. -3. **Mixed batch, cursor advances past skipped events.** Seed `[stale, fresh, stale, fresh]`. Assert only the two fresh events are emitted. Reconnect with `?since=0`; assert the same two fresh events emit and stale ones remain suppressed (idempotent). -4. **Live tail does not filter.** Connect, finish initial backfill, then ingest a stale-timestamped event directly into the store and call `Notifier.Publish(source, seq)`. Assert it is emitted via the live drain. (Models the manual-replay-of-stale-event-to-live-SSE-subscriber path.) -5. **Boundary case at exactly `SkewWindow`.** Event with `provider_timestamp = now - SkewWindow` (delta == skew): emit. Matches `delta > skew` in `render.go:102`. -6. **Skip is observable.** Capture the handler's logger; assert one debug-level entry per skipped event with `source`, `seq`, `delivery_id`, `age`, `skew_window` keys. Plaintext body / signature do not appear. -7. **Default when source is missing from the map.** Direct construction with an unknown source returns 404 (existing behavior). Reasserted to guard the map shape change. -Test injection points: +3. **Mixed batch, only fresh emitted; idempotent on reconnect.** Seed `[stale, fresh, stale, fresh]`. Connect with `?since=0`; assert only the two fresh events are emitted. Reconnect with `?since=0`; assert the same two fresh events emit and the stale ones remain suppressed. + +4. **All-stale batch still advances cursor (regression guard for the load-bearing invariant).** Seed `[stale, stale]`. Connect; finish initial backfill; while still connected, ingest one fresh event and `Notifier.Publish(source, freshSeq)`. Assert the live drain emits **only** the fresh event — not the two stale ones. (If `initialDrain` failed to advance cursor on skip, `liveDrain`'s `ReadSince(0)` would re-pick the stale events on the unfiltered path.) + +5. **Live tail does not filter.** Connect, finish initial backfill, then write a stale-`provider_timestamp` event directly to the store and `Notifier.Publish(source, seq)`. Assert it is emitted via the live drain. (Models manual-replay-of-stale-event-to-live-SSE-subscriber.) + +6. **Boundary at exactly `effective_skew`.** Event with `provider_timestamp = now - effective_skew` (delta == skew): emit. Matches `delta > skew` at `internal/sources/render.go:102`. + +7. **Future-timestamp event passes.** `provider_timestamp = now + 1m`. Emit. Documents the one-sided filter. + +8. **Zero `provider_timestamp` passes.** Insert event with `time.Time{}`. Emit. Forward-compat guard. + +9. **Skip is observable.** Capture the handler's logger (use a `slog.Handler` test double); seed one stale event; assert one debug-level entry with attrs `source` (string), `seq` (int64), `delivery_id` (string), `age` (duration), `skew_window` (duration). Plaintext body / signature do not appear. + +10. **Unknown source still 404s after map shape change.** Direct construction with a `map[string]time.Duration{"render": 5m}`; request `/subscribe/stripe`. 404. (Renames the previous "default when missing" case and exercises the key-presence-not-value-zero membership rule.) -- `Handler.Now` for clock control (defaults to `time.Now`). -- `Handler.Sources` carrying per-source `SkewWindow` values. -- Existing `httptest`-based handler harness reused. +Test fixtures: existing `setup()` helper updated to construct `Handler.Sources = map[string]time.Duration{"render": 5 * time.Minute}` so all pre-existing tests remain valid (events stamped with `time.Now()` are always within 5m by construction). ## Out of scope - A query-parameter override (`?max_age=...`) for client-side tuning. The server policy is uniform. - Push-side mirror behavior. Push delivery already cursors per-subscription independently and has different replay semantics; revisiting it is its own design. - A new YAML field, env var, or CLI flag. The change reuses `SkewWindow`. -- Notifying the consumer that events were skipped (e.g. an SSE comment with the skipped count). Operators have the inspector and `hooksctl replay`; the SSE protocol stays minimal. +- Notifying the consumer that events were skipped (SSE comment, response header, `hooksctl forward` log line). If demand emerges, the SSE comment surface is the lowest-impact follow-up; not designed here. +- Audit emission for skipped events. `/audit` is operator-action volume, not webhook volume. ## Risks and mitigations | Risk | Mitigation | |------|------------| -| Operator surprise — events present in the store but never replayed. | Debug log on every skip; `/audit` and inspector remain authoritative views; CLAUDE.md and `docs/quickstart.md` updated to describe the new behavior. (Doc updates are scoped into the implementation plan, not this design.) | -| Source with `SkewWindow = 0` (operator opted out of skew enforcement at ingest). | Treat `0` as "no filter" on the backfill side too. Consistent with the ingest-side meaning of zero. | +| Operator surprise — events present in the store but never replayed. | Debug log on every skip (per spec, observable via `--dev` or `HOOKS_LOG_LEVEL=debug`). Inspector and `hooksctl replay` remain authoritative recovery surfaces. README, `docs/quickstart.md`, and CLAUDE.md updated to describe the new policy (see "Done when"). | +| Cursor not advanced past skipped events → live drain re-emits stale events on the unfiltered path. | TDD case 4 (all-stale batch + live emit) exercises this directly. Implementation contract: every skip in `initialDrain` updates `cursor`. | | Test clock divergence — `time.Now` vs injected `Now`. | Single source of truth via `Handler.Now`; production wiring leaves it nil and falls back to `time.Now`. | +| Operator sets `skew_window: "0s"` expecting no enforcement, gets 5m default everywhere. | Pre-existing ingest behavior; this spec preserves it consistently rather than introducing a second convention. Documented in the Threshold table row. | ## Done when -- All seven tests above pass. +- All ten TDD cases pass. - Existing `internal/subscribe` tests remain green. - `make lint && make test` clean. -- A short note added to `internal/subscribe/handler.go` package doc and `CLAUDE.md` `internal/subscribe` bullet describing the new policy. (Implementation-plan item; not part of this design.) +- Documentation strings updated to reflect that automatic catch-up is bounded by the skew window: + - `README.md:3` — "including replay of anything missed while disconnected" (qualify with the new policy). + - `README.md:107` — "`forward` first replays any events you missed (none on first run), then tails live." + - `docs/quickstart.md:138` — "replays anything missed since the last cursor, then tails live." + - `internal/subscribe/handler.go` package doc — describe `initialDrain` filter behavior. + - `CLAUDE.md` `internal/subscribe` bullet — one sentence on the new policy and where the threshold comes from. From 34982b7bfb6105212f17d88ce11bbe5254c91185 Mon Sep 17 00:00:00 2001 From: Aaron Brethorst Date: Sat, 9 May 2026 23:20:06 -0700 Subject: [PATCH 3/4] feat(subscribe): skip stale events on /subscribe initial backfill Standard Webhooks consumers reject events whose `webhook-timestamp` is older than ~5 minutes. After a long disconnect, /subscribe replay would re-deliver stale events that the consumer would 401-reject on verification. The initial-backfill drain now filters events older than the source's effective skew window (per-source `skew_window` from hooks.yaml or `sources.DefaultSkewWindow` when zero/unset, resolved at the seam in `internal/server.Build`). The cursor advances past skipped events so reconnects with `?since=` start past them and the live drain does not re-emit them. Live tail (notifier-triggered or keepalive-triggered drains) is unfiltered, so manual replay via the inspector still reaches currently-connected subscribers. The Handler.Sources field changes shape from map[string]bool to map[string]time.Duration; subscribe.New takes the same map. Spec: docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md --- CLAUDE.md | 2 +- README.md | 4 +- docs/quickstart.md | 2 +- internal/server/server.go | 10 +- internal/sources/render.go | 3 +- internal/sources/sources.go | 6 + internal/subscribe/handler.go | 121 +++++++++-- internal/subscribe/handler_test.go | 330 ++++++++++++++++++++++++++++- 8 files changed, 450 insertions(+), 28 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index bea3366..afc58fd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -52,7 +52,7 @@ inbound webhook ──► /ingest/ ──► verifier ──► store.Ap - **`internal/pubsub`** — in-process `Notifier` with buffer-1 channels. **Publishers never block. If a subscriber's buffer is full the SIGNAL is dropped, never the event.** Every subscriber must backfill from the store on wake — that invariant is what makes signal loss safe. Do not change the buffer size or add blocking sends without rethinking that contract. -- **`internal/subscribe`** — SSE handler at `GET /subscribe/{source}`. Replays from `?since=` then tails via the notifier. +- **`internal/subscribe`** — SSE handler at `GET /subscribe/{source}`. Replays from `?since=` then tails via the notifier. Initial backfill filters out events older than the source's effective skew window (per-source `skew_window` in `hooks.yaml` or `sources.DefaultSkewWindow` — 5m — when zero/unset; resolved at the seam in `internal/server.Build`); the cursor still advances past skipped events so they aren't reconsidered on reconnect or via the unfiltered live drain. Live tail (notifier-triggered or keepalive-triggered drains) is unfiltered, so manual replay via the inspector still reaches currently-connected subscribers. - **`internal/push`** — `Manager` runs one worker goroutine per non-paused subscription. Workers POST events one at a time, advancing cursor only on 2xx. Backoff is `min(60s, 2^failures*100ms)` with full jitter. Outbound delivery signature: `X-Hooks-Signature: t=,v1=` where `v1 = HMAC-SHA256(secret, ".")` (see `signing.go`). **The plaintext signing secret only lives in memory** — after a restart, push delivery for each subscription is paused until `hooksctl push rotate-secret ` re-arms it. This is a deliberate trade-off (don't try to "fix" by persisting plaintext). diff --git a/README.md b/README.md index 7c7890d..15943ec 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ ## hooks -A small, self-hosted relay that durably captures inbound webhooks (Render to start), verifies their signatures, and re-delivers them to one or more developer environments — either pulled over Server-Sent Events or pushed to a registered URL — including replay of anything missed while disconnected. +A small, self-hosted relay that durably captures inbound webhooks (Render to start), verifies their signatures, and re-delivers them to one or more developer environments — either pulled over Server-Sent Events or pushed to a registered URL — including replay of anything missed while disconnected, bounded by the source's signature-verification skew window so consumers don't reject stale catch-up traffic. (Older events stay in the store and remain available via the inspector's "Replay to listeners" action and `hooksctl replay`.) To get started: `hooks init`. @@ -104,7 +104,7 @@ In a third terminal, point `hooksctl forward` at whichever local service you're ./bin/hooksctl forward render --to http://localhost:3000/webhooks/render ``` -`forward` first replays any events you missed (none on first run), then tails live. Bytes hitting your local app are byte-for-byte identical to what Render sent — original headers preserved. +`forward` first replays any events you missed (none on first run), then tails live. Replay is bounded by the source's signature-verification skew window (5 minutes for Render by default), so events older than that are skipped during the initial catch-up and your local app won't 401 on a stale `webhook-timestamp`. Older events remain in the store and can be redelivered manually via the inspector or `hooksctl replay`. Bytes hitting your local app are byte-for-byte identical to what Render sent — original headers preserved. ### 7. Trigger a webhook from Render diff --git a/docs/quickstart.md b/docs/quickstart.md index 35340d2..cf6a049 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -135,7 +135,7 @@ hooksctl whoami hooksctl forward render --to http://localhost:3000/webhooks/render ``` -Against a logged-in profile, `forward` auto-mints an ephemeral `kind='listener'` token, replays anything missed since the last cursor, then tails live. The token is revoked on clean exit; the server's prune loop reaps any ephemeral token whose `last_used_at` falls 24h behind. Bytes hitting your local app are byte-for-byte identical to what Render sent. Original headers (other than hop-by-hop) are preserved. +Against a logged-in profile, `forward` auto-mints an ephemeral `kind='listener'` token, replays anything missed since the last cursor, then tails live. Initial catch-up is bounded by the source's signature-verification skew window (5 minutes for Render by default): events older than that are skipped on the initial drain so your verifying consumer doesn't 401 on a stale `webhook-timestamp`. The cursor still advances past skipped events, so reconnects don't reconsider them; the events remain in the store and can be redelivered via the inspector or `hooksctl replay`. The token is revoked on clean exit; the server's prune loop reaps any ephemeral token whose `last_used_at` falls 24h behind. Bytes hitting your local app are byte-for-byte identical to what Render sent. Original headers (other than hop-by-hop) are preserved. For a long-lived listener (skip the mint/revoke dance every run), see [`docs/accounts.md`](accounts.md#power-user-long-lived-listener-token). diff --git a/internal/server/server.go b/internal/server/server.go index 3760e30..2a8ec84 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -73,6 +73,7 @@ func Build(cfg *config.Config, registry *sources.Registry, logger *slog.Logger) bindings := map[string]ingest.SourceBinding{} configuredSources := make([]string, 0, len(cfg.Sources)) configuredSourceSet := map[string]bool{} + subscribeSkews := make(map[string]time.Duration, len(cfg.Sources)) retentions := map[string]time.Duration{} for name, src := range cfg.Sources { v, ok := registry.Build(src.Verifier, src.Secret.Reveal(), sources.Options{ @@ -89,6 +90,13 @@ func Build(cfg *config.Config, registry *sources.Registry, logger *slog.Logger) } configuredSources = append(configuredSources, name) configuredSourceSet[name] = true + // Resolve effective skew at the seam: zero/unset becomes the verifier + // default. The SSE handler relies on a non-zero value here. + effectiveSkew := src.SkewWindow + if effectiveSkew == 0 { + effectiveSkew = sources.DefaultSkewWindow + } + subscribeSkews[name] = effectiveSkew retentions[name] = src.Retention } @@ -154,7 +162,7 @@ func Build(cfg *config.Config, registry *sources.Registry, logger *slog.Logger) ingestHandler.Register(mux, "/ingest/") // Subscribe (SSE). - sseHandler := subscribe.New(st, notifier, bearerAuth, configuredSources, logger) + sseHandler := subscribe.New(st, notifier, bearerAuth, subscribeSkews, logger) mux.Handle("GET /subscribe/{source}", sseHandler) // Token API (admin). diff --git a/internal/sources/render.go b/internal/sources/render.go index 7208563..1a19127 100644 --- a/internal/sources/render.go +++ b/internal/sources/render.go @@ -21,14 +21,13 @@ import ( // The signing secret is accepted in either form: // - "whsec_" (canonical Standard Webhooks form, what Render shows) // - bare bytes (handed straight to HMAC; useful for tests) -const renderDefaultSkew = 5 * time.Minute func init() { Default.Register("render", newRenderVerifier) } func newRenderVerifier(secret string, opts Options) Verifier { skew := opts.SkewWindow if skew == 0 { - skew = renderDefaultSkew + skew = DefaultSkewWindow } now := opts.Now if now == nil { diff --git a/internal/sources/sources.go b/internal/sources/sources.go index cf47ae6..d8b7864 100644 --- a/internal/sources/sources.go +++ b/internal/sources/sources.go @@ -61,6 +61,12 @@ func (o Options) NowOrDefault() time.Time { return time.Now() } +// DefaultSkewWindow is the verifier-default tolerance applied when a source +// does not configure its own skew_window. Verifier factories fall back to +// this when Options.SkewWindow is zero, and the SSE initial-backfill filter +// resolves the same value at the seam in internal/server. +const DefaultSkewWindow = 5 * time.Minute + // ErrInvalidSignature is returned when the HMAC check fails. var ErrInvalidSignature = errors.New("verify: invalid signature") diff --git a/internal/subscribe/handler.go b/internal/subscribe/handler.go index bda2bb9..b59c9a7 100644 --- a/internal/subscribe/handler.go +++ b/internal/subscribe/handler.go @@ -10,11 +10,21 @@ // data is a single line of JSON containing delivery_id, provider_timestamp, // received_at, headers, and a base64-encoded body. Bytes round-trip verbatim. // -// The handler runs a replay loop (read from the store in batches of ≤1000 -// until caught up to the current latest sequence) followed by a live loop -// (select on the per-source notifier channel + a 30s keepalive ticker). On -// any signal the live loop drains all newer events from the store, so a -// dropped notify channel signal still lands the affected events. +// The handler runs an initial-backfill drain (read from the store in batches +// of ≤1000 until caught up to the current latest sequence) followed by a +// live loop (select on the per-source notifier channel + a 30s keepalive +// ticker). On any signal the live loop drains all newer events from the +// store, so a dropped notify channel signal still lands the affected events. +// +// Initial-backfill stale-event filter: events whose provider_timestamp is +// older than the source's effective skew window (the per-source value from +// hooks.yaml, falling back to the verifier's 5-minute default — the same +// effective_skew ingest already enforces) are skipped during the initial +// drain. The cursor advances past skipped events so reconnects with +// `?since=` start past them and the live drain does not re-emit them. +// Live tail (notifier-triggered or keepalive-triggered drains) is +// unfiltered. The trade-off is documented in +// docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md. package subscribe import ( @@ -42,21 +52,35 @@ const ( ) // Handler serves /subscribe/. +// +// Sources maps each allowed source name to its effective skew window — the +// per-source value from hooks.yaml, or the verifier default if zero/unset. +// Source membership is determined by key presence (`d, ok := h.Sources[s]`), +// not value: zero is a legitimate duration and must not be conflated with +// "unknown source". Resolution to a non-zero default happens upstream in +// internal/server.Build so the handler itself never sees zero. type Handler struct { Store store.EventStore Notifier *pubsub.Notifier Auth *tokens.Authenticator - Sources map[string]bool + Sources map[string]time.Duration Logger *slog.Logger Keepalive time.Duration BatchLimit int + + // Now is the clock used by the initial-backfill stale-event filter. + // Tests inject a fixed clock; production leaves it nil and falls back + // to time.Now. + Now func() time.Time } -// New constructs a Handler with sensible defaults. -func New(st store.EventStore, n *pubsub.Notifier, auth *tokens.Authenticator, sources []string, logger *slog.Logger) *Handler { - srcSet := map[string]bool{} - for _, s := range sources { - srcSet[s] = true +// New constructs a Handler with sensible defaults. sources maps each allowed +// source name to its already-resolved effective skew window (caller must not +// pass zero — see Handler.Sources). +func New(st store.EventStore, n *pubsub.Notifier, auth *tokens.Authenticator, sources map[string]time.Duration, logger *slog.Logger) *Handler { + srcSet := make(map[string]time.Duration, len(sources)) + for s, d := range sources { + srcSet[s] = d } if logger == nil { logger = slog.Default() @@ -79,7 +103,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } source := lastPathSegment(r.URL.Path) - if !h.Sources[source] { + if _, ok := h.Sources[source]; !ok { http.Error(w, "unknown source", http.StatusNotFound) return } @@ -129,8 +153,8 @@ func (h *Handler) stream(ctx context.Context, w io.Writer, flusher http.Flusher, ch := h.Notifier.Subscribe(source) defer h.Notifier.Unsubscribe(source, ch) - // Replay phase. - cursor, err := h.drain(ctx, w, flusher, source, cursor, batchLimit) + // Initial backfill — filters stale events by age. See package doc. + cursor, err := h.initialDrain(ctx, w, flusher, source, cursor, batchLimit) if err != nil { return err } @@ -146,7 +170,7 @@ func (h *Handler) stream(ctx context.Context, w io.Writer, flusher http.Flusher, if !ok { return errors.New("notifier closed") } - cursor, err = h.drain(ctx, w, flusher, source, cursor, batchLimit) + cursor, err = h.liveDrain(ctx, w, flusher, source, cursor, batchLimit) if err != nil { return err } @@ -156,7 +180,7 @@ func (h *Handler) stream(ctx context.Context, w io.Writer, flusher http.Flusher, } flusher.Flush() // Belt-and-suspenders: if we missed a signal, drain anyway. - cursor, err = h.drain(ctx, w, flusher, source, cursor, batchLimit) + cursor, err = h.liveDrain(ctx, w, flusher, source, cursor, batchLimit) if err != nil { return err } @@ -164,7 +188,52 @@ func (h *Handler) stream(ctx context.Context, w io.Writer, flusher http.Flusher, } } -func (h *Handler) drain(ctx context.Context, w io.Writer, flusher http.Flusher, source string, cursor int64, batchLimit int) (int64, error) { +// initialDrain reads from the store and emits events to the wire, filtering +// out events older than the source's effective skew window. Cursor advances +// on every event whether emitted or skipped: if it didn't, the unfiltered +// liveDrain that runs next would re-pick the same skipped events from the +// store and re-emit them on the wire. +func (h *Handler) initialDrain(ctx context.Context, w io.Writer, flusher http.Flusher, source string, cursor int64, batchLimit int) (int64, error) { + skew := h.Sources[source] + now := h.now() + cutoff := now.Add(-skew) + return h.readBatchAndEmit(ctx, w, flusher, source, cursor, batchLimit, func(ev store.Event) bool { + // Forward-compat: a missing/zero ProviderTimestamp passes. A + // raw time.Time{} round-trips through the store's nanosecond + // encoding into a pre-epoch wraparound value (year 1754), so + // IsZero() doesn't catch it. The "at or before unix epoch" + // sentinel covers both that wraparound and any pre-1970 + // garbage; real provider timestamps are post-2000. + if ev.ProviderTimestamp.Unix() <= 0 { + return true + } + if ev.ProviderTimestamp.Before(cutoff) { + h.Logger.Debug("subscribe: skipping stale event on initial backfill", + slog.String("source", source), + slog.Int64("seq", ev.Sequence), + slog.String("delivery_id", ev.DeliveryID), + slog.Duration("age", now.Sub(ev.ProviderTimestamp)), + slog.Duration("skew_window", skew), + ) + return false + } + return true + }) +} + +// liveDrain reads from the store and emits every event unconditionally. +// Live ingest events are fresh by definition (they just passed the same +// effective_skew check at ingest), and the inspector "Replay to listeners" +// path uses Notifier.Publish to wake currently-connected SSE subscribers — +// that path stays open even for stale events. +func (h *Handler) liveDrain(ctx context.Context, w io.Writer, flusher http.Flusher, source string, cursor int64, batchLimit int) (int64, error) { + return h.readBatchAndEmit(ctx, w, flusher, source, cursor, batchLimit, nil) +} + +// readBatchAndEmit reads ≤batchLimit events at a time starting after cursor +// and emits each to the wire. If keep is non-nil, an event is written only +// when keep(ev) is true; cursor advances on every event regardless. +func (h *Handler) readBatchAndEmit(ctx context.Context, w io.Writer, flusher http.Flusher, source string, cursor int64, batchLimit int, keep func(store.Event) bool) (int64, error) { for { batch, err := h.Store.ReadSince(ctx, source, cursor, batchLimit) if err != nil { @@ -173,16 +242,30 @@ func (h *Handler) drain(ctx context.Context, w io.Writer, flusher http.Flusher, if len(batch) == 0 { return cursor, nil } + var wrote bool for _, ev := range batch { + cursor = ev.Sequence + if keep != nil && !keep(ev) { + continue + } if err := writeEvent(w, ev); err != nil { return cursor, err } - cursor = ev.Sequence + wrote = true + } + if wrote { + flusher.Flush() } - flusher.Flush() } } +func (h *Handler) now() time.Time { + if h.Now != nil { + return h.Now() + } + return time.Now() +} + // writeEvent renders an Event as a single SSE message. func writeEvent(w io.Writer, ev store.Event) error { payload := ssePayload{ diff --git a/internal/subscribe/handler_test.go b/internal/subscribe/handler_test.go index da50b2e..cd0a97f 100644 --- a/internal/subscribe/handler_test.go +++ b/internal/subscribe/handler_test.go @@ -2,6 +2,7 @@ package subscribe import ( "bufio" + "bytes" "context" "encoding/base64" "encoding/json" @@ -43,17 +44,26 @@ func setup(t *testing.T) (*Handler, *store.SQLite, *pubsub.Notifier, string, str } notifier := pubsub.New() - h := New(st, notifier, tokens.New(st.Tokens()), []string{"render"}, slog.New(slog.DiscardHandler)) + h := New(st, notifier, tokens.New(st.Tokens()), map[string]time.Duration{"render": 5 * time.Minute}, slog.New(slog.DiscardHandler)) h.Keepalive = 50 * time.Millisecond return h, st, notifier, res.Plaintext, adminRes.Plaintext } func appendEvent(t *testing.T, st *store.SQLite, source, deliveryID string, body []byte) store.Event { + t.Helper() + return appendEventAt(t, st, source, deliveryID, time.Now(), body) +} + +// appendEventAt seeds an event with a caller-controlled ProviderTimestamp. +// The store still stamps ReceivedAt with time.Now(); the SSE stale filter +// reads ProviderTimestamp, which is what the consumer's signature-verifier +// will check. +func appendEventAt(t *testing.T, st *store.SQLite, source, deliveryID string, providerTime time.Time, body []byte) store.Event { t.Helper() ev, err := st.Append(context.Background(), store.AppendInput{ Source: source, DeliveryID: deliveryID, - ProviderTimestamp: time.Now(), + ProviderTimestamp: providerTime, Headers: map[string]string{"X-Test": "1"}, Body: body, }) @@ -390,3 +400,319 @@ func TestUnknownSourceIs404(t *testing.T) { t.Fatalf("status %d", resp.StatusCode) } } + +// readWithDeadline pulls the next SSE message off the stream or returns +// ok=false on timeout. Used for assertions that nothing arrived. +func readWithDeadline(stream <-chan sseMessage, d time.Duration) (sseMessage, bool) { + select { + case msg, ok := <-stream: + if !ok { + return sseMessage{}, false + } + return msg, true + case <-time.After(d): + return sseMessage{}, false + } +} + +// connect opens a SSE subscription using tok and returns the response and +// the parsed message channel. Caller is responsible for cancel + Close. +func connect(t *testing.T, srv *httptest.Server, tok, path string) (*http.Response, <-chan sseMessage, context.CancelFunc) { + t.Helper() + ctx, cancel := context.WithCancel(context.Background()) + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, srv.URL+path, nil) + req.Header.Set("Authorization", "Bearer "+tok) + resp, err := http.DefaultClient.Do(req) + if err != nil { + cancel() + t.Fatal(err) + } + if resp.StatusCode != http.StatusOK { + cancel() + _ = resp.Body.Close() + t.Fatalf("status %d", resp.StatusCode) + } + stream := readSSE(t, resp.Body) + return resp, stream, cancel +} + +// fixedNow returns a clock function that always returns t. Used to make the +// initial-backfill stale filter deterministic in tests. +func fixedNow(t time.Time) func() time.Time { return func() time.Time { return t } } + +func TestInitialBackfillSkipsStaleEvent(t *testing.T) { + h, st, _, tok, _ := setup(t) + now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC) + h.Now = fixedNow(now) + srv := startServer(t, h) + + appendEventAt(t, st, "render", "stale-1", now.Add(-10*time.Minute), []byte("body")) + + resp, stream, cancel := connect(t, srv, tok, "/subscribe/render?since=0") + defer cancel() + defer resp.Body.Close() + + if msg, ok := readWithDeadline(stream, 750*time.Millisecond); ok && msg.ID != "" { + t.Fatalf("expected no SSE message; got id=%q event=%q", msg.ID, msg.Event) + } +} + +func TestInitialBackfillDeliversFreshEvent(t *testing.T) { + h, st, _, tok, _ := setup(t) + now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC) + h.Now = fixedNow(now) + srv := startServer(t, h) + + appendEventAt(t, st, "render", "fresh-1", now.Add(-1*time.Minute), []byte("body")) + + resp, stream, cancel := connect(t, srv, tok, "/subscribe/render?since=0") + defer cancel() + defer resp.Body.Close() + + msg, ok := readWithDeadline(stream, 2*time.Second) + if !ok { + t.Fatal("never received fresh event") + } + if msg.ID != "1" { + t.Fatalf("got id %q, want 1", msg.ID) + } +} + +func TestInitialBackfillMixedBatchOnlyFreshEmittedAndIdempotentOnReconnect(t *testing.T) { + h, st, _, tok, _ := setup(t) + now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC) + h.Now = fixedNow(now) + srv := startServer(t, h) + + stale := now.Add(-10 * time.Minute) + fresh := now.Add(-1 * time.Minute) + appendEventAt(t, st, "render", "stale-a", stale, []byte("a")) // seq 1 + appendEventAt(t, st, "render", "fresh-b", fresh, []byte("b")) // seq 2 + appendEventAt(t, st, "render", "stale-c", stale, []byte("c")) // seq 3 + appendEventAt(t, st, "render", "fresh-d", fresh, []byte("d")) // seq 4 + + collect := func() []string { + resp, stream, cancel := connect(t, srv, tok, "/subscribe/render?since=0") + defer cancel() + defer resp.Body.Close() + var ids []string + // Read until the stream goes idle (no more events). The window + // must be long enough to survive -race slowdowns on CI runners. + for { + msg, ok := readWithDeadline(stream, 750*time.Millisecond) + if !ok { + return ids + } + ids = append(ids, msg.ID) + } + } + + first := collect() + if len(first) != 2 || first[0] != "2" || first[1] != "4" { + t.Fatalf("first connect: got ids %v, want [2 4]", first) + } + second := collect() + if len(second) != 2 || second[0] != "2" || second[1] != "4" { + t.Fatalf("reconnect: got ids %v, want [2 4]", second) + } +} + +func TestInitialBackfillAllStaleStillAdvancesCursor(t *testing.T) { + h, st, notifier, tok, _ := setup(t) + now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC) + h.Now = fixedNow(now) + srv := startServer(t, h) + + stale := now.Add(-10 * time.Minute) + appendEventAt(t, st, "render", "stale-1", stale, []byte("a")) // seq 1 + appendEventAt(t, st, "render", "stale-2", stale, []byte("b")) // seq 2 + + resp, stream, cancel := connect(t, srv, tok, "/subscribe/render?since=0") + defer cancel() + defer resp.Body.Close() + + // Drain the initial backfill: we expect zero messages within a short window. + if msg, ok := readWithDeadline(stream, 750*time.Millisecond); ok && msg.ID != "" { + t.Fatalf("initial drain emitted unexpected msg id=%q", msg.ID) + } + + // Now ingest a fresh event live and notify. + freshEv := appendEventAt(t, st, "render", "fresh-3", now.Add(-1*time.Minute), []byte("c")) + notifier.Publish("render", freshEv.Sequence) + + msg, ok := readWithDeadline(stream, 2*time.Second) + if !ok { + t.Fatal("never received fresh event") + } + if msg.ID != "3" { + t.Fatalf("got id %q, want 3 — cursor was not advanced past stale events on initial drain", msg.ID) + } + // Make sure no further (re-emitted stale) event sneaks in. + if extra, ok := readWithDeadline(stream, 500*time.Millisecond); ok && extra.ID != "" { + t.Fatalf("unexpected extra msg id=%q after fresh event", extra.ID) + } +} + +func TestLiveTailDoesNotFilter(t *testing.T) { + h, st, notifier, tok, _ := setup(t) + now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC) + h.Now = fixedNow(now) + srv := startServer(t, h) + + resp, stream, cancel := connect(t, srv, tok, "/subscribe/render?since=0") + defer cancel() + defer resp.Body.Close() + + // Wait for initial drain to drain (no events seeded). + if msg, ok := readWithDeadline(stream, 500*time.Millisecond); ok && msg.ID != "" { + t.Fatalf("unexpected initial msg id=%q", msg.ID) + } + + // Now write a stale event directly to the store and notify. Live tail + // must not filter — this models manual replay of an old event to a + // currently-connected SSE subscriber. + staleEv := appendEventAt(t, st, "render", "stale-live", now.Add(-30*time.Minute), []byte("body")) + notifier.Publish("render", staleEv.Sequence) + + msg, ok := readWithDeadline(stream, 2*time.Second) + if !ok { + t.Fatal("live tail did not deliver stale event") + } + if msg.ID != "1" { + t.Fatalf("got id %q, want 1", msg.ID) + } +} + +func TestInitialBackfillBoundaryAtExactlySkew(t *testing.T) { + h, st, _, tok, _ := setup(t) + now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC) + h.Now = fixedNow(now) + srv := startServer(t, h) + + // delta == skew (5 min) → emit. Matches `delta > skew` ingest semantics. + appendEventAt(t, st, "render", "boundary-1", now.Add(-5*time.Minute), []byte("body")) + + resp, stream, cancel := connect(t, srv, tok, "/subscribe/render?since=0") + defer cancel() + defer resp.Body.Close() + + msg, ok := readWithDeadline(stream, 2*time.Second) + if !ok { + t.Fatal("event at exactly skew was filtered; expected emit") + } + if msg.ID != "1" { + t.Fatalf("got id %q, want 1", msg.ID) + } +} + +func TestInitialBackfillFutureTimestampPasses(t *testing.T) { + h, st, _, tok, _ := setup(t) + now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC) + h.Now = fixedNow(now) + srv := startServer(t, h) + + appendEventAt(t, st, "render", "future-1", now.Add(1*time.Minute), []byte("body")) + + resp, stream, cancel := connect(t, srv, tok, "/subscribe/render?since=0") + defer cancel() + defer resp.Body.Close() + + msg, ok := readWithDeadline(stream, 2*time.Second) + if !ok { + t.Fatal("future-timestamp event was not emitted") + } + if msg.ID != "1" { + t.Fatalf("got id %q, want 1", msg.ID) + } +} + +func TestInitialBackfillZeroProviderTimestampPasses(t *testing.T) { + h, st, _, tok, _ := setup(t) + now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC) + h.Now = fixedNow(now) + srv := startServer(t, h) + + appendEventAt(t, st, "render", "zero-1", time.Time{}, []byte("body")) + + resp, stream, cancel := connect(t, srv, tok, "/subscribe/render?since=0") + defer cancel() + defer resp.Body.Close() + + msg, ok := readWithDeadline(stream, 2*time.Second) + if !ok { + t.Fatal("zero-timestamp event was filtered; expected emit (forward-compat)") + } + if msg.ID != "1" { + t.Fatalf("got id %q, want 1", msg.ID) + } +} + +// syncBuf is a goroutine-safe bytes.Buffer wrapper. The handler goroutine +// writes log lines while the test goroutine reads them; bytes.Buffer alone +// races under -race. +type syncBuf struct { + mu sync.Mutex + b bytes.Buffer +} + +func (s *syncBuf) Write(p []byte) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.b.Write(p) +} +func (s *syncBuf) String() string { + s.mu.Lock() + defer s.mu.Unlock() + return s.b.String() +} + +func TestInitialBackfillSkipIsObservable(t *testing.T) { + h, st, _, tok, _ := setup(t) + logBuf := &syncBuf{} + h.Logger = slog.New(slog.NewTextHandler(logBuf, &slog.HandlerOptions{Level: slog.LevelDebug})) + now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC) + h.Now = fixedNow(now) + srv := startServer(t, h) + + ev := appendEventAt(t, st, "render", "stale-obs", now.Add(-10*time.Minute), []byte("body")) + + resp, stream, cancel := connect(t, srv, tok, "/subscribe/render?since=0") + defer cancel() + defer resp.Body.Close() + // Drain enough time for the initial drain to run. + _, _ = readWithDeadline(stream, 750*time.Millisecond) + + logged := logBuf.String() + wantSubs := []string{ + `level=DEBUG`, + `source=render`, + fmt.Sprintf("seq=%d", ev.Sequence), + `delivery_id=stale-obs`, + `age=`, + `skew_window=`, + } + for _, sub := range wantSubs { + if !strings.Contains(logged, sub) { + t.Fatalf("log missing %q; got: %s", sub, logged) + } + } +} + +func TestUnknownSourceIs404WithMapShape(t *testing.T) { + // Direct construction with the new map shape — exercises the + // key-presence-not-value-zero membership rule for /subscribe/. + h, _, _, tok, _ := setup(t) + h.Sources = map[string]time.Duration{"render": 5 * time.Minute} + srv := startServer(t, h) + + req, _ := http.NewRequest(http.MethodGet, srv.URL+"/subscribe/stripe", nil) + req.Header.Set("Authorization", "Bearer "+tok) + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusNotFound { + t.Fatalf("status %d", resp.StatusCode) + } +} From 077ee2be4830fed7918704e08a2aa05897318ad3 Mon Sep 17 00:00:00 2001 From: Aaron Brethorst Date: Sun, 10 May 2026 08:43:15 -0700 Subject: [PATCH 4/4] docs(subscribe): mark spec as implemented; tag log fence as text Addresses the two CodeRabbit inline comments on the spec doc. The cognitive-complexity nitpick on internal/subscribe/handler.go is skipped: the project's golangci-lint config does not include a complexity linter, the SonarCloud quality gate passed, and extracting a helper for a 24-line function with one for-loop adds indirection without readability gain. --- .../2026-05-09-subscribe-stale-backfill-filter-design.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md b/docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md index b89b38a..ec5b642 100644 --- a/docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md +++ b/docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md @@ -1,6 +1,6 @@ # Skip stale events on `/subscribe` initial backfill -**Status:** approved, pending implementation +**Status:** approved, implemented in PR #8 **Date:** 2026-05-09 **Affects:** `internal/subscribe`, `internal/server` (handler wiring) @@ -12,7 +12,7 @@ Standard Webhooks consumers reject messages whose `webhook-timestamp` is older t The conflict is structural. The relay durably stores webhooks; consumers verify timestamps strictly. As soon as automatic replay crosses the consumer's tolerance, every redelivery 401s. The Rails app behind `hooksctl forward` is exhibiting exactly this: -``` +```text Render webhook: signature verification failed: Message timestamp too old Filter chain halted as :verify_render_signature rendered or redirected Completed 401 Unauthorized