From de555ab66e25bb3a9cf8e23424aa54ec5028f198 Mon Sep 17 00:00:00 2001 From: hugo Date: Tue, 16 Jun 2026 01:53:20 +0000 Subject: [PATCH] Rating: add resource_id to rated_usage rollup grain (E2 customer attribution) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The rater dropped resource_id, but billing needs it to identify the customer org (E2: bill the deployment's org via resource_id→org_id). resource_id is already on billing_event (nullable); plumb it through the rater into the rated_usage grain. New grain / unique key: (auth_id, resource_id, model_id, window_start). model_id stays — price resolution is per-model and one deployment can serve multiple models/adapters at different rates, so collapsing models would sum traffic priced differently. Two deployments of the same model by the same auth in one hour now correctly produce two rows (they may bill to different orgs). FAIL-CLOSED ATTRIBUTION: resource_id is NULLABLE on billing_event but the new key column is NON-NULL. A row that can't name its deployment/org CANNOT be billed. The grouped/priced filter requires resource_id IS NOT NULL, and the unattributable partition counts resource_id IS NULL — so a NULL-resource_id row is surfaced (exits nonzero), never silently $0-billed or billed to a NULL org. The partition invariant holds: events_rated + unpriced + unattributable + ambiguous == total in-window events. Touch-points: ev/resolved/grouped/priced CTEs, the md5 surrogate-key natural key (length-prefixed, fixed order), ON CONFLICT target + ORDER BY (lock order), the reconcile `deleted` CTE anti-join, both migrations (0002_rating.sql + alembic, edited in place — unapplied, no prod data), a new (resource_id, window_start) index for E2 per-deployment reads, doc comments, and the oracle + SQL-shape + integration + e2e tests. New negative tests: DistinctDeploymentsBillSeparately and NullResourceIdIsUnattributable (Go oracle + live-PG), plus a resource_id assertion in the e2e rollup read. Co-Authored-By: Claude Opus 4.8 --- cmd/rater/main.go | 8 +- internal/e2e/e2e_test.go | 12 +- internal/rating/doc.go | 2 +- internal/rating/oracle_test.go | 9 +- internal/rating/rater.go | 14 +- internal/rating/rater_test.go | 186 ++++++++++++++------ internal/rating/store.go | 64 ++++--- internal/rating/store_integration_test.go | 174 ++++++++++++++---- internal/rating/store_test.go | 25 ++- migrations/0002_rating.sql | 43 +++-- migrations/README.md | 8 +- migrations/atlas/c2f1a3b4d5e6_add_rating.py | 41 ++++- 12 files changed, 435 insertions(+), 151 deletions(-) diff --git a/cmd/rater/main.go b/cmd/rater/main.go index c770edc..6b17ad5 100644 --- a/cmd/rater/main.go +++ b/cmd/rater/main.go @@ -3,7 +3,7 @@ // premium policy, and per-GPU floor rates — reads a window of raw metering rows from // billing_event, projects the prices into a transient TEMP table, and ENTIRELY IN // SQL computes per-event cost as exact NUMERIC, sums it, and upserts -// per-(auth_id, model_id, hour) cost rollups into rated_usage (with the applied +// per-(auth_id, resource_id, model_id, hour) cost rollups into rated_usage (with the applied // per-token rates frozen onto each row). No money math happens in Go — the rater // binary is orchestration only (the fine-tune premium is applied in exact decimal // when the prices are projected, then handed to SQL as NUMERIC). @@ -66,8 +66,8 @@ // last hour: events can be DRAINED LATE (a Valkey outage recovered from the WAL) // with an event_ts in an hour that was already rated, and a last-hour-only cron // would never revisit that hour — silent revenue loss. Re-rating closed hours is -// safe BY CONSTRUCTION: the upsert REPLACES each (auth_id, model_id, hour) bucket -// with a freshly recomputed total, so the late event is folded in and nothing +// safe BY CONSTRUCTION: the upsert REPLACES each (auth_id, resource_id, model_id, hour) +// bucket with a freshly recomputed total, so the late event is folded in and nothing // doubles. RESIDUAL RISK, stated honestly: an event arriving MORE than N hours // late still slips past the default window (the DESIGN.md reconciliation backstop // is the eventual answer; until then, widen rateTrailingHours or re-rate the hour @@ -228,7 +228,7 @@ func run() int { // re-rates the last N closed hours so an event DRAINED LATE into an already-rated // hour (Valkey outage → WAL recovery) is picked up by a later run instead of being // lost forever. Re-rating a closed hour is safe by construction: the upsert -// REPLACES each (auth_id, model_id, hour) bucket with a recomputed total, so +// REPLACES each (auth_id, resource_id, model_id, hour) bucket with a recomputed total, so // re-runs reconcile and never double-count. An event arriving more than N hours // late still slips (residual risk; see the package doc — reconciliation is the // backstop). Either flag may be given explicitly (RFC3339) and WINS over the diff --git a/internal/e2e/e2e_test.go b/internal/e2e/e2e_test.go index 6c2a3c2..447c15f 100644 --- a/internal/e2e/e2e_test.go +++ b/internal/e2e/e2e_test.go @@ -469,7 +469,7 @@ func TestE2E_StreamedRequestBecomesMoney(t *testing.T) { var ( nRollups int - ruAuthID, ruModelID, cost string + ruAuthID, ruResourceID, ruModelID, cost string ruPrompt, ruCached, ruCompletion, ruBillable int64 eventCount int64 // BIGINT column ) @@ -480,14 +480,20 @@ func TestE2E_StreamedRequestBecomesMoney(t *testing.T) { t.Fatalf("rated_usage rows = %d, want exactly 1", nRollups) } if err := h.db.QueryRow( - `SELECT auth_id, model_id, prompt_tokens, cached_tokens, completion_tokens, billable_prompt_tokens, cost::text, event_count + `SELECT auth_id, resource_id, model_id, prompt_tokens, cached_tokens, completion_tokens, billable_prompt_tokens, cost::text, event_count FROM rated_usage`). - Scan(&ruAuthID, &ruModelID, &ruPrompt, &ruCached, &ruCompletion, &ruBillable, &cost, &eventCount); err != nil { + Scan(&ruAuthID, &ruResourceID, &ruModelID, &ruPrompt, &ruCached, &ruCompletion, &ruBillable, &cost, &eventCount); err != nil { t.Fatalf("read rated_usage: %v", err) } if ruAuthID != testAuthID { t.Errorf("rated_usage.auth_id = %q, want %q (the X-Saturn-Auth-Id header value)", ruAuthID, testAuthID) } + // E2 customer attribution: the rollup carries the deployment id the request routed + // to (X-Saturn-Resource-Id), end-to-end from the proxy through billing_event into + // the rated_usage grain — this is the key billing resolves the org from. + if ruResourceID != testResourceID { + t.Errorf("rated_usage.resource_id = %q, want %q (the X-Saturn-Resource-Id header value, for E2 org attribution)", ruResourceID, testResourceID) + } // THE deployment-id-bug guard, at the far end of the pipe: the money is // keyed on the engine name the upstream reported, not the id we routed on. if ruModelID != testModelName { diff --git a/internal/rating/doc.go b/internal/rating/doc.go index c4dc847..bc427d8 100644 --- a/internal/rating/doc.go +++ b/internal/rating/doc.go @@ -1,5 +1,5 @@ // Package rating is phoebe's REVENUE path. It turns the raw token counts in -// billing_event into money: per (auth_id, model_id, hour) cost rollups in +// billing_event into money: per (auth_id, resource_id, model_id, hour) cost rollups in // rated_usage, priced from a YAML PRICE FILE (E1) — not a DB price table. // // THE PRICE FILE IS THE CONTRACT (E1). An operator authors a versioned YAML file diff --git a/internal/rating/oracle_test.go b/internal/rating/oracle_test.go index 36cf368..7322006 100644 --- a/internal/rating/oracle_test.go +++ b/internal/rating/oracle_test.go @@ -30,8 +30,13 @@ import ( // decoupled from metering.Event so the pure money math has no dependency on the // capture/emit side and can be tested in isolation. type RatedEvent struct { - AuthID string - ModelID string + AuthID string + // ResourceID is the deployment id (E2 customer attribution — billing resolves the + // org via resource_id→org_id). Part of the rollup grain. Empty → unattributable + // (the row can't name its deployment/org), mirroring the SQL's resource_id-IS-NULL + // handling: counted, never billed. + ResourceID string + ModelID string // BaseModel is the HF base id a fine-tune derives from (E3), carried on the event // from billing_event.base_model. Empty for a base model. The oracle prices an ft: // ModelID via base x premium keyed on BaseModel — mirroring the SQL. diff --git a/internal/rating/rater.go b/internal/rating/rater.go index 643799e..a40101a 100644 --- a/internal/rating/rater.go +++ b/internal/rating/rater.go @@ -43,9 +43,9 @@ type Result struct { // casts these as ::bigint to avoid a silent 32-bit overflow (see store.go). EventsRated int64 UnpricedEvents int64 // events whose model had NO resolvable price (NOT $0-billed) - UnattributableEvents int64 // in-window rows with NULL auth_id/model_id (upstream leak) + UnattributableEvents int64 // in-window rows with NULL auth_id/resource_id/model_id (upstream leak) AmbiguousBaseEvents int64 // events under an ft: rollup spanning >1 base_model (E3 violation) - RollupsWritten int64 // distinct (auth_id, model_id, hour) rows upserted + RollupsWritten int64 // distinct (auth_id, resource_id, model_id, hour) rows upserted ReconciledDeletions int64 // stale in-window rollups DELETED because this re-run no longer produces them TotalCost string // sum of all rollup costs, NUMERIC as text } @@ -55,7 +55,9 @@ type Result struct { func (r Result) HasUnpriced() bool { return r.UnpricedEvents > 0 } // HasUnattributable reports whether any in-window row was skipped for a NULL -// auth_id/model_id — like HasUnpriced, a loud, exit-nonzero outcome. +// auth_id/resource_id/model_id — like HasUnpriced, a loud, exit-nonzero outcome. A +// NULL resource_id means the row can't name its deployment/org (E2), so it can't be +// billed and is counted here rather than attributed to a NULL org. func (r Result) HasUnattributable() bool { return r.UnattributableEvents > 0 } // HasAmbiguousBase reports whether any ft: rollup spanned more than one base_model in a @@ -79,7 +81,7 @@ func (r Result) HasAnomaly() bool { // // FAIL-LOUD ON MISSING PRICE / UNATTRIBUTABLE (the fail-closed rule): an event // whose model has no resolvable price at its time, and a row with a NULL -// auth_id/model_id, are NOT summed into any rollup — the SQL excludes them. They +// auth_id/resource_id/model_id, are NOT summed into any rollup — the SQL excludes them. They // are COUNTED by the SAME statement that writes the rollups (one snapshot, so a // row the drainer commits mid-run can never be excluded-but-uncounted), logged // loudly (ERROR), and drive cmd/rater's exit-nonzero path. They never become $0 @@ -131,7 +133,7 @@ func (r *Rater) Run(ctx context.Context, windowStart, windowEnd time.Time, windo windowStart.Format(time.RFC3339), windowEnd.Format(time.RFC3339), res.AmbiguousBaseEvents) } if res.HasUnattributable() { - r.log.Error.Printf("rating: window [%s,%s) has %d UNATTRIBUTABLE billing_event rows (NULL auth_id/model_id) — these cannot be rated; the interceptor's billing gate should reject them before metering, so a nonzero count means revenue is leaking upstream", + r.log.Error.Printf("rating: window [%s,%s) has %d UNATTRIBUTABLE billing_event rows (NULL auth_id/resource_id/model_id — a NULL resource_id can't name the deployment/org for E2 billing) — these cannot be rated; the interceptor's billing gate should reject them before metering, so a nonzero count means revenue is leaking upstream", windowStart.Format(time.RFC3339), windowEnd.Format(time.RFC3339), res.UnattributableEvents) } if res.HasUnpriced() { @@ -162,7 +164,7 @@ func (r *Rater) Run(ctx context.Context, windowStart, windowEnd time.Time, windo } if res.HasAnomaly() { - r.log.Error.Printf("rating: window [%s,%s) rated %d events into %d rollups, total=%s USD; %d UNPRICED events dropped (backfill prices and re-rate), %d UNATTRIBUTABLE rows skipped (NULL auth_id/model_id — upstream billing-gate leak), %d AMBIGUOUS-BASE events dropped (ft: id spanning multiple base_models — fix base_model propagation and re-rate)", + r.log.Error.Printf("rating: window [%s,%s) rated %d events into %d rollups, total=%s USD; %d UNPRICED events dropped (backfill prices and re-rate), %d UNATTRIBUTABLE rows skipped (NULL auth_id/resource_id/model_id — upstream billing-gate leak), %d AMBIGUOUS-BASE events dropped (ft: id spanning multiple base_models — fix base_model propagation and re-rate)", windowStart.Format(time.RFC3339), windowEnd.Format(time.RFC3339), res.EventsRated, res.RollupsWritten, res.TotalCost, res.UnpricedEvents, res.UnattributableEvents, res.AmbiguousBaseEvents) } else { diff --git a/internal/rating/rater_test.go b/internal/rating/rater_test.go index c828d45..be75101 100644 --- a/internal/rating/rater_test.go +++ b/internal/rating/rater_test.go @@ -30,6 +30,7 @@ type oracleStore struct { type rollupKey struct { authID string + resourceID string modelID string windowStart time.Time } @@ -64,7 +65,7 @@ func (s *oracleStore) resolveWindow(start, end time.Time) (map[rollupKey]oracleR if e.At.Before(start) || !e.At.Before(end) { continue } - if e.AuthID == "" || e.ModelID == "" { + if e.AuthID == "" || e.ResourceID == "" || e.ModelID == "" { an.UnattributableEvents++ continue } @@ -80,7 +81,7 @@ func (s *oracleStore) resolveWindow(start, end time.Time) (map[rollupKey]oracleR rate := resolved.Quantized() cost := rateExact(e, rate) hour := e.At.UTC().Truncate(time.Hour) - k := rollupKey{authID: e.AuthID, modelID: e.ModelID, windowStart: hour} + k := rollupKey{authID: e.AuthID, resourceID: e.ResourceID, modelID: e.ModelID, windowStart: hour} ru := out[k] ru.prompt += e.PromptTokens ru.cached += e.CachedTokens @@ -166,8 +167,8 @@ func testLogger() *logging.Logger { return logging.New(logging.ERROR) } func TestRater_MultiEventAggregation(t *testing.T) { at := mustTime("2026-06-08T10:15:00Z") events := []RatedEvent{ - {AuthID: "a", ModelID: "m", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, - {AuthID: "a", ModelID: "m", PromptTokens: 10, CachedTokens: 0, CompletionTokens: 5, At: at.Add(20 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 10, CachedTokens: 0, CompletionTokens: 5, At: at.Add(20 * time.Minute)}, } store := newOracleStore(bookM(), events) r := New(store, bookM(), testLogger()) @@ -185,7 +186,7 @@ func TestRater_MultiEventAggregation(t *testing.T) { if res.TotalCost != "0.000799000" { t.Fatalf("total = %s, want 0.000799000", res.TotalCost) } - k := rollupKey{authID: "a", modelID: "m", windowStart: mustTime("2026-06-08T10:00:00Z")} + k := rollupKey{authID: "a", resourceID: "r", modelID: "m", windowStart: mustTime("2026-06-08T10:00:00Z")} ru := store.table[k] if ru.prompt != 110 || ru.cached != 30 || ru.completion != 55 || ru.billable != 80 { t.Fatalf("token sums = p%d c%d comp%d bill%d, want 110/30/55/80", ru.prompt, ru.cached, ru.completion, ru.billable) @@ -200,7 +201,7 @@ func TestRater_MultiEventAggregation(t *testing.T) { func TestRater_IdempotentRerunNoDoubling(t *testing.T) { at := mustTime("2026-06-08T10:15:00Z") events := []RatedEvent{ - {AuthID: "a", ModelID: "m", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, } store := newOracleStore(bookM(), events) r := New(store, bookM(), testLogger()) @@ -244,7 +245,7 @@ func TestRater_ReRateDeletesSupersededRollup(t *testing.T) { // Run A: a single-base ft: rollup — CLEAN, bills. store := newOracleStore(book, []RatedEvent{ - {AuthID: "a", ModelID: "ft:dupe", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "ft:dupe", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, }) r := New(store, book, testLogger()) resA, err := r.Run(context.Background(), ws, we, false) @@ -254,7 +255,7 @@ func TestRater_ReRateDeletesSupersededRollup(t *testing.T) { if resA.RollupsWritten != 1 || resA.ReconciledDeletions != 0 { t.Fatalf("run A: rollups=%d deletions=%d, want 1/0", resA.RollupsWritten, resA.ReconciledDeletions) } - dupeKey := rollupKey{authID: "a", modelID: "ft:dupe", windowStart: ws} + dupeKey := rollupKey{authID: "a", resourceID: "r", modelID: "ft:dupe", windowStart: ws} if _, ok := store.table[dupeKey]; !ok { t.Fatal("run A: clean ft:dupe rollup must exist before re-rate") } @@ -262,7 +263,7 @@ func TestRater_ReRateDeletesSupersededRollup(t *testing.T) { // Mutate data: a SECOND, distinct base_model arrives for the SAME ft: id in the same // window → ambiguous. Run B excludes it from priced. store.events = append(store.events, - RatedEvent{AuthID: "a", ModelID: "ft:dupe", BaseModel: "expensive/base", PromptTokens: 1000, At: at.Add(5 * time.Minute)}) + RatedEvent{AuthID: "a", ResourceID: "r", ModelID: "ft:dupe", BaseModel: "expensive/base", PromptTokens: 1000, At: at.Add(5 * time.Minute)}) resB, err := r.Run(context.Background(), ws, we, false) if err != nil { t.Fatal(err) @@ -299,8 +300,8 @@ func TestRater_ReRateSupersedesOneKeepsAnotherSameWindow(t *testing.T) { ws, we := mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z") // Run A: TWO clean single-base ft: rollups (ft:keep and ft:gone). store := newOracleStore(book, []RatedEvent{ - {AuthID: "a", ModelID: "ft:keep", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, - {AuthID: "a", ModelID: "ft:gone", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "ft:keep", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "ft:gone", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, }) r := New(store, book, testLogger()) if _, err := r.Run(context.Background(), ws, we, false); err != nil { @@ -311,7 +312,7 @@ func TestRater_ReRateSupersedesOneKeepsAnotherSameWindow(t *testing.T) { } // Run B: ft:gone gains a second base (ambiguous → superseded); ft:keep unchanged. store.events = append(store.events, - RatedEvent{AuthID: "a", ModelID: "ft:gone", BaseModel: "expensive/base", PromptTokens: 1000, At: at.Add(5 * time.Minute)}) + RatedEvent{AuthID: "a", ResourceID: "r", ModelID: "ft:gone", BaseModel: "expensive/base", PromptTokens: 1000, At: at.Add(5 * time.Minute)}) resB, err := r.Run(context.Background(), ws, we, false) if err != nil { t.Fatal(err) @@ -319,10 +320,10 @@ func TestRater_ReRateSupersedesOneKeepsAnotherSameWindow(t *testing.T) { if resB.ReconciledDeletions != 1 { t.Fatalf("run B: deletions = %d, want 1 (only ft:gone superseded)", resB.ReconciledDeletions) } - if _, gone := store.table[rollupKey{authID: "a", modelID: "ft:gone", windowStart: ws}]; gone { + if _, gone := store.table[rollupKey{authID: "a", resourceID: "r", modelID: "ft:gone", windowStart: ws}]; gone { t.Fatal("ft:gone rollup survived — it became ambiguous and must be deleted") } - if _, keep := store.table[rollupKey{authID: "a", modelID: "ft:keep", windowStart: ws}]; !keep { + if _, keep := store.table[rollupKey{authID: "a", resourceID: "r", modelID: "ft:keep", windowStart: ws}]; !keep { t.Fatal("ft:keep rollup was deleted — a surviving co-window rollup must be UPDATED, not clobbered") } if len(store.table) != 1 { @@ -338,7 +339,7 @@ func TestRater_ReRateDeletesVanishedRollup(t *testing.T) { at := mustTime("2026-06-08T10:15:00Z") ws, we := mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z") store := newOracleStore(bookM(), []RatedEvent{ - {AuthID: "a", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, }) r := New(store, bookM(), testLogger()) if _, err := r.Run(context.Background(), ws, we, false); err != nil { @@ -367,8 +368,8 @@ func TestRater_ReRateIdenticalDataIsNoOp(t *testing.T) { at := mustTime("2026-06-08T10:15:00Z") ws, we := mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z") store := newOracleStore(bookM(), []RatedEvent{ - {AuthID: "a", ModelID: "m", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, - {AuthID: "b", ModelID: "m", PromptTokens: 200, At: at.Add(10 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, + {AuthID: "b", ResourceID: "r", ModelID: "m", PromptTokens: 200, At: at.Add(10 * time.Minute)}, }) r := New(store, bookM(), testLogger()) @@ -394,7 +395,7 @@ func TestRater_ReRateIdenticalDataIsNoOp(t *testing.T) { // [ws,we): the reconcile's window predicate is half-open and hour-scoped. otherHour := mustTime("2026-06-08T12:00:00Z") store.events = append(store.events, - RatedEvent{AuthID: "a", ModelID: "m", PromptTokens: 5, At: otherHour.Add(5 * time.Minute)}) + RatedEvent{AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 5, At: otherHour.Add(5 * time.Minute)}) if _, err := r.Run(context.Background(), otherHour, otherHour.Add(time.Hour), false); err != nil { t.Fatal(err) } @@ -406,7 +407,7 @@ func TestRater_ReRateIdenticalDataIsNoOp(t *testing.T) { if resFirst.ReconciledDeletions != 0 { t.Fatalf("re-rate of [10:00,11:00) deleted %d rows, want 0 (must not touch the 12:00 rollup)", resFirst.ReconciledDeletions) } - otherKey := rollupKey{authID: "a", modelID: "m", windowStart: otherHour} + otherKey := rollupKey{authID: "a", resourceID: "r", modelID: "m", windowStart: otherHour} if _, ok := store.table[otherKey]; !ok { t.Fatal("a re-rate of the first window deleted an out-of-window (12:00) rollup — the window predicate leaked") } @@ -432,7 +433,7 @@ func supersededReconcileStore() (*oracleStore, time.Time, time.Time) { at := mustTime("2026-06-08T10:15:00Z") ws, we := mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z") store := newOracleStore(bookM(), []RatedEvent{ - {AuthID: "a", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, }) return store, ws, we } @@ -538,7 +539,7 @@ func TestRater_LateArrivalRatedByTrailingWindow(t *testing.T) { } store.events = append(store.events, RatedEvent{ - AuthID: "a", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: hourH.Add(15 * time.Minute), + AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: hourH.Add(15 * time.Minute), }) res2, err := r.Run(context.Background(), hourH.Add(-23*time.Hour), hourH.Add(2*time.Hour), false) @@ -548,7 +549,7 @@ func TestRater_LateArrivalRatedByTrailingWindow(t *testing.T) { if res2.EventsRated != 1 || res2.RollupsWritten != 1 { t.Fatalf("run2 rated=%d rollups=%d, want 1/1 (late event caught by trailing window)", res2.EventsRated, res2.RollupsWritten) } - k := rollupKey{authID: "a", modelID: "m", windowStart: hourH} + k := rollupKey{authID: "a", resourceID: "r", modelID: "m", windowStart: hourH} if _, ok := store.table[k]; !ok { t.Fatal("no rollup for hour H — the late-drained event was lost") } @@ -560,8 +561,8 @@ func TestRater_LateArrivalRatedByTrailingWindow(t *testing.T) { func TestRater_MissingPriceFailsLoudNotZero(t *testing.T) { at := mustTime("2026-06-08T10:15:00Z") events := []RatedEvent{ - {AuthID: "a", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // priced - {AuthID: "a", ModelID: "unpriced", PromptTokens: 100, CompletionTokens: 50, At: at}, // NO price + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // priced + {AuthID: "a", ResourceID: "r", ModelID: "unpriced", PromptTokens: 100, CompletionTokens: 50, At: at}, // NO price } store := newOracleStore(bookM(), events) r := New(store, bookM(), testLogger()) @@ -585,14 +586,17 @@ func TestRater_MissingPriceFailsLoudNotZero(t *testing.T) { } } -// TestRater_UnattributableCountedNotSilent: rows with NULL auth_id and/or model_id -// must NOT be rated, MUST be counted, and MUST trigger the loud anomaly / exit-2 path. +// TestRater_UnattributableCountedNotSilent: rows with NULL auth_id, NULL resource_id, +// and/or NULL model_id must NOT be rated, MUST be counted, and MUST trigger the loud +// anomaly / exit-2 path. resource_id joins the key columns (E2): a row that can't name +// its deployment/org is unattributable exactly like a NULL auth_id/model_id. func TestRater_UnattributableCountedNotSilent(t *testing.T) { at := mustTime("2026-06-08T10:15:00Z") events := []RatedEvent{ - {AuthID: "a", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // ok - {AuthID: "", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // NULL auth_id - {AuthID: "a", ModelID: "", PromptTokens: 100, CompletionTokens: 50, At: at}, // NULL model_id + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // ok + {AuthID: "", ResourceID: "r", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // NULL auth_id + {AuthID: "a", ResourceID: "", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // NULL resource_id + {AuthID: "a", ResourceID: "r", ModelID: "", PromptTokens: 100, CompletionTokens: 50, At: at}, // NULL model_id } store := newOracleStore(bookM(), events) r := New(store, bookM(), testLogger()) @@ -600,8 +604,8 @@ func TestRater_UnattributableCountedNotSilent(t *testing.T) { if err != nil { t.Fatal(err) } - if res.UnattributableEvents != 2 { - t.Fatalf("unattributable = %d, want 2", res.UnattributableEvents) + if res.UnattributableEvents != 3 { + t.Fatalf("unattributable = %d, want 3 (NULL auth_id, NULL resource_id, NULL model_id)", res.UnattributableEvents) } if res.EventsRated != 1 || res.RollupsWritten != 1 || len(store.table) != 1 { t.Fatalf("rated=%d rollups=%d table=%d, want 1/1/1", res.EventsRated, res.RollupsWritten, len(store.table)) @@ -611,6 +615,82 @@ func TestRater_UnattributableCountedNotSilent(t *testing.T) { } } +// TestRater_DistinctDeploymentsBillSeparately (E2 grain — the negative invariant): +// TWO deployments (distinct resource_id) of the SAME model by the SAME auth in the +// SAME hour produce TWO distinct rated_usage rows, NOT one summed row. resource_id is +// part of the grain precisely because the two deployments may bill to different orgs +// (resource_id→org_id), so collapsing them would mis-attribute revenue. The two rows +// share (auth_id, model_id, window_start) and differ ONLY in resource_id. +func TestRater_DistinctDeploymentsBillSeparately(t *testing.T) { + at := mustTime("2026-06-08T10:15:00Z") + ws := mustTime("2026-06-08T10:00:00Z") + events := []RatedEvent{ + {AuthID: "a", ResourceID: "deploy-1", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, + {AuthID: "a", ResourceID: "deploy-2", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at.Add(5 * time.Minute)}, + } + store := newOracleStore(bookM(), events) + r := New(store, bookM(), testLogger()) + res, err := r.Run(context.Background(), ws, mustTime("2026-06-08T11:00:00Z"), false) + if err != nil { + t.Fatal(err) + } + // TWO rollups, one per deployment — NOT one summed row. + if res.EventsRated != 2 || res.RollupsWritten != 2 || len(store.table) != 2 { + t.Fatalf("rated=%d rollups=%d table=%d, want 2/2/2 (distinct resource_id must NOT collapse into one row)", + res.EventsRated, res.RollupsWritten, len(store.table)) + } + // Each deployment has its OWN row under the same (auth, model, hour). + for _, rid := range []string{"deploy-1", "deploy-2"} { + k := rollupKey{authID: "a", resourceID: rid, modelID: "m", windowStart: ws} + ru, ok := store.table[k] + if !ok { + t.Fatalf("no rollup for deployment %q — distinct deployments must each bill", rid) + } + if ru.eventCount != 1 { + t.Fatalf("deployment %q event_count = %d, want 1 (one event per deployment)", rid, ru.eventCount) + } + } +} + +// TestRater_NullResourceIdIsUnattributable (fail-closed attribution): an event with a +// NULL resource_id CANNOT name its deployment/org (E2 resolves the org via +// resource_id→org_id), so it must be counted UNATTRIBUTABLE and EXCLUDED from billing — +// never $0-billed, never billed to a NULL org. It pins the partition invariant with +// resource_id in the mix: rated + unpriced + unattributable + ambiguous == total. +func TestRater_NullResourceIdIsUnattributable(t *testing.T) { + at := mustTime("2026-06-08T10:15:00Z") + events := []RatedEvent{ + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // attributable + {AuthID: "a", ResourceID: "", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // NULL resource_id → unattributable + } + store := newOracleStore(bookM(), events) + r := New(store, bookM(), testLogger()) + res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z"), false) + if err != nil { + t.Fatal(err) + } + if res.UnattributableEvents != 1 { + t.Fatalf("unattributable = %d, want 1 (the NULL-resource_id event must be counted, never billed)", res.UnattributableEvents) + } + if res.EventsRated != 1 || res.RollupsWritten != 1 || len(store.table) != 1 { + t.Fatalf("rated=%d rollups=%d table=%d, want 1/1/1 (no row for the unattributable event)", res.EventsRated, res.RollupsWritten, len(store.table)) + } + // No rollup carries an empty resource_id — the fail-closed row is never written. + for k := range store.table { + if k.resourceID == "" { + t.Fatal("a rollup exists with an empty resource_id — a row that can't name its deployment/org must NEVER be billed") + } + } + if !res.HasUnattributable() || !res.HasAnomaly() { + t.Fatal("HasUnattributable/HasAnomaly = false, want true (NULL resource_id must drive exit-nonzero)") + } + // PARTITION with resource_id in the mix. + if got := res.EventsRated + res.UnpricedEvents + res.UnattributableEvents + res.AmbiguousBaseEvents; got != int64(len(events)) { + t.Fatalf("rated(%d)+unpriced(%d)+unattr(%d)+ambiguous(%d) = %d, want %d", + res.EventsRated, res.UnpricedEvents, res.UnattributableEvents, res.AmbiguousBaseEvents, got, len(events)) + } +} + // TestRater_DerivedFineTuneRatedViaPolicy: end-to-end, a fine-tune (derived_from a // base) is rated at base × premium — exercising derivation in the run. func TestRater_DerivedFineTuneRatedViaPolicy(t *testing.T) { @@ -620,7 +700,7 @@ func TestRater_DerivedFineTuneRatedViaPolicy(t *testing.T) { PolicyMultiplier, MustDec("1.5"), Dec{}, ) events := []RatedEvent{ - {AuthID: "a", ModelID: "ft", PromptTokens: 1000, At: mustTime("2026-06-08T10:15:00Z")}, + {AuthID: "a", ResourceID: "r", ModelID: "ft", PromptTokens: 1000, At: mustTime("2026-06-08T10:15:00Z")}, } store := newOracleStore(book, events) r := New(store, book, testLogger()) @@ -646,7 +726,7 @@ func TestRater_FineTunePricesViaBaseModelOnEvent(t *testing.T) { PolicyMultiplier, MustDec("1.5"), Dec{}, ) events := []RatedEvent{{ - AuthID: "a", + AuthID: "a", ResourceID: "r", ModelID: "ft:9f8e7d6c5b4a", // a checkpoint id the file does not list BaseModel: "meta-llama/Llama-3.1-8B-Instruct", PromptTokens: 1000, @@ -667,7 +747,7 @@ func TestRater_FineTunePricesViaBaseModelOnEvent(t *testing.T) { } // The applied rate frozen on the row is the derived (base × premium) rate. hour := mustTime("2026-06-08T10:00:00Z") - row := store.table[rollupKey{"a", "ft:9f8e7d6c5b4a", hour}] + row := store.table[rollupKey{"a", "r", "ft:9f8e7d6c5b4a", hour}] if row.appliedRate.Prompt.String() != "0.000006000" { t.Fatalf("ft applied prompt rate = %s, want 0.000006000 (0.000004 × 1.5)", row.appliedRate.Prompt) } @@ -685,7 +765,7 @@ func TestRater_FineTuneWithoutBaseModelFailsLoud(t *testing.T) { nil, PolicyMultiplier, MustDec("1.5"), Dec{}, ) events := []RatedEvent{{ - AuthID: "a", + AuthID: "a", ResourceID: "r", ModelID: "ft:9f8e7d6c5b4a", BaseModel: "", // THE BUG: base_model never propagated to the event PromptTokens: 1000, @@ -727,14 +807,14 @@ func TestRater_FineTuneAmbiguousBaseModelFailsLoud(t *testing.T) { at := mustTime("2026-06-08T10:15:00Z") events := []RatedEvent{ // SAME ft: model_id, TWO different base_models in the same window → ambiguous. - {AuthID: "a", ModelID: "ft:dupe", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, - {AuthID: "a", ModelID: "ft:dupe", BaseModel: "expensive/base", PromptTokens: 1000, At: at.Add(5 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "ft:dupe", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "ft:dupe", BaseModel: "expensive/base", PromptTokens: 1000, At: at.Add(5 * time.Minute)}, // THE LEGITIMATE CASE the gate must NOT trip: the same ft: id with the SAME base // across MULTIPLE events is one clean rollup (COUNT(DISTINCT base_model)=1), and // MUST still rate normally alongside the ambiguous one. Two events prove the gate // keys on DISTINCT bases, not on event count. - {AuthID: "a", ModelID: "ft:clean", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, - {AuthID: "a", ModelID: "ft:clean", BaseModel: "cheap/base", PromptTokens: 1000, At: at.Add(7 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "ft:clean", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "ft:clean", BaseModel: "cheap/base", PromptTokens: 1000, At: at.Add(7 * time.Minute)}, } store := newOracleStore(book, events) r := New(store, book, testLogger()) @@ -790,9 +870,9 @@ func TestRater_RollupCostSelfAudits(t *testing.T) { ) at := mustTime("2026-06-08T10:15:00Z") events := []RatedEvent{ - {AuthID: "a", ModelID: "base", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, - {AuthID: "a", ModelID: "base", PromptTokens: 200, CachedTokens: 0, CompletionTokens: 10, At: at.Add(10 * time.Minute)}, - {AuthID: "a", ModelID: "ft", PromptTokens: 100, CachedTokens: 0, CompletionTokens: 0, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "base", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "base", PromptTokens: 200, CachedTokens: 0, CompletionTokens: 10, At: at.Add(10 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "ft", PromptTokens: 100, CachedTokens: 0, CompletionTokens: 0, At: at}, } store := newOracleStore(book, events) r := New(store, book, testLogger()) @@ -824,8 +904,8 @@ func TestRater_AppliedRateStoredOnRow(t *testing.T) { ) at := mustTime("2026-06-08T10:15:00Z") events := []RatedEvent{ - {AuthID: "a", ModelID: "base", PromptTokens: 10, At: at}, - {AuthID: "a", ModelID: "ft", PromptTokens: 10, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "base", PromptTokens: 10, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "ft", PromptTokens: 10, At: at}, } store := newOracleStore(book, events) r := New(store, book, testLogger()) @@ -833,11 +913,11 @@ func TestRater_AppliedRateStoredOnRow(t *testing.T) { t.Fatal(err) } hour := mustTime("2026-06-08T10:00:00Z") - baseRow := store.table[rollupKey{"a", "base", hour}] + baseRow := store.table[rollupKey{"a", "r", "base", hour}] if baseRow.appliedRate.Prompt.String() != "0.000004000" { t.Fatalf("base applied prompt rate = %s, want 0.000004000", baseRow.appliedRate.Prompt) } - ftRow := store.table[rollupKey{"a", "ft", hour}] + ftRow := store.table[rollupKey{"a", "r", "ft", hour}] // premium applied: 0.000004 × 1.5 = 0.000006 if ftRow.appliedRate.Prompt.String() != "0.000006000" { t.Fatalf("ft applied prompt rate = %s, want 0.000006000 (premium frozen on row)", ftRow.appliedRate.Prompt) @@ -869,12 +949,12 @@ func TestOracleModel_SelfConsistent(t *testing.T) { ) hour := mustTime("2026-06-08T10:00:00Z") events := []RatedEvent{ - {AuthID: "a", ModelID: "b", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: hour.Add(5 * time.Minute)}, - {AuthID: "a", ModelID: "b", PromptTokens: 200, CachedTokens: 0, CompletionTokens: 10, At: hour.Add(40 * time.Minute)}, - {AuthID: "a", ModelID: "f", PromptTokens: 100, CachedTokens: 0, CompletionTokens: 0, At: hour.Add(15 * time.Minute)}, - {AuthID: "b", ModelID: "b", PromptTokens: 1000, CachedTokens: 1000, CompletionTokens: 0, At: hour.Add(20 * time.Minute)}, // all-cached - {AuthID: "a", ModelID: "unpriced", PromptTokens: 9, At: hour.Add(1 * time.Minute)}, // unpriced - {AuthID: "", ModelID: "b", PromptTokens: 9, At: hour.Add(2 * time.Minute)}, // unattributable + {AuthID: "a", ResourceID: "r", ModelID: "b", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: hour.Add(5 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "b", PromptTokens: 200, CachedTokens: 0, CompletionTokens: 10, At: hour.Add(40 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "f", PromptTokens: 100, CachedTokens: 0, CompletionTokens: 0, At: hour.Add(15 * time.Minute)}, + {AuthID: "b", ResourceID: "r", ModelID: "b", PromptTokens: 1000, CachedTokens: 1000, CompletionTokens: 0, At: hour.Add(20 * time.Minute)}, // all-cached + {AuthID: "a", ResourceID: "r", ModelID: "unpriced", PromptTokens: 9, At: hour.Add(1 * time.Minute)}, // unpriced + {AuthID: "", ModelID: "b", PromptTokens: 9, At: hour.Add(2 * time.Minute)}, // unattributable } store := newOracleStore(book, events) r := New(store, book, testLogger()) @@ -909,7 +989,7 @@ func TestOracleModel_SelfConsistent(t *testing.T) { t.Fatalf("rollups = %d, want %d", res.RollupsWritten, len(want)) } for k, w := range want { - rk := rollupKey{authID: k.auth, modelID: k.model, windowStart: hour} + rk := rollupKey{authID: k.auth, resourceID: "r", modelID: k.model, windowStart: hour} got := store.table[rk].cost if got.String() != w.String() { t.Errorf("rollup (%s,%s) cost = %s, oracle wants %s", k.auth, k.model, got, w) diff --git a/internal/rating/store.go b/internal/rating/store.go index e0daef0..9f6d133 100644 --- a/internal/rating/store.go +++ b/internal/rating/store.go @@ -21,7 +21,7 @@ import ( // store PROJECTS the book's already-premium-applied per-token rates into a // transient (TEMP) price table for the window, then rates the whole window in SQL — // resolves the effective rate, computes per-event cost, sums it per (auth_id, -// model_id, hour) into rated_usage idempotently, AND counts the fail-loud anomalies, +// resource_id, model_id, hour) into rated_usage idempotently, AND counts the fail-loud anomalies, // all in one snapshot so the rollups and the anomaly counts always agree on what // "priced" means. type Store interface { @@ -197,9 +197,9 @@ CREATE TEMP TABLE rating_derived ( // so rollup keys can never disagree across sessions and re-rates can't overlap. // // IDEMPOTENCY IS RECONCILE, NOT UPSERT-ONLY: a re-run of a window makes rated_usage -// EXACTLY what the latest run says. ON CONFLICT (auth_id, model_id, window_start) DO -// UPDATE replaces a surviving rollup's sums/cost/applied-rates with the freshly -// recomputed ones (the surrogate id is DETERMINISTIC — md5 of the LENGTH-PREFIXED +// EXACTLY what the latest run says. ON CONFLICT (auth_id, resource_id, model_id, +// window_start) DO UPDATE replaces a surviving rollup's sums/cost/applied-rates with +// the freshly recomputed ones (the surrogate id is DETERMINISTIC — md5 of the LENGTH-PREFIXED // natural key, injective, so no '|' in a field can collide two keys — so a re-run // regenerates the SAME id). AND the `deleted` CTE removes any in-window rated_usage // row this run did NOT reproduce in priced (it became ambiguous/unpriced, or its @@ -218,6 +218,11 @@ const rateWindowSQL = ` WITH ev AS ( SELECT auth_id, + -- resource_id: the deployment id (E2 customer attribution — billing resolves + -- the org via resource_id→org_id). It is part of the rated_usage grain: a NULL + -- resource_id row CANNOT name its deployment/org, so it is unattributable + -- (counted, never billed to a NULL org) — see the unattributable filter below. + resource_id, -- billing_event stores the engine-reported model NAME in its model column; -- that name IS phoebe's stable price key, model_id. A NULL model is -- unattributable. @@ -236,6 +241,7 @@ WITH ev AS ( resolved AS ( SELECT ev.auth_id, + ev.resource_id, ev.model_id, ev.base_model, ev.ev_ts, @@ -271,7 +277,8 @@ resolved AS ( -- fine-tune — never a literal 'ft:%' that could drift from the constant. AND ev.model_id LIKE $3 ), --- grouped: the per-(auth_id, model_id, hour) rollup BEFORE the ft-uniqueness gate. +-- grouped: the per-(auth_id, resource_id, model_id, hour) rollup BEFORE the +-- ft-uniqueness gate. -- It carries ambiguous_base = COUNT(DISTINCT base_model among DERIVED-priced rows) > 1. -- E3 mints ft: as a globally-unique uuid4, so one ft: model_id -- can NEVER legitimately carry two different base_models. If it does, the derived rates @@ -282,6 +289,7 @@ resolved AS ( grouped AS ( SELECT auth_id, + resource_id, model_id, -- Session-TZ-independent hour bucket; see the statement comment. date_trunc('hour', ev_ts AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS window_start, @@ -310,16 +318,20 @@ grouped AS ( COUNT(DISTINCT base_model) FILTER (WHERE via_derived) > 1 AS ambiguous_base FROM resolved WHERE prompt_price IS NOT NULL -- priced only - AND auth_id IS NOT NULL -- attributable only - AND model_id IS NOT NULL - GROUP BY auth_id, model_id, date_trunc('hour', ev_ts AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' + AND auth_id IS NOT NULL -- attributable only + AND model_id IS NOT NULL + -- resource_id is a NON-NULL key column AND the E2 customer-attribution key. A + -- NULL resource_id row can't name its deployment/org, so it is NEVER billed; it + -- is excluded here and COUNTED as unattributable below (fail closed). + AND resource_id IS NOT NULL + GROUP BY auth_id, resource_id, model_id, date_trunc('hour', ev_ts AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' ), priced AS ( SELECT * FROM grouped WHERE NOT ambiguous_base ), -- RECONCILE (re-rate deletes superseded rollups): a rated_usage row whose --- (auth_id, model_id, window_start) falls IN this run's window but is NOT in the --- current priced set is STALE — it billed CLEAN in a prior run, but the latest run +-- (auth_id, resource_id, model_id, window_start) falls IN this run's window but is +-- NOT in the current priced set is STALE — it billed CLEAN in a prior run, but the latest run -- now excludes it (it became ambiguous-base, or unpriced, or its events vanished). -- "What the latest run says is what bills," so it is DELETED, atomically with the -- upsert below, in the SAME snapshot. Without this, an upsert-only re-run leaves the @@ -340,6 +352,7 @@ deleted AS ( AND NOT EXISTS ( SELECT 1 FROM priced p WHERE p.auth_id = ru.auth_id + AND p.resource_id = ru.resource_id AND p.model_id = ru.model_id AND p.window_start = ru.window_start ) @@ -347,7 +360,7 @@ deleted AS ( ), upserted AS ( INSERT INTO rated_usage ( - id, auth_id, model_id, window_start, window_end, + id, auth_id, resource_id, model_id, window_start, window_end, prompt_tokens, cached_tokens, completion_tokens, billable_prompt_tokens, cost, applied_prompt_rate, applied_cached_rate, applied_completion_rate, event_count @@ -355,14 +368,17 @@ upserted AS ( SELECT -- DETERMINISTIC 32-char hex surrogate: md5 of the natural key, so re-rating -- regenerates the SAME id. The fields are LENGTH-PREFIXED (len || ':' || value) - -- so the encoding is INJECTIVE — a '|' inside auth_id or model_id can never - -- shift the boundary and collide two different keys onto one id (e.g. auth 'a|b' - -- + model 'c' vs auth 'a' + model 'b|c'). epoch (a bounded integer, no - -- separator hazard) keeps the hash input session-TZ-independent. + -- so the encoding is INJECTIVE — a '|' inside auth_id, resource_id or model_id + -- can never shift the boundary and collide two different keys onto one id (e.g. + -- auth 'a|b' + resource 'c' vs auth 'a' + resource 'b|c'). The field ORDER is + -- FIXED and MUST equal the unique key (auth_id, resource_id, model_id, + -- window_start); epoch (a bounded integer, no separator hazard) keeps the hash + -- input session-TZ-independent. md5(length(auth_id)::text || ':' || auth_id + || '|' || length(resource_id)::text || ':' || resource_id || '|' || length(model_id)::text || ':' || model_id || '|' || extract(epoch FROM window_start)::bigint::text), - auth_id, model_id, window_start, window_end, + auth_id, resource_id, model_id, window_start, window_end, prompt_tokens, cached_tokens, completion_tokens, billable_prompt_tokens, cost, applied_prompt_rate, applied_cached_rate, applied_completion_rate, event_count @@ -374,8 +390,8 @@ upserted AS ( -- raters over overlapping windows (Atlas CronJob concurrencyPolicy: Forbid), so -- the cross-rater hazard is unreachable and no delete-lock-ordering machinery is -- added here. See cmd/rater's package doc for the single-flight contract. - ORDER BY auth_id, model_id, window_start - ON CONFLICT (auth_id, model_id, window_start) DO UPDATE SET + ORDER BY auth_id, resource_id, model_id, window_start + ON CONFLICT (auth_id, resource_id, model_id, window_start) DO UPDATE SET window_end = EXCLUDED.window_end, prompt_tokens = EXCLUDED.prompt_tokens, cached_tokens = EXCLUDED.cached_tokens, @@ -401,12 +417,16 @@ SELECT -- counted ONLY as unattributable (the more specific signal), never also as -- unpriced, so the counts partition the in-window rows: -- events_rated + unpriced + unattributable + ambiguous_base == total in-window events. + -- The unpriced count requires FULL attribution (auth_id, resource_id, model_id all + -- NON-NULL) for exactly this exclusivity: a NULL-resource_id row that is also + -- unpriced must be counted ONLY as unattributable, never double-counted here. (SELECT COUNT(*)::bigint FROM resolved - WHERE prompt_price IS NULL - AND auth_id IS NOT NULL - AND model_id IS NOT NULL) AS unpriced_events, + WHERE prompt_price IS NULL + AND auth_id IS NOT NULL + AND resource_id IS NOT NULL + AND model_id IS NOT NULL) AS unpriced_events, (SELECT COUNT(*)::bigint FROM ev - WHERE auth_id IS NULL OR model_id IS NULL) AS unattributable_events, + WHERE auth_id IS NULL OR resource_id IS NULL OR model_id IS NULL) AS unattributable_events, -- AMBIGUOUS-BASE events: the EVENT count under ambiguous rollups (a single ft: -- model_id resolving through >1 distinct base_model in one window — the E3 -- ft-uniqueness violation). These rollups are NOT upserted (excluded from priced), diff --git a/internal/rating/store_integration_test.go b/internal/rating/store_integration_test.go index 0cf7a62..79db256 100644 --- a/internal/rating/store_integration_test.go +++ b/internal/rating/store_integration_test.go @@ -34,6 +34,9 @@ const schemaDDL = ` CREATE TABLE billing_event ( request_id VARCHAR(255) PRIMARY KEY, auth_id VARCHAR(64), + -- resource_id (the deployment id) is NULLABLE here, mirroring migration 0001: the + -- rater fails closed on a NULL (counts it unattributable), it does not reject it. + resource_id VARCHAR(64), model VARCHAR(255), base_model VARCHAR(255), prompt_tokens INTEGER NOT NULL DEFAULT 0, @@ -49,6 +52,7 @@ CREATE INDEX billing_event_rating_instant_ix CREATE TABLE rated_usage ( id VARCHAR(32) PRIMARY KEY, auth_id VARCHAR(64) NOT NULL, + resource_id VARCHAR(64) NOT NULL, model_id VARCHAR(255) NOT NULL, window_start TIMESTAMPTZ NOT NULL, window_end TIMESTAMPTZ NOT NULL, @@ -62,13 +66,15 @@ CREATE TABLE rated_usage ( applied_completion_rate NUMERIC(20,9) NOT NULL DEFAULT 0, event_count BIGINT NOT NULL, rated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT rated_usage_auth_model_window_uq UNIQUE (auth_id, model_id, window_start) + CONSTRAINT rated_usage_auth_resource_model_window_uq UNIQUE (auth_id, resource_id, model_id, window_start) ); -- Mirror the production indexes (migrations/0002_rating.sql): the auth-leading index --- for billing queries, and the window_start-leading index the reconcile DELETE needs --- (it filters window_start alone; every auth-leading index leaves it trailing). +-- for billing queries, the resource_id-leading index for E2 per-deployment reads, and +-- the window_start-leading index the reconcile DELETE needs (it filters window_start +-- alone; every auth/resource-leading index leaves it trailing). CREATE INDEX rated_usage_auth_id_window_start_ix ON rated_usage (auth_id, window_start); +CREATE INDEX rated_usage_resource_id_window_start_ix ON rated_usage (resource_id, window_start); CREATE INDEX rated_usage_window_start_ix ON rated_usage (window_start);` // conformanceBook is the fixture price book shared by the conformance tests: base @@ -103,18 +109,19 @@ func TestIntegration_RateWindow_ConformsToOracle(t *testing.T) { hour := mustTime("2026-06-08T10:00:00Z") book := conformanceBook() - // Events: priced base, priced derived, unpriced, unattributable. + // Events: priced base, priced derived, unpriced, unattributable. Each priced/unpriced + // event carries a resource_id (E2 grain); the unattributable one has none. events := []RatedEvent{ - {AuthID: "a", ModelID: "b", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: hour.Add(5 * time.Minute)}, - {AuthID: "a", ModelID: "f", PromptTokens: 100, CachedTokens: 0, CompletionTokens: 0, At: hour.Add(15 * time.Minute)}, - {AuthID: "a", ModelID: "unpriced", PromptTokens: 9, At: hour.Add(1 * time.Minute)}, - {AuthID: "", ModelID: "b", PromptTokens: 9, At: hour.Add(2 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "b", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: hour.Add(5 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "f", PromptTokens: 100, CachedTokens: 0, CompletionTokens: 0, At: hour.Add(15 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "unpriced", PromptTokens: 9, At: hour.Add(1 * time.Minute)}, + {AuthID: "", ResourceID: "r", ModelID: "b", PromptTokens: 9, At: hour.Add(2 * time.Minute)}, } for i, e := range events { _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) - VALUES ($1,$2,$3,$4,$5,$6,$7)`, - fmt.Sprintf("req-%d", i), nullableStr(e.AuthID), nullableStr(e.ModelID), + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8)`, + fmt.Sprintf("req-%d", i), nullableStr(e.AuthID), nullableStr(e.ResourceID), nullableStr(e.ModelID), e.PromptTokens, e.CachedTokens, e.CompletionTokens, e.At) if err != nil { t.Fatalf("seed event %d: %v", i, err) @@ -177,6 +184,18 @@ func TestIntegration_RateWindow_ConformsToOracle(t *testing.T) { } } + // E2 ATTRIBUTION: every written rollup carries the event's resource_id (the + // deployment id billing resolves the org from). The two priced rollups were seeded + // with resource_id 'r'. + var nWithResource, nTotal int + if err := db.QueryRowContext(ctx, `SELECT COUNT(*) FILTER (WHERE resource_id = 'r'), COUNT(*) FROM rated_usage`). + Scan(&nWithResource, &nTotal); err != nil { + t.Fatalf("read resource_id: %v", err) + } + if nWithResource != nTotal || nTotal != 2 { + t.Fatalf("rated_usage resource_id: %d of %d rows carry 'r', want all 2 (E2 attribution must be on every row)", nWithResource, nTotal) + } + // Deterministic surrogate ids: capture before the re-run. idsBefore := readRatedUsageIDs(t, db) @@ -200,10 +219,97 @@ func TestIntegration_RateWindow_ConformsToOracle(t *testing.T) { } } +// TestIntegration_ResourceIDGrainAndFailClosed runs the REAL SQL to pin the E2 +// resource_id grain in Postgres: +// - TWO deployments (distinct resource_id) of the SAME model by the SAME auth in the +// SAME hour produce TWO distinct rated_usage rows (NOT one summed row) — they may +// bill to different orgs, so collapsing them would mis-attribute revenue; +// - a NULL-resource_id event is UNATTRIBUTABLE: counted, never written (a row that +// can't name its deployment/org must never be billed). This is the live-Postgres +// proof of the fail-closed attribution partition. +func TestIntegration_ResourceIDGrainAndFailClosed(t *testing.T) { + dsn := os.Getenv("PHOEBE_TEST_DATABASE_URL") + if dsn == "" { + t.Skip("PHOEBE_TEST_DATABASE_URL not set; skipping live-Postgres conformance") + } + ctx := context.Background() + db, err := sql.Open("pgx", dsn) + if err != nil { + t.Fatalf("open: %v", err) + } + defer db.Close() + + const sch = "phoebe_rating_resource_it" + exec(t, db, "DROP SCHEMA IF EXISTS "+sch+" CASCADE") + exec(t, db, "CREATE SCHEMA "+sch) + exec(t, db, "SET search_path TO "+sch) + defer func() { exec(t, db, "DROP SCHEMA IF EXISTS "+sch+" CASCADE") }() + exec(t, db, schemaDDL) + + hour := mustTime("2026-06-08T10:00:00Z") + book := newTestBook( + map[string]Rate3{"b": rate3("0.000005", "0", "0")}, + nil, PolicyIdentity, Dec{}, Dec{}, + ) + + // Two deployments of model "b" by auth "a" in the same hour, plus a NULL-resource_id + // event that must fail closed. + if _, err := db.ExecContext(ctx, + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, prompt_tokens, completion_tokens, event_ts) + VALUES ('d1','a','deploy-1','b',100,0,$1), + ('d2','a','deploy-2','b',100,0,$1), + ('dnull','a',NULL,'b',100,0,$1)`, hour.Add(5*time.Minute)); err != nil { + t.Fatalf("seed: %v", err) + } + + store := NewPostgresStore(db) + res, err := store.RateWindow(ctx, book, hour, hour.Add(time.Hour)) + if err != nil { + t.Fatalf("RateWindow: %v", err) + } + + // Two deployments → two rollups; the NULL-resource_id event is unattributable. + if res.RollupsWritten != 2 || res.EventsRated != 2 { + t.Fatalf("rollups/events = %d/%d, want 2/2 (distinct deployments bill separately)", res.RollupsWritten, res.EventsRated) + } + if res.UnattributableEvents != 1 { + t.Fatalf("unattributable = %d, want 1 (the NULL-resource_id event must be counted, never billed)", res.UnattributableEvents) + } + // PARTITION holds with resource_id in the mix. + if got := res.EventsRated + res.UnpricedEvents + res.UnattributableEvents + res.AmbiguousBaseEvents; got != 3 { + t.Fatalf("rated+unpriced+unattr+ambiguous = %d, want 3 (all seeded events)", got) + } + + // Exactly two rows, one per deployment; NONE with a NULL/empty resource_id. + var nRows, nNull int + if err := db.QueryRowContext(ctx, + `SELECT COUNT(*), COUNT(*) FILTER (WHERE resource_id IS NULL OR resource_id = '') FROM rated_usage`). + Scan(&nRows, &nNull); err != nil { + t.Fatalf("count rows: %v", err) + } + if nRows != 2 { + t.Fatalf("rated_usage rows = %d, want 2 (one per deployment)", nRows) + } + if nNull != 0 { + t.Fatalf("rated_usage has %d NULL/empty-resource_id rows — a row that can't name its deployment/org must NEVER be written", nNull) + } + for _, rid := range []string{"deploy-1", "deploy-2"} { + var n int + if err := db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM rated_usage WHERE auth_id='a' AND resource_id=$1 AND model_id='b' AND window_start=$2`, + rid, hour).Scan(&n); err != nil { + t.Fatalf("count %s: %v", rid, err) + } + if n != 1 { + t.Fatalf("deployment %s rollups = %d, want exactly 1", rid, n) + } + } +} + // readRatedUsageIDs returns natural-key → id for every rated_usage row. func readRatedUsageIDs(t *testing.T, db *sql.DB) map[string]string { t.Helper() - rows, err := db.Query(`SELECT auth_id || '|' || model_id || '|' || extract(epoch FROM window_start)::bigint::text, id FROM rated_usage`) + rows, err := db.Query(`SELECT auth_id || '|' || resource_id || '|' || model_id || '|' || extract(epoch FROM window_start)::bigint::text, id FROM rated_usage`) if err != nil { t.Fatalf("read rated_usage ids: %v", err) } @@ -271,8 +377,8 @@ func TestConformance_PremiumQuantizedBeforeBilling(t *testing.T) { } for i, e := range events { if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) - VALUES ($1,$2,$3,$4,$5,$6,$7)`, + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) + VALUES ($1,$2,'r',$3,$4,$5,$6,$7)`, fmt.Sprintf("q-req-%d", i), e.AuthID, e.ModelID, e.PromptTokens, e.CachedTokens, e.CompletionTokens, e.At); err != nil { t.Fatalf("seed event %d: %v", i, err) @@ -361,8 +467,8 @@ func TestConformance_OracleQuantizesBeforeMultiply_OnResidue(t *testing.T) { } for i, e := range events { if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) - VALUES ($1,$2,$3,$4,$5,$6,$7)`, + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) + VALUES ($1,$2,'r',$3,$4,$5,$6,$7)`, fmt.Sprintf("res-req-%d", i), e.AuthID, e.ModelID, e.PromptTokens, e.CachedTokens, e.CompletionTokens, e.At); err != nil { t.Fatalf("seed event %d: %v", i, err) @@ -458,8 +564,8 @@ func TestIntegration_FineTunePricesViaBaseModel(t *testing.T) { } for _, s := range seeds { if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, base_model, prompt_tokens, completion_tokens, event_ts) - VALUES ($1,'a',$2,$3,$4,0,$5)`, + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, base_model, prompt_tokens, completion_tokens, event_ts) + VALUES ($1,'a','r',$2,$3,$4,0,$5)`, s.req, s.model, nullableStr(s.baseModel), s.prompt, hour.Add(5*time.Minute)); err != nil { t.Fatalf("seed %s: %v", s.req, err) } @@ -553,8 +659,8 @@ func TestIntegration_FineTuneAmbiguousBaseModelFailsLoud(t *testing.T) { } for _, s := range seeds { if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, base_model, prompt_tokens, completion_tokens, event_ts) - VALUES ($1,'a',$2,$3,$4,0,$5)`, + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, base_model, prompt_tokens, completion_tokens, event_ts) + VALUES ($1,'a','r',$2,$3,$4,0,$5)`, s.req, s.model, nullableStr(s.baseModel), s.prompt, hour.Add(5*time.Minute)); err != nil { t.Fatalf("seed %s: %v", s.req, err) } @@ -632,9 +738,9 @@ func TestIntegration_ReRateReconciles(t *testing.T) { // Run A: a single-base ft: rollup (ft:dupe) PLUS a co-window survivor (ft:keep). // Both clean → billed. ft:keep must survive run B's reconcile untouched. if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, base_model, prompt_tokens, completion_tokens, event_ts) - VALUES ('rc-1','a','ft:dupe','cheap/base',1000,0,$1), - ('rc-keep','a','ft:keep','cheap/base',1000,0,$1)`, hour.Add(5*time.Minute)); err != nil { + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, base_model, prompt_tokens, completion_tokens, event_ts) + VALUES ('rc-1','a','r','ft:dupe','cheap/base',1000,0,$1), + ('rc-keep','a','r','ft:keep','cheap/base',1000,0,$1)`, hour.Add(5*time.Minute)); err != nil { t.Fatalf("seed rc-1/rc-keep: %v", err) } resA, err := store.RateWindow(ctx, book, hour, hour.Add(time.Hour)) @@ -654,8 +760,8 @@ func TestIntegration_ReRateReconciles(t *testing.T) { // Mutate: a SECOND base_model for the SAME ft: id → ambiguous. if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, base_model, prompt_tokens, completion_tokens, event_ts) - VALUES ('rc-2','a','ft:dupe','expensive/base',1000,0,$1)`, hour.Add(10*time.Minute)); err != nil { + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, base_model, prompt_tokens, completion_tokens, event_ts) + VALUES ('rc-2','a','r','ft:dupe','expensive/base',1000,0,$1)`, hour.Add(10*time.Minute)); err != nil { t.Fatalf("seed rc-2: %v", err) } resB, err := store.RateWindow(ctx, book, hour, hour.Add(time.Hour)) @@ -729,8 +835,8 @@ func TestIntegration_ReRateReconcileLeavesOtherWindowsUntouched(t *testing.T) { // Seed a clean rollup in BOTH the 10:00 and 12:00 hours; rate each window. if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, prompt_tokens, completion_tokens, event_ts) - VALUES ('w10','a','b',100,0,$1), ('w12','a','b',100,0,$2)`, + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, prompt_tokens, completion_tokens, event_ts) + VALUES ('w10','a','r','b',100,0,$1), ('w12','a','r','b',100,0,$2)`, hour10.Add(5*time.Minute), hour12.Add(5*time.Minute)); err != nil { t.Fatalf("seed windows: %v", err) } @@ -814,8 +920,8 @@ func TestIntegration_OneHopFineTuneCannotDeriveFromFineTune(t *testing.T) { } for _, s := range seeds { if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, base_model, prompt_tokens, completion_tokens, event_ts) - VALUES ($1,'a',$2,$3,1000,0,$4)`, + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, base_model, prompt_tokens, completion_tokens, event_ts) + VALUES ($1,'a','r',$2,$3,1000,0,$4)`, s.req, s.model, s.baseModel, hour.Add(5*time.Minute)); err != nil { t.Fatalf("seed %s: %v", s.req, err) } @@ -870,8 +976,8 @@ func TestIntegration_UTCBucketing_SessionTZIndependent(t *testing.T) { nil, PolicyIdentity, Dec{}, Dec{}, ) if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) - VALUES ('tz-req-0','a','b',100,0,50,$1)`, hour.Add(30*time.Minute)); err != nil { + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) + VALUES ('tz-req-0','a','r','b',100,0,50,$1)`, hour.Add(30*time.Minute)); err != nil { t.Fatalf("seed event: %v", err) } @@ -991,12 +1097,13 @@ func TestIntegration_ReconcileDeleteCanUseWindowStartIndex(t *testing.T) { // only prove the index CAN serve the predicate under seqscan-off, not that the // planner prefers it at default cost, so a large population would back nothing. exec(t, db, `INSERT INTO rated_usage - (id, auth_id, model_id, window_start, window_end, + (id, auth_id, resource_id, model_id, window_start, window_end, prompt_tokens, cached_tokens, completion_tokens, billable_prompt_tokens, cost, applied_prompt_rate, applied_cached_rate, applied_completion_rate, event_count) SELECT md5(a::text || ':' || h::text), 'auth' || a::text, + 'deploy' || a::text, 'm', '2026-01-01T00:00:00Z'::timestamptz + (h || ' hours')::interval, '2026-01-01T01:00:00Z'::timestamptz + (h || ' hours')::interval, @@ -1013,7 +1120,7 @@ func TestIntegration_ReconcileDeleteCanUseWindowStartIndex(t *testing.T) { // must still go through the index rather than seqscanning the whole table. const reconcileDelete = `EXPLAIN WITH priced AS ( - SELECT auth_id, model_id, window_start FROM rated_usage WHERE false + SELECT auth_id, resource_id, model_id, window_start FROM rated_usage WHERE false ) DELETE FROM rated_usage ru WHERE ru.window_start >= '2026-01-01T04:00:00Z' @@ -1021,6 +1128,7 @@ func TestIntegration_ReconcileDeleteCanUseWindowStartIndex(t *testing.T) { AND NOT EXISTS ( SELECT 1 FROM priced p WHERE p.auth_id = ru.auth_id + AND p.resource_id = ru.resource_id AND p.model_id = ru.model_id AND p.window_start = ru.window_start )` diff --git a/internal/rating/store_test.go b/internal/rating/store_test.go index 924aef3..a055d9b 100644 --- a/internal/rating/store_test.go +++ b/internal/rating/store_test.go @@ -110,20 +110,24 @@ func TestRateWindowSQL_Shape(t *testing.T) { "applied_prompt_rate", "applied_cached_rate", "applied_completion_rate", - // priced + attributable filter (never $0-bill unpriced/unattributable) + // priced + attributable filter (never $0-bill unpriced/unattributable). A NULL + // resource_id can't name the deployment/org (E2) → excluded + counted, never billed. "WHERE prompt_price IS NOT NULL", - "AND auth_id IS NOT NULL", - "AND model_id IS NOT NULL", + "AND auth_id IS NOT NULL", + "AND model_id IS NOT NULL", + "AND resource_id IS NOT NULL", // session-TZ-independent hour bucket "date_trunc('hour', ev_ts AT TIME ZONE 'UTC') AT TIME ZONE 'UTC'", // deterministic natural-key surrogate id (re-runs regenerate the same id), - // LENGTH-PREFIXED so a '|' in a field can never collide two keys + // LENGTH-PREFIXED so a '|' in a field can never collide two keys; resource_id is + // part of the key, in fixed order (auth_id, resource_id, model_id, window_start) "md5(length(auth_id)::text || ':' || auth_id", + "|| '|' || length(resource_id)::text || ':' || resource_id", "|| '|' || length(model_id)::text || ':' || model_id", // deterministic lock order across concurrent raters (no ABBA deadlock) - "ORDER BY auth_id, model_id, window_start", + "ORDER BY auth_id, resource_id, model_id, window_start", // idempotent upsert on the natural key - "ON CONFLICT (auth_id, model_id, window_start) DO UPDATE SET", + "ON CONFLICT (auth_id, resource_id, model_id, window_start) DO UPDATE SET", // RE-RATE RECONCILES (FIX 2): the `deleted` CTE removes any in-window rollup this // run did NOT reproduce in priced, atomically with the upsert — so a superseded // rollup cannot keep billing at its stale cost. Window-scoped + NOT EXISTS priced. @@ -132,10 +136,15 @@ func TestRateWindowSQL_Shape(t *testing.T) { "ru.window_start < $2", "NOT EXISTS (", "FROM priced p", + // the reconcile anti-join keys on the FULL grain incl. resource_id, so it matches + // the new unique constraint exactly (a deployment that fell out is reconciled) + "p.resource_id = ru.resource_id", "AS reconciled_deletions", - // the anomaly counts ride the SAME statement (one snapshot) as the upsert + // the anomaly counts ride the SAME statement (one snapshot) as the upsert; a NULL + // resource_id is counted UNATTRIBUTABLE (fail closed), never billed "AS unpriced_events", - "AS unattributable_events", + "OR resource_id IS NULL OR model_id IS NULL) AS unattributable_events", + "AND resource_id IS NOT NULL", // the E3 ft-uniqueness gate: an ft: rollup spanning >1 base_model is split out "COUNT(DISTINCT base_model) FILTER (WHERE via_derived) > 1 AS ambiguous_base", "WHERE NOT ambiguous_base", diff --git a/migrations/0002_rating.sql b/migrations/0002_rating.sql index 4a3d80b..7ae4eeb 100644 --- a/migrations/0002_rating.sql +++ b/migrations/0002_rating.sql @@ -10,9 +10,12 @@ -- each rated_usage row. See config/prices.example.yaml and internal/rating. -- -- ONE table here: --- rated_usage — the rollup. Per-(auth_id, model_id, hour) cost, idempotently --- upserted, carrying the applied per-token rates so the row is --- self-auditing and immutable ("we never reprice served traffic"). +-- rated_usage — the rollup. Per-(auth_id, resource_id, model_id, hour) cost, +-- idempotently upserted, carrying the applied per-token rates so the +-- row is self-auditing and immutable ("we never reprice served +-- traffic"). resource_id (the deployment id) is part of the grain for +-- E2 customer attribution: billing resolves the org via +-- resource_id→org_id. -- -- MONEY UNIT (read before touching any number here): -- All money is stored as NUMERIC(20,9) — exact base-10 decimal, 9 fractional @@ -31,16 +34,29 @@ -- migrations/atlas/c2f1a3b4d5e6_add_rating.py and migrations/README.md. Keep the -- two in sync. --- rated_usage: the per-(auth_id, model_id, hour) cost rollup. +-- rated_usage: the per-(auth_id, resource_id, model_id, hour) cost rollup. -- -- Grain is HOURLY (window_start truncated to the hour), matching Atlas's --- hourly_usage_record. The natural key (auth_id, model_id, window_start) is UNIQUE --- so re-rating a window UPSERTS the same rows instead of duplicating them — re-runs --- are idempotent and never double-count. +-- hourly_usage_record. The natural key (auth_id, resource_id, model_id, window_start) +-- is UNIQUE so re-rating a window UPSERTS the same rows instead of duplicating them — +-- re-runs are idempotent and never double-count. +-- +-- resource_id IS PART OF THE GRAIN, NOT REDUNDANT WITH model_id (E2 customer +-- attribution): billing identifies the customer org via resource_id→org_id. model_id +-- stays because price resolution is per-MODEL — one deployment can serve multiple +-- models/adapters at different rates, and collapsing them would sum traffic priced +-- differently. Two deployments of the SAME model by the SAME auth in one hour are +-- TWO rows (correct — they may bill to different orgs). resource_id is NON-NULL here +-- though NULLABLE on billing_event: a row that can't name its deployment/org cannot +-- be billed, so the rater EXCLUDES a NULL-resource_id event from this rollup and +-- COUNTS it as unattributable (fail closed) — see internal/rating/store.go. CREATE TABLE rated_usage ( id VARCHAR(32) NOT NULL, -- surrogate; natural key is the unique constraint below auth_id VARCHAR(64) NOT NULL, + -- resource_id: the deployment id (E2 customer attribution). NON-NULL — the rater + -- never writes a row it can't attribute to a deployment/org. + resource_id VARCHAR(64) NOT NULL, model_id VARCHAR(255) NOT NULL, -- [window_start, window_end) is the hour this rollup covers. @@ -79,15 +95,22 @@ CREATE TABLE rated_usage ( rated_at TIMESTAMPTZ NOT NULL DEFAULT now(), CONSTRAINT pk_rated_usage PRIMARY KEY (id), - -- Idempotency key: one rollup row per (auth_id, model_id, hour). - CONSTRAINT rated_usage_auth_model_window_uq - UNIQUE (auth_id, model_id, window_start) + -- Idempotency key: one rollup row per (auth_id, resource_id, model_id, hour). + CONSTRAINT rated_usage_auth_resource_model_window_uq + UNIQUE (auth_id, resource_id, model_id, window_start) ); -- Per-API-key billing queries scan by auth_id over a time window. CREATE INDEX rated_usage_auth_id_window_start_ix ON rated_usage (auth_id, window_start); +-- E2 customer attribution reads rated_usage by DEPLOYMENT over a time window (resolve +-- the org via resource_id→org_id, then sum that deployment's cost). resource_id leads +-- so a per-deployment time-range query is a tight index slice rather than a scan over +-- an auth-leading index where resource_id is only a trailing column. +CREATE INDEX rated_usage_resource_id_window_start_ix + ON rated_usage (resource_id, window_start); + -- The reconcile DELETE (re-rate convergence, the `deleted` CTE in -- internal/rating/store.go) filters rated_usage on window_start ALONE -- (window_start >= $1 AND window_start < $2), then anti-joins priced. Every other diff --git a/migrations/README.md b/migrations/README.md index 2de3bb0..66126cc 100644 --- a/migrations/README.md +++ b/migrations/README.md @@ -5,8 +5,12 @@ This directory holds the schema for phoebe's billing tables, which live in the - `billing_event` — the system-of-record table that phoebe's Postgres drainer (`cmd/drainer`) writes raw, pre-rating metering records into. -- `rated_usage` — the **rating (E1)** (revenue) rollup: per-(auth_id, model_id, - hour) cost, carrying the applied per-token rates frozen onto each row. **Money is +- `rated_usage` — the **rating (E1)** (revenue) rollup: per-(auth_id, resource_id, + model_id, hour) cost, carrying the applied per-token rates frozen onto each row. + `resource_id` (the deployment id) is part of the grain for **E2 customer + attribution** (billing resolves the org via `resource_id`→`org_id`); a + NULL-`resource_id` event fails closed (counted unattributable, never billed). + **Money is stored as `NUMERIC(20,9)` — exact decimal, never float and never an integer micro/nano scalar — and ALL money math happens in SQL, not Go.** diff --git a/migrations/atlas/c2f1a3b4d5e6_add_rating.py b/migrations/atlas/c2f1a3b4d5e6_add_rating.py index f06a8ed..7db004c 100644 --- a/migrations/atlas/c2f1a3b4d5e6_add_rating.py +++ b/migrations/atlas/c2f1a3b4d5e6_add_rating.py @@ -21,10 +21,14 @@ micro/nano scalar. The rater projects the YAML prices into a transient TEMP table and computes AND sums per-event cost in one SQL statement; Go never holds a money total. The fine-tune premium is applied in exact decimal at projection. - * rated_usage is the per-(auth_id, model_id, hour) rollup, idempotently upserted on - the natural key. It carries the APPLIED per-token rates (applied_prompt_rate / - applied_cached_rate / applied_completion_rate) so the row is self-auditing and - immutable: "we never reprice traffic you've already served" holds by construction. + * rated_usage is the per-(auth_id, resource_id, model_id, hour) rollup, idempotently + upserted on the natural key. It carries the APPLIED per-token rates + (applied_prompt_rate / applied_cached_rate / applied_completion_rate) so the row is + self-auditing and immutable: "we never reprice traffic you've already served" holds + by construction. resource_id (the deployment id) is part of the grain for E2 + customer attribution — billing resolves the org via resource_id→org_id. It is + NON-NULL here though NULLABLE on billing_event: the rater fails closed, excluding a + NULL-resource_id event from the rollup and counting it as unattributable. CLEAN REWRITE (no prod data): an earlier draft of this revision created model_price + derivation_policy. E1 moved prices to a YAML file, so those tables are gone and @@ -49,11 +53,16 @@ def upgrade(): - # --- rated_usage: the per-(auth_id, model_id, hour) cost rollup --- + # --- rated_usage: the per-(auth_id, resource_id, model_id, hour) cost rollup --- + # resource_id (the deployment id) is part of the grain for E2 customer attribution + # (billing resolves the org via resource_id→org_id). NON-NULL here though NULLABLE + # on billing_event: the rater fails closed — a NULL-resource_id event is excluded + # from the rollup and counted as unattributable, never billed to a NULL org. op.create_table( "rated_usage", sa.Column("id", sa.Unicode(length=32), nullable=False), sa.Column("auth_id", sa.Unicode(length=64), nullable=False), + sa.Column("resource_id", sa.Unicode(length=64), nullable=False), sa.Column("model_id", sa.Unicode(length=255), nullable=False), sa.Column("window_start", sa.DateTime(timezone=True), nullable=False), sa.Column("window_end", sa.DateTime(timezone=True), nullable=False), @@ -81,12 +90,13 @@ def upgrade(): server_default=sa.func.now(), ), sa.PrimaryKeyConstraint("id", name=op.f("pk_rated_usage")), - # Idempotency key: one rollup row per (auth_id, model_id, hour). + # Idempotency key: one rollup row per (auth_id, resource_id, model_id, hour). sa.UniqueConstraint( "auth_id", + "resource_id", "model_id", "window_start", - name=op.f("rated_usage_auth_model_window_uq"), + name=op.f("rated_usage_auth_resource_model_window_uq"), ), ) op.create_index( @@ -96,6 +106,18 @@ def upgrade(): unique=False, ) + # E2 customer attribution reads rated_usage by DEPLOYMENT over a time window + # (resolve the org via resource_id→org_id, then sum that deployment's cost). A + # resource_id-leading index makes a per-deployment time-range query a tight slice + # rather than a scan over the auth-leading index where resource_id only trails. + # Mirrors migrations/0002_rating.sql. + op.create_index( + "rated_usage_resource_id_window_start_ix", + "rated_usage", + ["resource_id", "window_start"], + unique=False, + ) + # The reconcile DELETE (re-rate convergence, the `deleted` CTE in # internal/rating/store.go) filters rated_usage on window_start ALONE # (window_start >= $1 AND window_start < $2), then anti-joins priced. Every other @@ -156,6 +178,11 @@ def downgrade(): op.drop_index( "rated_usage_window_start_ix", table_name="rated_usage", if_exists=True ) + op.drop_index( + "rated_usage_resource_id_window_start_ix", + table_name="rated_usage", + if_exists=True, + ) op.drop_index( "rated_usage_auth_id_window_start_ix", table_name="rated_usage", if_exists=True )