diff --git a/cmd/rater/main.go b/cmd/rater/main.go index 9a155d7..c770edc 100644 --- a/cmd/rater/main.go +++ b/cmd/rater/main.go @@ -20,11 +20,47 @@ // (the E3 ft-uniqueness violation: a uuid4 checkpoint id cannot carry two bases, // so this is broken base_model propagation, never a priceable rollup). Any of // these means revenue is leaking upstream. Distinct code so a CronJob can alert -// on "lost revenue / lost data" separately from "job broke". +// on "lost revenue / lost data" separately from "job broke". A ROUTINE run (the +// default trailing-hours window, no --since/--until) that RECONCILE-DELETED a +// previously-billed rated_usage row ALSO exits 2: on a routine cadence a prior +// bill vanishing means data was lost / an upstream regression dropped events — +// page someone (see exitCode and the reconcile-exit contract below). // // Re-running any window is safe and idempotent (rollups upsert on the natural // key), so cron overlap or manual re-rating never double-counts. // +// THE RECONCILE-DELETE EXIT CONTRACT (option (c): loud on routine, quiet on +// backfill). A run that DELETES a previously-billed rated_usage row +// (ReconciledDeletions > 0) is a REVENUE CHANGE — a customer's prior bill just got +// rewritten. Whether that is alarming depends on WHY the window was chosen: +// - DEFAULT trailing-hours window (no --since/--until): a routine cron run should +// NOT be rewriting prior bills. If it does, events that billed before have +// vanished from billing_event (data loss) or an upstream regression now drops +// them (e.g. base_model stopped propagating → rollups went unpriced/ambiguous). +// That is an ERROR + exit 2 — treat it like the other anomalies and page. +// - EXPLICIT --since/--until window (operator backfill, e.g. after a late price +// fix): convergence is exactly what the operator asked for — INFO + exit 0. +// +// The gate is ReconciledDeletions > 0 && !windowExplicit. 
The semantics are +// UNCHANGED either way ("what the latest run says is what bills" — store.go always +// reconciles); this is purely the observability/exit contract. The decision lives +// HERE, in cmd/rater, because only here is it known whether the window was explicit; +// it is deliberately NOT folded into Result.HasAnomaly() (that would make explicit +// backfills exit nonzero too). +// +// SINGLE-FLIGHT ASSUMPTION (deployment contract, enforced OUTSIDE this repo). The +// rater MUST run single-flight: no two rater processes rating overlapping windows +// concurrently. The reconcile DELETE (the `deleted` CTE in store.go) and the +// ordered upsert are deadlock-safe ONLY under single-flight — the upsert's +// ORDER BY prevents a rater self-deadlocking against its own rows, but two +// concurrent raters could still take row locks in opposing orders across the +// DELETE/INSERT pair (a classic ABBA hazard). The deployment MUST forbid +// concurrency: the Atlas CronJob sets `concurrencyPolicy: Forbid` so a slow run is +// never overlapped by the next tick. That manifest lives in the Atlas repo, not +// here; this rater does NOT add delete-lock-ordering machinery because single-flight +// makes the hazard unreachable. If you ever run the rater outside that CronJob, +// preserve single-flight some other way (an advisory lock, a queue) before doing so. +// // THE DEFAULT WINDOW IS THE TRAILING N COMPLETE HOURS, [floor(now)-N*1h, // floor(now)), N = rateTrailingHours (default 24). It is deliberately NOT just the // last hour: events can be DRAINED LATE (a Valkey outage recovered from the WAL) @@ -92,6 +128,26 @@ const ( exitAnomaly = 2 // window rated but something leaked: unpriced and/or unattributable ) +// exitCode maps a completed rating run to its process exit code, encoding the +// reconcile-delete contract (see the package doc). A run that rated cleanly but +// leaked an anomaly (unpriced / unattributable / ambiguous-base) ALWAYS exits +// exitAnomaly. 
A reconcile-delete (reconciledDeletions > 0) exits exitAnomaly TOO — +// but ONLY when the window was the default trailing-hours window (windowExplicit == +// false): on a routine run a prior bill vanishing is alarming (data loss / upstream +// regression). When the operator chose the window explicitly (--since/--until), +// reconcile-deletes are intended convergence (a backfill) and do NOT raise the exit +// code on their own. This is the single place the routine-vs-backfill decision is +// made, because only cmd/rater knows whether the window was explicit. +func exitCode(reconciledDeletions int64, windowExplicit, hasAnomaly bool) int { + if hasAnomaly { + return exitAnomaly + } + if reconciledDeletions > 0 && !windowExplicit { + return exitAnomaly + } + return exitOK +} + func main() { os.Exit(run()) } @@ -133,7 +189,7 @@ func run() int { return exitFatal } - windowStart, windowEnd, err := resolveWindow(*since, *until, opts.trailingHours, time.Now()) + windowStart, windowEnd, windowExplicit, err := resolveWindow(*since, *until, opts.trailingHours, time.Now()) if err != nil { log.Error.Printf("rater: %v", err) return exitFatal @@ -152,21 +208,18 @@ func run() int { defer func() { _ = store.Close() }() r := rating.New(store, book, log) - res, err := r.Run(ctx, windowStart, windowEnd) + res, err := r.Run(ctx, windowStart, windowEnd, windowExplicit) if err != nil { log.Error.Printf("rater: run: %v", err) return exitFatal } - if res.HasAnomaly() { - // The window was rated and rollups were written, but something leaked: - // events that could not be priced, rows that could not be attributed (NULL - // auth_id/model), and/or an ft: rollup spanning multiple base_models (the E3 - // ft-uniqueness violation). Non-zero exit so a CronJob surfaces the lost - // revenue / lost data. 
- return exitAnomaly - } - return exitOK + // Exit code encodes BOTH the leaked-anomaly signal (unpriced / unattributable / + // ambiguous-base — always nonzero) AND the reconcile-delete contract: a routine + // run (default window) that rewrote a prior bill is alarming and exits nonzero, + // while an explicit backfill (--since/--until) that did so is intended and exits + // 0. See exitCode and the package doc. + return exitCode(res.ReconciledDeletions, windowExplicit, res.HasAnomaly()) } // resolveWindow computes [start, end). Defaults (both flags empty) rate the @@ -180,9 +233,16 @@ func run() int { // late still slips (residual risk; see the package doc — reconciliation is the // backstop). Either flag may be given explicitly (RFC3339) and WINS over the // default; both must parse, be hour-aligned, and start must be before end. -func resolveWindow(since, until string, trailingHours int, now time.Time) (time.Time, time.Time, error) { +// +// The third return is windowExplicit: TRUE if EITHER --since or --until was set, i.e. +// the operator named the window rather than taking the trailing-hours default. It is +// the single source of truth for the routine-vs-backfill distinction the +// reconcile-delete exit contract turns on (see exitCode + the package doc): a +// reconcile-delete on a routine (default) window is alarming; on an explicit backfill +// it is intended convergence. +func resolveWindow(since, until string, trailingHours int, now time.Time) (time.Time, time.Time, bool, error) { if trailingHours < 1 { - return time.Time{}, time.Time{}, errInvalidTrailingHours(trailingHours) + return time.Time{}, time.Time{}, false, errInvalidTrailingHours(trailingHours) } now = now.UTC() currentHour := now.Truncate(time.Hour) @@ -190,22 +250,27 @@ func resolveWindow(since, until string, trailingHours int, now time.Time) (time. 
start := currentHour.Add(-time.Duration(trailingHours) * time.Hour) end := currentHour + // Either flag set means the operator chose the window — an explicit backfill, not + // the routine trailing-hours cadence. Captured BEFORE parsing so it reflects + // operator INTENT (a flag was passed) independent of the parsed values. + windowExplicit := since != "" || until != "" + if since != "" { t, err := time.Parse(time.RFC3339, since) if err != nil { - return time.Time{}, time.Time{}, errBadWindow("since", since, err) + return time.Time{}, time.Time{}, false, errBadWindow("since", since, err) } start = t.UTC() } if until != "" { t, err := time.Parse(time.RFC3339, until) if err != nil { - return time.Time{}, time.Time{}, errBadWindow("until", until, err) + return time.Time{}, time.Time{}, false, errBadWindow("until", until, err) } end = t.UTC() } if !start.Before(end) { - return time.Time{}, time.Time{}, errInvertedWindow(start, end) + return time.Time{}, time.Time{}, false, errInvertedWindow(start, end) } // Both bounds MUST be hour-aligned. The rollup buckets by date_trunc('hour') // and the upsert REPLACES a bucket's totals (not additive — that is what makes @@ -215,12 +280,12 @@ func resolveWindow(since, until string, trailingHours int, now time.Time) (time. // this fences the explicit --since/--until path. Fail loud rather than snap, // so an operator never silently re-rates hours they did not name. if !start.Truncate(time.Hour).Equal(start) { - return time.Time{}, time.Time{}, errUnalignedWindow("since", start) + return time.Time{}, time.Time{}, false, errUnalignedWindow("since", start) } if !end.Truncate(time.Hour).Equal(end) { - return time.Time{}, time.Time{}, errUnalignedWindow("until", end) + return time.Time{}, time.Time{}, false, errUnalignedWindow("until", end) } - return start, end, nil + return start, end, windowExplicit, nil } // raterOptions are the non-pool knobs loadConfig resolves from the settings file. 
diff --git a/cmd/rater/main_test.go b/cmd/rater/main_test.go index 375b0ad..f4d21d2 100644 --- a/cmd/rater/main_test.go +++ b/cmd/rater/main_test.go @@ -20,7 +20,7 @@ func mustTime(s string) time.Time { // so the re-rate never doubles). func TestResolveWindow_DefaultTrailingHours(t *testing.T) { now := mustTime("2026-06-08T10:37:42Z") - start, end, err := resolveWindow("", "", defaultRateTrailingHours, now) + start, end, explicit, err := resolveWindow("", "", defaultRateTrailingHours, now) if err != nil { t.Fatal(err) } @@ -30,22 +30,30 @@ func TestResolveWindow_DefaultTrailingHours(t *testing.T) { if !end.Equal(mustTime("2026-06-08T10:00:00Z")) { t.Fatalf("end = %v, want 10:00", end) } + // The default trailing window is NOT explicit — it is the routine cadence, so a + // reconcile-delete on it must page (see the reconcile-exit contract). + if explicit { + t.Fatal("default trailing window must report windowExplicit=false") + } // A custom N widens/narrows the trailing window; N=1 is the old last-hour-only. - start, end, err = resolveWindow("", "", 3, now) + start, end, explicit, err = resolveWindow("", "", 3, now) if err != nil { t.Fatal(err) } if !start.Equal(mustTime("2026-06-08T07:00:00Z")) || !end.Equal(mustTime("2026-06-08T10:00:00Z")) { t.Fatalf("window = [%v,%v), want [07:00,10:00) for N=3", start, end) } + if explicit { + t.Fatal("a custom N is still the trailing default, not an explicit window") + } } // TestResolveWindow_RejectsBadTrailingHours: N < 1 would rate nothing — fail loud. func TestResolveWindow_RejectsBadTrailingHours(t *testing.T) { now := mustTime("2026-06-08T10:37:42Z") for _, n := range []int{0, -1} { - if _, _, err := resolveWindow("", "", n, now); err == nil { + if _, _, _, err := resolveWindow("", "", n, now); err == nil { t.Fatalf("expected error for trailingHours=%d", n) } } @@ -55,19 +63,39 @@ func TestResolveWindow_RejectsBadTrailingHours(t *testing.T) { // trailing default). 
func TestResolveWindow_ExplicitFlags(t *testing.T) { now := mustTime("2026-06-08T10:37:42Z") - start, end, err := resolveWindow("2026-06-01T00:00:00Z", "2026-06-02T00:00:00Z", defaultRateTrailingHours, now) + start, end, explicit, err := resolveWindow("2026-06-01T00:00:00Z", "2026-06-02T00:00:00Z", defaultRateTrailingHours, now) if err != nil { t.Fatal(err) } if !start.Equal(mustTime("2026-06-01T00:00:00Z")) || !end.Equal(mustTime("2026-06-02T00:00:00Z")) { t.Fatalf("window = [%v,%v), want the 24h day", start, end) } + // Both flags set → an explicit operator backfill, so a reconcile-delete here is + // intended convergence (exit 0), not a page. + if !explicit { + t.Fatal("an explicit --since/--until window must report windowExplicit=true") + } +} + +// TestResolveWindow_SingleFlagIsExplicit: setting EITHER flag alone (not both) still +// marks the window explicit — the operator named a bound, so a reconcile-delete on it +// is an intended backfill, not a routine-run page. +func TestResolveWindow_SingleFlagIsExplicit(t *testing.T) { + now := mustTime("2026-06-08T10:37:42Z") + // --since only (until defaults to floor(now), hour-aligned). + if _, _, explicit, err := resolveWindow("2026-06-01T00:00:00Z", "", defaultRateTrailingHours, now); err != nil || !explicit { + t.Fatalf("--since-only: explicit=%v err=%v, want explicit=true nil", explicit, err) + } + // --until only (since defaults to floor(now)-N*1h, hour-aligned). + if _, _, explicit, err := resolveWindow("", "2026-06-08T09:00:00Z", defaultRateTrailingHours, now); err != nil || !explicit { + t.Fatalf("--until-only: explicit=%v err=%v, want explicit=true nil", explicit, err) + } } // TestResolveWindow_Inverted rejects start >= end. 
func TestResolveWindow_Inverted(t *testing.T) { now := mustTime("2026-06-08T10:37:42Z") - if _, _, err := resolveWindow("2026-06-02T00:00:00Z", "2026-06-01T00:00:00Z", defaultRateTrailingHours, now); err == nil { + if _, _, _, err := resolveWindow("2026-06-02T00:00:00Z", "2026-06-01T00:00:00Z", defaultRateTrailingHours, now); err == nil { t.Fatal("expected error for inverted window") } } @@ -75,11 +103,63 @@ func TestResolveWindow_Inverted(t *testing.T) { // TestResolveWindow_BadFormat rejects a non-RFC3339 value. func TestResolveWindow_BadFormat(t *testing.T) { now := mustTime("2026-06-08T10:37:42Z") - if _, _, err := resolveWindow("yesterday", "", defaultRateTrailingHours, now); err == nil { + if _, _, _, err := resolveWindow("yesterday", "", defaultRateTrailingHours, now); err == nil { t.Fatal("expected error for unparseable --since") } } +// TestRater_RoutineRunReconcileDeleteExitsNonzero pins the EXIT-CODE half of the +// reconcile observability contract: a ROUTINE run (the default trailing-hours window — +// windowExplicit == false) that DELETED a previously-billed rated_usage row +// (ReconciledDeletions > 0) is a revenue change with no operator behind it, which +// means data was lost / an upstream regression dropped events. It MUST exit nonzero +// (exitAnomaly) so a CronJob pages — even with NO other anomaly. This pins ONLY the +// exit code (exitCode()'s gate); the matching LOG-severity half (routine → ERROR) is +// pinned separately by TestRater_RoutineReconcileDeleteLogsError in internal/rating, +// where the reconcile-delete log line is emitted. RED before FIX 1's exit-code change: +// the old exit path keyed solely on HasAnomaly(), so a reconcile-delete with no other +// anomaly returned exitOK and the page never fired. 
+func TestRater_RoutineRunReconcileDeleteExitsNonzero(t *testing.T) { + const ( + windowExplicit = false // routine default trailing-hours window + hasAnomaly = false // ONLY a reconcile-delete; no unpriced/unattributable/ambiguous + ) + if got := exitCode(1, windowExplicit, hasAnomaly); got != exitAnomaly { + t.Fatalf("routine run with a reconcile-delete: exit = %d, want exitAnomaly (%d) — a prior bill vanished on a routine cadence; page someone", got, exitAnomaly) + } + // And with MORE than one delete (count is just a signal, not a threshold). + if got := exitCode(7, windowExplicit, hasAnomaly); got != exitAnomaly { + t.Fatalf("routine run with 7 reconcile-deletes: exit = %d, want exitAnomaly (%d)", got, exitAnomaly) + } + // Sanity: a routine run with NO reconcile-delete and no anomaly is the clean path. + if got := exitCode(0, windowExplicit, hasAnomaly); got != exitOK { + t.Fatalf("clean routine run: exit = %d, want exitOK (%d)", got, exitOK) + } +} + +// TestRater_BackfillReconcileDeleteExitsZero pins the quiet half: an EXPLICIT +// operator backfill (--since/--until → windowExplicit == true) that DELETED a +// previously-billed row is intended convergence ("they asked for it", e.g. after a +// late price fix), so it exits 0 (INFO, not a page) when nothing else leaked. A +// genuine anomaly still forces exitAnomaly even on a backfill — the explicit flag +// suppresses ONLY the reconcile-delete signal, never a real anomaly. +func TestRater_BackfillReconcileDeleteExitsZero(t *testing.T) { + const windowExplicit = true // operator named the window + // Reconcile-delete on an explicit backfill, nothing else leaked → exit 0. + if got := exitCode(3, windowExplicit, false); got != exitOK { + t.Fatalf("explicit backfill with a reconcile-delete: exit = %d, want exitOK (%d) — convergence the operator asked for", got, exitOK) + } + // But a real anomaly during a backfill STILL exits nonzero (the flag never + // suppresses unpriced/unattributable/ambiguous). 
+ if got := exitCode(3, windowExplicit, true); got != exitAnomaly { + t.Fatalf("explicit backfill that ALSO leaked an anomaly: exit = %d, want exitAnomaly (%d) — --since must not mask a real anomaly", got, exitAnomaly) + } + // A clean explicit backfill (no deletes, no anomaly) is exit 0. + if got := exitCode(0, windowExplicit, false); got != exitOK { + t.Fatalf("clean explicit backfill: exit = %d, want exitOK (%d)", got, exitOK) + } +} + // TestResolveWindow_RejectsUnaligned guards the under-bill footgun: a sub-hour // window would overwrite a complete hourly rollup with a partial sum, so a // non-hour-aligned --since/--until must fail loud rather than rate silently. @@ -95,13 +175,13 @@ func TestResolveWindow_RejectsUnaligned(t *testing.T) { } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - if _, _, err := resolveWindow(tc.since, tc.until, defaultRateTrailingHours, now); err == nil { + if _, _, _, err := resolveWindow(tc.since, tc.until, defaultRateTrailingHours, now); err == nil { t.Fatalf("expected error for non-hour-aligned window %s..%s", tc.since, tc.until) } }) } // A fully hour-aligned explicit window is still accepted. 
- if _, _, err := resolveWindow("2026-06-01T00:00:00Z", "2026-06-01T03:00:00Z", defaultRateTrailingHours, now); err != nil { + if _, _, _, err := resolveWindow("2026-06-01T00:00:00Z", "2026-06-01T03:00:00Z", defaultRateTrailingHours, now); err != nil { t.Fatalf("hour-aligned window should be accepted, got %v", err) } } diff --git a/internal/e2e/e2e_test.go b/internal/e2e/e2e_test.go index dbac15d..6c2a3c2 100644 --- a/internal/e2e/e2e_test.go +++ b/internal/e2e/e2e_test.go @@ -292,7 +292,7 @@ func (h *harness) rateEventHour(t *testing.T, book *rating.PriceBook) rating.Res hour := evTS.UTC().Truncate(time.Hour) rater := rating.New(rating.NewPostgresStore(h.db), book, h.log) - res, err := rater.Run(context.Background(), hour, hour.Add(time.Hour)) + res, err := rater.Run(context.Background(), hour, hour.Add(time.Hour), false) if err != nil { t.Fatalf("rater.Run: %v", err) } diff --git a/internal/proxy/iolog_test.go b/internal/proxy/iolog_test.go index d91cc99..d6553b8 100644 --- a/internal/proxy/iolog_test.go +++ b/internal/proxy/iolog_test.go @@ -1,6 +1,7 @@ package proxy import ( + "bytes" "context" "io" "net/http" @@ -308,3 +309,86 @@ func TestIOLog_SinkReceivesRecordViaOnDone(t *testing.T) { t.Fatalf("opted-in request must hand exactly 1 record to the sink, got %d", got) } } + +// newIOLogServerWithLog is newIOLogServer but with a caller-supplied logger, so a +// test can capture WARN output (the request-body-truncation observability line). 
+func newIOLogServerWithLog(t *testing.T, upstream *url.URL, policy iolog.Policy, sink iolog.Sink, maxBody int, log *logging.Logger) *Server { + t.Helper() + s := &config.Settings{ListenAddr: ":0"} + resolver := registry.NewStatic(upstream) + return NewWithIOLog(s, log, resolver, &recordingEmitter{}, policy, sink, maxBody) +} + +// TestIOLog_RequestTruncationLogged verifies Hugo's D1 decision: hard truncation +// of the captured request body is fine, but it must be OBSERVABLE — a single WARN +// line fires (with request_id and original→cap sizes) ONLY when the body exceeds +// the cap, and stays silent for an under-cap body. +func TestIOLog_RequestTruncationLogged(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Drain so the upstream still sees the full forwarded body. + _, _ = io.Copy(io.Discard, r.Body) + _, _ = io.WriteString(w, `{"ok":true,"model":"m"}`) + })) + defer backend.Close() + upstream, _ := url.Parse(backend.URL) + + const bodyCap = 100 + + t.Run("over cap logs", func(t *testing.T) { + var buf bytes.Buffer + log := logging.New(logging.WARN) + log.Warn.SetOutput(&buf) + + policy := &spyPolicy{decision: true} + sink := &recordingSink{} + srv := newIOLogServerWithLog(t, upstream, policy, sink, bodyCap, log) + + big := strings.Repeat("A", 500) + rr := httptest.NewRecorder() + srv.Handler().ServeHTTP(rr, iologRequest(http.MethodPost, big)) + + if rr.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", rr.Code) + } + // The record must mark the request body truncated... + recs := sink.all() + if len(recs) != 1 || !recs[0].RequestTruncated { + t.Fatalf("expected 1 record with RequestTruncated=true, got %+v", recs) + } + // ...and the WARN line must fire with request_id and original→cap sizes. 
+ out := buf.String() + if !strings.Contains(out, "request body truncated") { + t.Fatalf("no truncation WARN logged; got %q", out) + } + if !strings.Contains(out, "request_id=req-iolog-1") { + t.Errorf("WARN missing request_id; got %q", out) + } + if !strings.Contains(out, "500 → 100 bytes") { + t.Errorf("WARN missing original→cap sizes (want \"500 → 100 bytes\"); got %q", out) + } + }) + + t.Run("under cap silent", func(t *testing.T) { + var buf bytes.Buffer + log := logging.New(logging.WARN) + log.Warn.SetOutput(&buf) + + policy := &spyPolicy{decision: true} + sink := &recordingSink{} + srv := newIOLogServerWithLog(t, upstream, policy, sink, bodyCap, log) + + rr := httptest.NewRecorder() + srv.Handler().ServeHTTP(rr, iologRequest(http.MethodPost, `{"model":"m"}`)) + + if rr.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", rr.Code) + } + recs := sink.all() + if len(recs) != 1 || recs[0].RequestTruncated { + t.Fatalf("under-cap body must not be truncated; got %+v", recs) + } + if out := buf.String(); strings.Contains(out, "request body truncated") { + t.Errorf("under-cap body must NOT log truncation; got %q", out) + } + }) +} diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index c821866..60aae9c 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -251,12 +251,20 @@ func (s *Server) handleProxy(w http.ResponseWriter, r *http.Request) { // Cap the LOGGED request-body copy at the same bound as the response copy // (s.ioMaxBodyLen) — an uncapped body flows into to_tsvector and fails the // INSERT past ~1 MiB. The forwarded request keeps the full body. 
- reqBody, reqTruncated, err = captureRequestBody(r, s.ioMaxBodyLen) + var reqOrigLen int + reqBody, reqTruncated, reqOrigLen, err = captureRequestBody(r, s.ioMaxBodyLen) if err != nil { s.log.Error.Printf("capture request body: %v", err) http.Error(w, "bad request body", http.StatusBadRequest) return } + // Hard truncation is intentional (an uncapped body fails the to_tsvector + // INSERT), but make it OBSERVABLE: an operator should be able to see that + // a captured body was cut, and by how much. + if reqTruncated { + s.log.Warn.Printf("iolog: request body truncated for capture request_id=%s (%d → %d bytes)", + requestID, reqOrigLen, s.ioMaxBodyLen) + } } // Force streaming usage so we never under-bill a streamed response. diff --git a/internal/proxy/rewrite.go b/internal/proxy/rewrite.go index 5f59985..c7e22ab 100644 --- a/internal/proxy/rewrite.go +++ b/internal/proxy/rewrite.go @@ -24,15 +24,17 @@ import ( // the model sees — only the stored log copy. Truncation is at a rune boundary so // the stored TEXT is valid UTF-8. maxBodyBytes <= 0 means uncapped. // -// A nil body yields ("", false) with no error. -func captureRequestBody(r *http.Request, maxBodyBytes int) (body string, truncated bool, err error) { +// A nil body yields ("", false, 0) with no error. origLen is the size of the +// FULL body read off the wire (before capping), so a caller can log the +// original-vs-cap sizes when truncated is true. +func captureRequestBody(r *http.Request, maxBodyBytes int) (body string, truncated bool, origLen int, err error) { if r.Body == nil { - return "", false, nil + return "", false, 0, nil } raw, err := io.ReadAll(r.Body) _ = r.Body.Close() if err != nil { - return "", false, err + return "", false, 0, err } // Restore the FULL body so forceIncludeUsage (and the upstream) read every // byte — the cap only bounds the LOG copy, never the forwarded request. 
@@ -41,7 +43,7 @@ func captureRequestBody(r *http.Request, maxBodyBytes int) (body string, truncat r.Header.Set("Content-Length", strconv.Itoa(len(raw))) logCopy, truncated := truncateAtRuneBoundary(raw, maxBodyBytes) - return string(logCopy), truncated, nil + return string(logCopy), truncated, len(raw), nil } // forceIncludeUsage rewrites a request body so that streamed responses carry a diff --git a/internal/rating/rater.go b/internal/rating/rater.go index 8b49c5a..643799e 100644 --- a/internal/rating/rater.go +++ b/internal/rating/rater.go @@ -93,7 +93,19 @@ func (r Result) HasAnomaly() bool { // re-run converges rated_usage to exactly the latest run's output — never // double-counts, and never leaves a superseded rollup billing at its stale cost. A // clean identical re-run is a no-op (same rows upserted, nothing deleted). See store.go. -func (r *Rater) Run(ctx context.Context, windowStart, windowEnd time.Time) (Result, error) { +// +// windowExplicit threads the routine-vs-backfill distinction into the rating-side +// observability so the LOG severity of a reconcile-delete matches the EXIT-code +// contract (option (c), see cmd/rater). On a ROUTINE run (default trailing-hours +// window, windowExplicit == false) a reconcile-delete rewrote a prior bill with no +// operator behind it — data vanished from billing_event or an upstream regression +// dropped events — so it is logged at ERROR (page). On an EXPLICIT backfill +// (--since/--until, windowExplicit == true) the same delete is intended convergence, +// logged at INFO. The reconcile SEMANTICS are identical either way; only the log +// severity (and, in cmd/rater, the exit code) turns on windowExplicit. The flag is +// passed in rather than recomputed here because only cmd/rater knows how the window +// was chosen, mirroring the exit-code gate. 
+func (r *Rater) Run(ctx context.Context, windowStart, windowEnd time.Time, windowExplicit bool) (Result, error) { windowStart = windowStart.UTC() windowEnd = windowEnd.UTC() res := Result{WindowStart: windowStart, WindowEnd: windowEnd} @@ -128,11 +140,25 @@ func (r *Rater) Run(ctx context.Context, windowStart, windowEnd time.Time) (Resu } // A re-rate that SUPERSEDES prior billing (deleted stale rollups) is significant — - // not an anomaly, but it changes a customer's bill, so surface it. 0 on a first run - // or a clean identical re-run. + // not a leaked-anomaly (it does not flip HasAnomaly), but it changes a customer's + // bill, so surface it. 0 on a first run or a clean identical re-run. The SEVERITY + // turns on windowExplicit (the routine-vs-backfill contract, option (c)): + // - ROUTINE (default trailing-hours window, !windowExplicit): no operator chose + // this window, so rewriting a prior bill is alarming — events vanished from + // billing_event (data loss) or an upstream regression dropped them (e.g. + // base_model stopped propagating → rollups went unpriced/ambiguous). ERROR: + // page/investigate. This is the loud half that matches cmd/rater's exit 2. + // - EXPLICIT backfill (--since/--until, windowExplicit): convergence is exactly + // what the operator asked for (e.g. a late price fix) — INFO, not a page. + // The deletion count + window appear in both; only the level and the wording differ. 
if res.ReconciledDeletions > 0 { - r.log.Info.Printf("rating: window [%s,%s) reconcile DELETED %d stale rollup(s) that prior runs billed but this run no longer produces (became ambiguous/unpriced, or their events vanished) — 'what the latest run says is what bills'", - windowStart.Format(time.RFC3339), windowEnd.Format(time.RFC3339), res.ReconciledDeletions) + if windowExplicit { + r.log.Info.Printf("rating: EXPLICIT backfill window [%s,%s) reconcile DELETED %d stale rollup(s) that prior runs billed but this run no longer produces (became ambiguous/unpriced, or their events vanished) — intended convergence, 'what the latest run says is what bills'", + windowStart.Format(time.RFC3339), windowEnd.Format(time.RFC3339), res.ReconciledDeletions) + } else { + r.log.Error.Printf("rating: ROUTINE window [%s,%s) reconcile DELETED %d previously-billed rollup(s) — a routine run REWROTE A PRIOR BILL with no operator behind it, meaning those events VANISHED from billing_event (data loss) or an upstream regression now drops them (e.g. base_model stopped propagating → rollups went unpriced/ambiguous). 
This is NOT a backfill; page/investigate (run with --since/--until only after confirming the deletion is intended)", + windowStart.Format(time.RFC3339), windowEnd.Format(time.RFC3339), res.ReconciledDeletions) + } } if res.HasAnomaly() { diff --git a/internal/rating/rater_test.go b/internal/rating/rater_test.go index c0f0925..00738b8 100644 --- a/internal/rating/rater_test.go +++ b/internal/rating/rater_test.go @@ -1,7 +1,9 @@ package rating import ( + "bytes" "context" + "strings" "testing" "time" @@ -170,7 +172,7 @@ func TestRater_MultiEventAggregation(t *testing.T) { store := newOracleStore(bookM(), events) r := New(store, bookM(), testLogger()) - res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z")) + res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z"), false) if err != nil { t.Fatal(err) } @@ -204,11 +206,11 @@ func TestRater_IdempotentRerunNoDoubling(t *testing.T) { r := New(store, bookM(), testLogger()) ws, we := mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z") - res1, err := r.Run(context.Background(), ws, we) + res1, err := r.Run(context.Background(), ws, we, false) if err != nil { t.Fatal(err) } - res2, err := r.Run(context.Background(), ws, we) + res2, err := r.Run(context.Background(), ws, we, false) if err != nil { t.Fatal(err) } @@ -245,7 +247,7 @@ func TestRater_ReRateDeletesSupersededRollup(t *testing.T) { {AuthID: "a", ModelID: "ft:dupe", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, }) r := New(store, book, testLogger()) - resA, err := r.Run(context.Background(), ws, we) + resA, err := r.Run(context.Background(), ws, we, false) if err != nil { t.Fatal(err) } @@ -261,7 +263,7 @@ func TestRater_ReRateDeletesSupersededRollup(t *testing.T) { // window → ambiguous. Run B excludes it from priced. 
store.events = append(store.events, RatedEvent{AuthID: "a", ModelID: "ft:dupe", BaseModel: "expensive/base", PromptTokens: 1000, At: at.Add(5 * time.Minute)}) - resB, err := r.Run(context.Background(), ws, we) + resB, err := r.Run(context.Background(), ws, we, false) if err != nil { t.Fatal(err) } @@ -301,7 +303,7 @@ func TestRater_ReRateSupersedesOneKeepsAnotherSameWindow(t *testing.T) { {AuthID: "a", ModelID: "ft:gone", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, }) r := New(store, book, testLogger()) - if _, err := r.Run(context.Background(), ws, we); err != nil { + if _, err := r.Run(context.Background(), ws, we, false); err != nil { t.Fatal(err) } if len(store.table) != 2 { @@ -310,7 +312,7 @@ func TestRater_ReRateSupersedesOneKeepsAnotherSameWindow(t *testing.T) { // Run B: ft:gone gains a second base (ambiguous → superseded); ft:keep unchanged. store.events = append(store.events, RatedEvent{AuthID: "a", ModelID: "ft:gone", BaseModel: "expensive/base", PromptTokens: 1000, At: at.Add(5 * time.Minute)}) - resB, err := r.Run(context.Background(), ws, we) + resB, err := r.Run(context.Background(), ws, we, false) if err != nil { t.Fatal(err) } @@ -339,7 +341,7 @@ func TestRater_ReRateDeletesVanishedRollup(t *testing.T) { {AuthID: "a", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, }) r := New(store, bookM(), testLogger()) - if _, err := r.Run(context.Background(), ws, we); err != nil { + if _, err := r.Run(context.Background(), ws, we, false); err != nil { t.Fatal(err) } if len(store.table) != 1 { @@ -347,7 +349,7 @@ func TestRater_ReRateDeletesVanishedRollup(t *testing.T) { } // All events for the window vanish. 
store.events = nil - resB, err := r.Run(context.Background(), ws, we) + resB, err := r.Run(context.Background(), ws, we, false) if err != nil { t.Fatal(err) } @@ -370,11 +372,11 @@ func TestRater_ReRateIdenticalDataIsNoOp(t *testing.T) { }) r := New(store, bookM(), testLogger()) - res1, err := r.Run(context.Background(), ws, we) + res1, err := r.Run(context.Background(), ws, we, false) if err != nil { t.Fatal(err) } - res2, err := r.Run(context.Background(), ws, we) + res2, err := r.Run(context.Background(), ws, we, false) if err != nil { t.Fatal(err) } @@ -393,11 +395,11 @@ func TestRater_ReRateIdenticalDataIsNoOp(t *testing.T) { otherHour := mustTime("2026-06-08T12:00:00Z") store.events = append(store.events, RatedEvent{AuthID: "a", ModelID: "m", PromptTokens: 5, At: otherHour.Add(5 * time.Minute)}) - if _, err := r.Run(context.Background(), otherHour, otherHour.Add(time.Hour)); err != nil { + if _, err := r.Run(context.Background(), otherHour, otherHour.Add(time.Hour), false); err != nil { t.Fatal(err) } // Now re-rate ONLY the first window again: the 12:00 rollup must survive untouched. - resFirst, err := r.Run(context.Background(), ws, we) + resFirst, err := r.Run(context.Background(), ws, we, false) if err != nil { t.Fatal(err) } @@ -410,6 +412,111 @@ func TestRater_ReRateIdenticalDataIsNoOp(t *testing.T) { } } +// captureLogger builds a Logger whose INFO and ERROR streams are redirected to the +// returned buffers, so a test can assert WHICH severity a reconcile-delete fired at. +// It starts at DEBUG so neither stream is discarded by the level filter — the test +// is asserting the call site's chosen level, not the logger's threshold. 
+func captureLogger() (*logging.Logger, *bytes.Buffer, *bytes.Buffer) { + log := logging.New(logging.DEBUG) + var infoBuf, errBuf bytes.Buffer + log.Info.SetOutput(&infoBuf) + log.Error.SetOutput(&errBuf) + return log, &infoBuf, &errBuf +} + +// supersededReconcileStore returns an oracleStore whose first Run produces ONE clean +// rollup and whose second Run (after the events vanish) reconcile-DELETES it with NO +// other anomaly — the minimal reconcile-delete (HasAnomaly stays false), so a test can +// attribute the resulting log line purely to the reconcile path, not the anomaly path. +func supersededReconcileStore() (*oracleStore, time.Time, time.Time) { + at := mustTime("2026-06-08T10:15:00Z") + ws, we := mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z") + store := newOracleStore(bookM(), []RatedEvent{ + {AuthID: "a", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, + }) + return store, ws, we +} + +// TestRater_RoutineReconcileDeleteLogsError pins the LOUD half of the reconcile +// observability contract (option (c)): a ROUTINE run (windowExplicit == false) that +// reconcile-DELETES a previously-billed rollup must emit an ERROR line (page someone), +// even with NO other anomaly. RED before FIX 1: the old code logged the reconcile-delete +// UNCONDITIONALLY at INFO (testLogger discards INFO), so the ERROR stream was empty and +// this assertion failed — the page never fired on a routine bill rewrite. +func TestRater_RoutineReconcileDeleteLogsError(t *testing.T) { + store, ws, we := supersededReconcileStore() + log, infoBuf, errBuf := captureLogger() + r := New(store, bookM(), log) + + // Run A: one clean rollup, no reconcile-delete. + if _, err := r.Run(context.Background(), ws, we, false); err != nil { + t.Fatal(err) + } + infoBuf.Reset() + errBuf.Reset() + + // Run B (routine): the events vanish → the rollup is reconcile-deleted with no anomaly. 
+ store.events = nil + resB, err := r.Run(context.Background(), ws, we, false) + if err != nil { + t.Fatal(err) + } + if resB.ReconciledDeletions != 1 { + t.Fatalf("setup: deletions=%d, want 1", resB.ReconciledDeletions) + } + if resB.HasAnomaly() { + t.Fatalf("setup: a clean reconcile-delete must NOT flip HasAnomaly (got %+v) — else the ERROR could come from the anomaly path, not the reconcile path", resB) + } + // THE INVARIANT: a routine reconcile-delete fires an ERROR (the page). + if errBuf.Len() == 0 { + t.Fatal("routine reconcile-delete emitted NO ERROR line — a prior bill was rewritten with no operator behind it (data loss / upstream regression); it MUST log at ERROR so a CronJob pages") + } + if !strings.Contains(errBuf.String(), "ROUTINE") || !strings.Contains(errBuf.String(), "reconcile") { + t.Fatalf("routine reconcile-delete ERROR line lacks the routine-rewrite wording: %q", errBuf.String()) + } + // The deletion count and window must be present in the loud line. + if !strings.Contains(errBuf.String(), "[2026-06-08T10:00:00Z,2026-06-08T11:00:00Z)") { + t.Fatalf("ERROR line is missing the window: %q", errBuf.String()) + } +} + +// TestRater_BackfillReconcileDeleteLogsInfoNoError pins the QUIET half: an EXPLICIT +// operator backfill (windowExplicit == true) that reconcile-deletes the SAME rollup is +// intended convergence — it logs at INFO and emits NO ERROR (no page). The flag flips +// ONLY the reconcile-delete severity; identical data, identical reconcile, opposite level. +func TestRater_BackfillReconcileDeleteLogsInfoNoError(t *testing.T) { + store, ws, we := supersededReconcileStore() + log, infoBuf, errBuf := captureLogger() + r := New(store, bookM(), log) + + // Run A (explicit backfill): one clean rollup. + if _, err := r.Run(context.Background(), ws, we, true); err != nil { + t.Fatal(err) + } + infoBuf.Reset() + errBuf.Reset() + + // Run B (explicit backfill): the events vanish → reconcile-delete, but operator-chosen. 
+ store.events = nil + resB, err := r.Run(context.Background(), ws, we, true) + if err != nil { + t.Fatal(err) + } + if resB.ReconciledDeletions != 1 { + t.Fatalf("setup: deletions=%d, want 1", resB.ReconciledDeletions) + } + // THE INVARIANT: a backfill reconcile-delete is INFO, never ERROR. + if errBuf.Len() != 0 { + t.Fatalf("explicit backfill reconcile-delete emitted an ERROR line (it must be INFO — the operator asked for this convergence): %q", errBuf.String()) + } + if !strings.Contains(infoBuf.String(), "reconcile DELETED") { + t.Fatalf("explicit backfill reconcile-delete did not log the convergence INFO line: %q", infoBuf.String()) + } + if !strings.Contains(infoBuf.String(), "[2026-06-08T10:00:00Z,2026-06-08T11:00:00Z)") { + t.Fatalf("INFO line is missing the window: %q", infoBuf.String()) + } +} + // TestRater_LateArrivalRatedByTrailingWindow: an event whose event_ts falls in hour // H but which is DRAINED only after H was rated is picked up by a later run whose // window still covers H — the trailing-window contract. 
The re-rate REPLACES H's @@ -419,7 +526,7 @@ func TestRater_LateArrivalRatedByTrailingWindow(t *testing.T) { store := newOracleStore(bookM(), nil) r := New(store, bookM(), testLogger()) - res1, err := r.Run(context.Background(), hourH, hourH.Add(time.Hour)) + res1, err := r.Run(context.Background(), hourH, hourH.Add(time.Hour), false) if err != nil { t.Fatal(err) } @@ -431,7 +538,7 @@ func TestRater_LateArrivalRatedByTrailingWindow(t *testing.T) { AuthID: "a", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: hourH.Add(15 * time.Minute), }) - res2, err := r.Run(context.Background(), hourH.Add(-23*time.Hour), hourH.Add(2*time.Hour)) + res2, err := r.Run(context.Background(), hourH.Add(-23*time.Hour), hourH.Add(2*time.Hour), false) if err != nil { t.Fatal(err) } @@ -455,7 +562,7 @@ func TestRater_MissingPriceFailsLoudNotZero(t *testing.T) { } store := newOracleStore(bookM(), events) r := New(store, bookM(), testLogger()) - res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z")) + res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z"), false) if err != nil { t.Fatal(err) } @@ -486,7 +593,7 @@ func TestRater_UnattributableCountedNotSilent(t *testing.T) { } store := newOracleStore(bookM(), events) r := New(store, bookM(), testLogger()) - res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z")) + res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z"), false) if err != nil { t.Fatal(err) } @@ -514,7 +621,7 @@ func TestRater_DerivedFineTuneRatedViaPolicy(t *testing.T) { } store := newOracleStore(book, events) r := New(store, book, testLogger()) - res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z")) + res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z"), 
false) if err != nil { t.Fatal(err) } @@ -544,7 +651,7 @@ func TestRater_FineTunePricesViaBaseModelOnEvent(t *testing.T) { }} store := newOracleStore(book, events) r := New(store, book, testLogger()) - res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z")) + res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z"), false) if err != nil { t.Fatal(err) } @@ -583,7 +690,7 @@ func TestRater_FineTuneWithoutBaseModelFailsLoud(t *testing.T) { }} store := newOracleStore(book, events) r := New(store, book, testLogger()) - res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z")) + res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z"), false) if err != nil { t.Fatal(err) } @@ -628,7 +735,7 @@ func TestRater_FineTuneAmbiguousBaseModelFailsLoud(t *testing.T) { } store := newOracleStore(book, events) r := New(store, book, testLogger()) - res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z")) + res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z"), false) if err != nil { t.Fatal(err) } @@ -686,7 +793,7 @@ func TestRater_RollupCostSelfAudits(t *testing.T) { } store := newOracleStore(book, events) r := New(store, book, testLogger()) - if _, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z")); err != nil { + if _, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z"), false); err != nil { t.Fatal(err) } if len(store.table) == 0 { @@ -719,7 +826,7 @@ func TestRater_AppliedRateStoredOnRow(t *testing.T) { } store := newOracleStore(book, events) r := New(store, book, testLogger()) - if _, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), 
mustTime("2026-06-08T11:00:00Z")); err != nil { + if _, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z"), false); err != nil { t.Fatal(err) } hour := mustTime("2026-06-08T10:00:00Z") @@ -738,7 +845,7 @@ func TestRater_AppliedRateStoredOnRow(t *testing.T) { func TestRater_InvertedWindow(t *testing.T) { store := newOracleStore(bookM(), nil) r := New(store, bookM(), testLogger()) - if _, err := r.Run(context.Background(), mustTime("2026-06-08T11:00:00Z"), mustTime("2026-06-08T10:00:00Z")); err == nil { + if _, err := r.Run(context.Background(), mustTime("2026-06-08T11:00:00Z"), mustTime("2026-06-08T10:00:00Z"), false); err == nil { t.Fatal("expected error for inverted window") } } @@ -768,7 +875,7 @@ func TestOracleModel_SelfConsistent(t *testing.T) { } store := newOracleStore(book, events) r := New(store, book, testLogger()) - res, err := r.Run(context.Background(), hour, hour.Add(time.Hour)) + res, err := r.Run(context.Background(), hour, hour.Add(time.Hour), false) if err != nil { t.Fatal(err) } diff --git a/internal/rating/store.go b/internal/rating/store.go index 9ae311b..e0daef0 100644 --- a/internal/rating/store.go +++ b/internal/rating/store.go @@ -367,7 +367,13 @@ upserted AS ( cost, applied_prompt_rate, applied_cached_rate, applied_completion_rate, event_count FROM priced - -- Deterministic lock order across concurrent raters (no ABBA deadlock). + -- Deterministic upsert order so a SINGLE rater never self-deadlocks against its + -- own rows (it takes row locks in one consistent order). Cross-rater + -- deadlock-safety (this ordered upsert vs. the deleted CTE's DELETE — a + -- potential ABBA pair) holds ONLY UNDER SINGLE-FLIGHT: the deployment forbids two + -- raters over overlapping windows (Atlas CronJob concurrencyPolicy: Forbid), so + -- the cross-rater hazard is unreachable and no delete-lock-ordering machinery is + -- added here. See cmd/rater's package doc for the single-flight contract. 
ORDER BY auth_id, model_id, window_start ON CONFLICT (auth_id, model_id, window_start) DO UPDATE SET window_end = EXCLUDED.window_end, diff --git a/internal/rating/store_integration_test.go b/internal/rating/store_integration_test.go index b3965ad..6392ef7 100644 --- a/internal/rating/store_integration_test.go +++ b/internal/rating/store_integration_test.go @@ -63,7 +63,13 @@ CREATE TABLE rated_usage ( event_count BIGINT NOT NULL, rated_at TIMESTAMPTZ NOT NULL DEFAULT now(), CONSTRAINT rated_usage_auth_model_window_uq UNIQUE (auth_id, model_id, window_start) -);` +); + +-- Mirror the production indexes (migrations/0002_rating.sql): the auth-leading index +-- for billing queries, and the window_start-leading index the reconcile DELETE needs +-- (it filters window_start alone; every auth-leading index leaves it trailing). +CREATE INDEX rated_usage_auth_id_window_start_ix ON rated_usage (auth_id, window_start); +CREATE INDEX rated_usage_window_start_ix ON rated_usage (window_start);` // conformanceBook is the fixture price book shared by the conformance tests: base // "b" with its own rate, fine-tune "f" derived from "b", 1.5× premium. @@ -587,8 +593,8 @@ func TestIntegration_FineTuneAmbiguousBaseModelFailsLoud(t *testing.T) { } } -// TestIntegration_ReRateReconciles runs the REAL SQL to pin FIX 2: re-rate RECONCILES -// (deletes superseded rollups), it is NOT upsert-only. Run A bills a CLEAN single-base +// TestIntegration_ReRateReconciles runs the REAL SQL to pin the reconcile semantics: +// re-rate RECONCILES (deletes superseded rollups), it is NOT upsert-only. Run A bills a CLEAN single-base // ft: rollup. Then a second, distinct base_model arrives for the SAME ft: id in the same // window, making it ambiguous; run B excludes it from priced and must DELETE the stale // rated_usage row in the SAME statement — never leave it billing at its run-A cost. 
A @@ -952,6 +958,133 @@ func TestIntegration_RatingInstantIndexServesScan(t *testing.T) { } } +// TestIntegration_ReconcileDeleteUsesWindowStartIndex: the reconcile DELETE (the +// `deleted` CTE) filters rated_usage on window_start ALONE. Every other index leads +// with auth_id, leaving window_start trailing → no index can serve a +// window_start-only range scan, so without rated_usage_window_start_ix the reconcile +// seq-scans rated_usage (and takes a full-trailing-window lock footprint) on every +// run. This pins that the window_start-leading index exists AND the planner uses it +// for the reconcile's EXACT predicate. +// +// REAL PROOF, not vacuous: an earlier version EXPLAINed a `SELECT 1` against a +// freshly-created EMPTY rated_usage. On an empty (or tiny) table the planner +// seqscans regardless of any index — and the standalone SELECT does not even +// resemble the reconcile DELETE — so a passing assertion proved nothing about the +// actual statement. Here we (a) POPULATE rated_usage with many rows across many +// hours so a narrow window-slice is genuinely cheaper by index than a seqscan, then +// (b) EXPLAIN the ACTUAL reconcile DELETE from store.go (the `deleted` CTE's +// rated_usage table-access: the window_start range plus the NOT EXISTS anti-join +// against this run's priced rows) and assert it scans rated_usage via +// rated_usage_window_start_ix. We assert the index is chosen by DEFAULT cost +// (seqscan ENABLED) — that is the load-bearing proof the index PAYS OFF for the +// reconcile's predicate, not just that one CAN be forced. A seqscan-disabled pass +// follows as a belt-and-braces check that the index can serve the predicate at all. 
+func TestIntegration_ReconcileDeleteUsesWindowStartIndex(t *testing.T) { + dsn := os.Getenv("PHOEBE_TEST_DATABASE_URL") + if dsn == "" { + t.Skip("PHOEBE_TEST_DATABASE_URL not set; skipping live-Postgres conformance") + } + db, err := sql.Open("pgx", dsn) + if err != nil { + t.Fatalf("open: %v", err) + } + defer db.Close() + db.SetMaxOpenConns(1) + + const sch = "phoebe_rating_winix_it" + exec(t, db, "DROP SCHEMA IF EXISTS "+sch+" CASCADE") + exec(t, db, "CREATE SCHEMA "+sch) + exec(t, db, "SET search_path TO "+sch) + defer func() { exec(t, db, "DROP SCHEMA IF EXISTS "+sch+" CASCADE") }() + exec(t, db, schemaDDL) + + // POPULATE: 50 distinct auth_ids × 200 hours = 10k rated_usage rows spread across + // a wide window_start range. The reconcile DELETE targets a SINGLE hour, so the + // index-served slice is ~50 rows out of 10k — a range the planner should prefer + // the window_start index for over a full seqscan once it has stats. + exec(t, db, `INSERT INTO rated_usage + (id, auth_id, model_id, window_start, window_end, + prompt_tokens, cached_tokens, completion_tokens, billable_prompt_tokens, + cost, applied_prompt_rate, applied_cached_rate, applied_completion_rate, event_count) + SELECT + md5(a::text || ':' || h::text), + 'auth' || a::text, + 'm', + '2026-01-01T00:00:00Z'::timestamptz + (h || ' hours')::interval, + '2026-01-01T01:00:00Z'::timestamptz + (h || ' hours')::interval, + 100, 0, 0, 100, 0.001, 0.00001, 0, 0, 1 + FROM generate_series(0, 49) AS a, generate_series(0, 199) AS h`) + // ANALYZE so the planner has row-count + distribution stats (without it the + // estimates are defaults and the choice is not a real cost decision). + exec(t, db, "ANALYZE rated_usage") + + // EXPLAIN the ACTUAL reconcile DELETE statement (store.go's `deleted` CTE shape): + // the rated_usage range on window_start, anti-joined against this run's priced + // rows. 
`priced` is empty here (a re-run that reproduces nothing for this hour → + // the whole slice is deleted), which is exactly the worst-case reconcile that + // must still go through the index rather than seqscanning the whole table. + const reconcileDelete = `EXPLAIN + WITH priced AS ( + SELECT auth_id, model_id, window_start FROM rated_usage WHERE false + ) + DELETE FROM rated_usage ru + WHERE ru.window_start >= '2026-01-05T04:00:00Z' + AND ru.window_start < '2026-01-05T05:00:00Z' + AND NOT EXISTS ( + SELECT 1 FROM priced p + WHERE p.auth_id = ru.auth_id + AND p.model_id = ru.model_id + AND p.window_start = ru.window_start + )` + + // (1) Default-cost plan (seqscan ENABLED): the LOAD-BEARING proof the index PAYS + // OFF — the planner chooses rated_usage_window_start_ix on cost, not because + // seqscan was forbidden. This is discriminating: verified empirically, dropping + // the index makes the planner fall back to the auth-leading composite + // (auth_id, window_start) as a far costlier full-index scan (~280 vs ~8), so this + // assertion FAILS without the window_start index — it is not vacuous. (The + // fallback is an index scan, not a literal Seq Scan, because the composite still + // covers window_start as a non-leading column; the point stands — the dedicated + // window_start index is what the reconcile's window-only predicate should use.) 
+ plan := explainPlan(t, db, reconcileDelete) + if !strings.Contains(plan, "rated_usage_window_start_ix") { + t.Fatalf("reconcile DELETE did not choose rated_usage_window_start_ix at DEFAULT cost on a populated table (a window_start-only slice should prefer the dedicated index):\n%s", plan) + } + if strings.Contains(plan, "Seq Scan on rated_usage ru") { + t.Fatalf("reconcile DELETE seq-scans rated_usage at default cost — the window_start index is not serving the range predicate:\n%s", plan) + } + + // (2) Belt-and-braces: with seqscan disabled the index must STILL be able to serve + // the predicate (a remaining seqscan would mean no usable index at all). + exec(t, db, "SET enable_seqscan = off") + plan2 := explainPlan(t, db, reconcileDelete) + if !strings.Contains(plan2, "rated_usage_window_start_ix") { + t.Fatalf("reconcile DELETE predicate cannot be served by rated_usage_window_start_ix even with seqscan off:\n%s", plan2) + } +} + +// explainPlan runs an EXPLAIN query and returns the joined plan text. 
+func explainPlan(t *testing.T, db *sql.DB, explainQuery string) string { + t.Helper() + rows, err := db.Query(explainQuery) + if err != nil { + t.Fatalf("explain: %v", err) + } + defer rows.Close() + var plan string + for rows.Next() { + var line string + if err := rows.Scan(&line); err != nil { + t.Fatalf("scan: %v", err) + } + plan += line + "\n" + } + if err := rows.Err(); err != nil { + t.Fatalf("rows: %v", err) + } + return plan +} + func exec(t *testing.T, db *sql.DB, q string) { t.Helper() if _, err := db.Exec(q); err != nil { diff --git a/migrations/0002_rating.sql b/migrations/0002_rating.sql index e10d70d..4a3d80b 100644 --- a/migrations/0002_rating.sql +++ b/migrations/0002_rating.sql @@ -88,6 +88,18 @@ CREATE TABLE rated_usage ( CREATE INDEX rated_usage_auth_id_window_start_ix ON rated_usage (auth_id, window_start); +-- The reconcile DELETE (re-rate convergence, the `deleted` CTE in +-- internal/rating/store.go) filters rated_usage on window_start ALONE +-- (window_start >= $1 AND window_start < $2), then anti-joins priced. Every other +-- index here LEADS with auth_id, so window_start is only a TRAILING column and +-- cannot serve a window_start-only range scan — the reconcile would seq-scan +-- rated_usage and take a full-trailing-window lock footprint on EVERY run (the +-- default window re-rates 24 closed hours). This window_start-leading index makes +-- the reconcile DELETE an index range scan over exactly the [window_start) hours in +-- scope. Mirrors the Alembic rating migration. +CREATE INDEX rated_usage_window_start_ix + ON rated_usage (window_start); + -- The rater scans billing_event by its RATING INSTANT, COALESCE(event_ts, -- created_at), over the rating window. 
The index must be on that EXACT -- expression: Postgres matches index expressions structurally, so an index on diff --git a/migrations/atlas/c2f1a3b4d5e6_add_rating.py b/migrations/atlas/c2f1a3b4d5e6_add_rating.py index c79bf9e..f06a8ed 100644 --- a/migrations/atlas/c2f1a3b4d5e6_add_rating.py +++ b/migrations/atlas/c2f1a3b4d5e6_add_rating.py @@ -96,6 +96,22 @@ def upgrade(): unique=False, ) + # The reconcile DELETE (re-rate convergence, the `deleted` CTE in + # internal/rating/store.go) filters rated_usage on window_start ALONE + # (window_start >= $1 AND window_start < $2), then anti-joins priced. Every other + # index on this table LEADS with auth_id, so window_start is only a TRAILING column + # and cannot serve a window_start-only range scan — without this index the reconcile + # would seq-scan rated_usage and take a full-trailing-window lock footprint on EVERY + # run (the default window re-rates 24 closed hours). A window_start-leading index + # turns the reconcile DELETE into an index range scan over exactly the in-scope + # hours. Mirrors migrations/0002_rating.sql. + op.create_index( + "rated_usage_window_start_ix", + "rated_usage", + ["window_start"], + unique=False, + ) + # The rater filters billing_event on its RATING INSTANT, COALESCE(event_ts, # created_at). The index must be on that EXACT expression: Postgres matches index # expressions structurally, so an index on bare (event_ts) can never serve the @@ -132,5 +148,15 @@ def downgrade(): # re-run downgrade must not error on an already-dropped index). Keeps up/down/up # idempotent. op.execute("DROP INDEX IF EXISTS billing_event_rating_instant_ix") - op.drop_index("rated_usage_auth_id_window_start_ix", table_name="rated_usage") + # Drop indexes in reverse create order. 
if_exists=True on BOTH so a + # partially-applied or re-run downgrade is idempotent (matches the upgrade's + # idempotent ADD COLUMN IF NOT EXISTS / DROP INDEX IF EXISTS): a downgrade that + # already removed an index, or one running against a DB where drop_table's + # implicit drop already took it, must not error. + op.drop_index( + "rated_usage_window_start_ix", table_name="rated_usage", if_exists=True + ) + op.drop_index( + "rated_usage_auth_id_window_start_ix", table_name="rated_usage", if_exists=True + ) op.drop_table("rated_usage")