diff --git a/cmd/rater/main.go b/cmd/rater/main.go index c770edc..6b17ad5 100644 --- a/cmd/rater/main.go +++ b/cmd/rater/main.go @@ -3,7 +3,7 @@ // premium policy, and per-GPU floor rates — reads a window of raw metering rows from // billing_event, projects the prices into a transient TEMP table, and ENTIRELY IN // SQL computes per-event cost as exact NUMERIC, sums it, and upserts -// per-(auth_id, model_id, hour) cost rollups into rated_usage (with the applied +// per-(auth_id, resource_id, model_id, hour) cost rollups into rated_usage (with the applied // per-token rates frozen onto each row). No money math happens in Go — the rater // binary is orchestration only (the fine-tune premium is applied in exact decimal // when the prices are projected, then handed to SQL as NUMERIC). @@ -66,8 +66,8 @@ // last hour: events can be DRAINED LATE (a Valkey outage recovered from the WAL) // with an event_ts in an hour that was already rated, and a last-hour-only cron // would never revisit that hour — silent revenue loss. Re-rating closed hours is -// safe BY CONSTRUCTION: the upsert REPLACES each (auth_id, model_id, hour) bucket -// with a freshly recomputed total, so the late event is folded in and nothing +// safe BY CONSTRUCTION: the upsert REPLACES each (auth_id, resource_id, model_id, hour) +// bucket with a freshly recomputed total, so the late event is folded in and nothing // doubles. RESIDUAL RISK, stated honestly: an event arriving MORE than N hours // late still slips past the default window (the DESIGN.md reconciliation backstop // is the eventual answer; until then, widen rateTrailingHours or re-rate the hour @@ -228,7 +228,7 @@ func run() int { // re-rates the last N closed hours so an event DRAINED LATE into an already-rated // hour (Valkey outage → WAL recovery) is picked up by a later run instead of being // lost forever. Re-rating a closed hour is safe by construction: the upsert -// REPLACES each (auth_id, model_id, hour) bucket with a recomputed total, so +// REPLACES each (auth_id, resource_id, model_id, hour) bucket with a recomputed total, so // re-runs reconcile and never double-count. An event arriving more than N hours // late still slips (residual risk; see the package doc — reconciliation is the // backstop). Either flag may be given explicitly (RFC3339) and WINS over the diff --git a/internal/e2e/e2e_test.go b/internal/e2e/e2e_test.go index 6c2a3c2..447c15f 100644 --- a/internal/e2e/e2e_test.go +++ b/internal/e2e/e2e_test.go @@ -469,7 +469,7 @@ func TestE2E_StreamedRequestBecomesMoney(t *testing.T) { var ( nRollups int - ruAuthID, ruModelID, cost string + ruAuthID, ruResourceID, ruModelID, cost string ruPrompt, ruCached, ruCompletion, ruBillable int64 eventCount int64 // BIGINT column ) @@ -480,14 +480,20 @@ func TestE2E_StreamedRequestBecomesMoney(t *testing.T) { t.Fatalf("rated_usage rows = %d, want exactly 1", nRollups) } if err := h.db.QueryRow( - `SELECT auth_id, model_id, prompt_tokens, cached_tokens, completion_tokens, billable_prompt_tokens, cost::text, event_count + `SELECT auth_id, resource_id, model_id, prompt_tokens, cached_tokens, completion_tokens, billable_prompt_tokens, cost::text, event_count FROM rated_usage`). - Scan(&ruAuthID, &ruModelID, &ruPrompt, &ruCached, &ruCompletion, &ruBillable, &cost, &eventCount); err != nil { + Scan(&ruAuthID, &ruResourceID, &ruModelID, &ruPrompt, &ruCached, &ruCompletion, &ruBillable, &cost, &eventCount); err != nil { t.Fatalf("read rated_usage: %v", err) } if ruAuthID != testAuthID { t.Errorf("rated_usage.auth_id = %q, want %q (the X-Saturn-Auth-Id header value)", ruAuthID, testAuthID) } + // E2 customer attribution: the rollup carries the deployment id the request routed + // to (X-Saturn-Resource-Id), end-to-end from the proxy through billing_event into + // the rated_usage grain — this is the key billing resolves the org from. + if ruResourceID != testResourceID { + t.Errorf("rated_usage.resource_id = %q, want %q (the X-Saturn-Resource-Id header value, for E2 org attribution)", ruResourceID, testResourceID) + } // THE deployment-id-bug guard, at the far end of the pipe: the money is // keyed on the engine name the upstream reported, not the id we routed on. if ruModelID != testModelName { diff --git a/internal/rating/doc.go b/internal/rating/doc.go index c4dc847..bc427d8 100644 --- a/internal/rating/doc.go +++ b/internal/rating/doc.go @@ -1,5 +1,5 @@ // Package rating is phoebe's REVENUE path. It turns the raw token counts in -// billing_event into money: per (auth_id, model_id, hour) cost rollups in +// billing_event into money: per (auth_id, resource_id, model_id, hour) cost rollups in // rated_usage, priced from a YAML PRICE FILE (E1) — not a DB price table. // // THE PRICE FILE IS THE CONTRACT (E1). An operator authors a versioned YAML file diff --git a/internal/rating/oracle_test.go b/internal/rating/oracle_test.go index 36cf368..7322006 100644 --- a/internal/rating/oracle_test.go +++ b/internal/rating/oracle_test.go @@ -30,8 +30,13 @@ import ( // decoupled from metering.Event so the pure money math has no dependency on the // capture/emit side and can be tested in isolation. type RatedEvent struct { - AuthID string - ModelID string + AuthID string + // ResourceID is the deployment id (E2 customer attribution — billing resolves the + // org via resource_id→org_id). Part of the rollup grain. Empty → unattributable + // (the row can't name its deployment/org), mirroring the SQL's resource_id-IS-NULL + // handling: counted, never billed. + ResourceID string + ModelID string // BaseModel is the HF base id a fine-tune derives from (E3), carried on the event // from billing_event.base_model. Empty for a base model. The oracle prices an ft: // ModelID via base x premium keyed on BaseModel — mirroring the SQL. diff --git a/internal/rating/rater.go b/internal/rating/rater.go index 643799e..a40101a 100644 --- a/internal/rating/rater.go +++ b/internal/rating/rater.go @@ -43,9 +43,9 @@ type Result struct { // casts these as ::bigint to avoid a silent 32-bit overflow (see store.go). EventsRated int64 UnpricedEvents int64 // events whose model had NO resolvable price (NOT $0-billed) - UnattributableEvents int64 // in-window rows with NULL auth_id/model_id (upstream leak) + UnattributableEvents int64 // in-window rows with NULL auth_id/resource_id/model_id (upstream leak) AmbiguousBaseEvents int64 // events under an ft: rollup spanning >1 base_model (E3 violation) - RollupsWritten int64 // distinct (auth_id, model_id, hour) rows upserted + RollupsWritten int64 // distinct (auth_id, resource_id, model_id, hour) rows upserted ReconciledDeletions int64 // stale in-window rollups DELETED because this re-run no longer produces them TotalCost string // sum of all rollup costs, NUMERIC as text } @@ -55,7 +55,9 @@ type Result struct { func (r Result) HasUnpriced() bool { return r.UnpricedEvents > 0 } // HasUnattributable reports whether any in-window row was skipped for a NULL -// auth_id/model_id — like HasUnpriced, a loud, exit-nonzero outcome. +// auth_id/resource_id/model_id — like HasUnpriced, a loud, exit-nonzero outcome. A +// NULL resource_id means the row can't name its deployment/org (E2), so it can't be +// billed and is counted here rather than attributed to a NULL org. func (r Result) HasUnattributable() bool { return r.UnattributableEvents > 0 } // HasAmbiguousBase reports whether any ft: rollup spanned more than one base_model in a @@ -79,7 +81,7 @@ func (r Result) HasAnomaly() bool { // // FAIL-LOUD ON MISSING PRICE / UNATTRIBUTABLE (the fail-closed rule): an event // whose model has no resolvable price at its time, and a row with a NULL -// auth_id/model_id, are NOT summed into any rollup — the SQL excludes them. They +// auth_id/resource_id/model_id, are NOT summed into any rollup — the SQL excludes them. They // are COUNTED by the SAME statement that writes the rollups (one snapshot, so a // row the drainer commits mid-run can never be excluded-but-uncounted), logged // loudly (ERROR), and drive cmd/rater's exit-nonzero path. They never become $0 @@ -131,7 +133,7 @@ func (r *Rater) Run(ctx context.Context, windowStart, windowEnd time.Time, windo windowStart.Format(time.RFC3339), windowEnd.Format(time.RFC3339), res.AmbiguousBaseEvents) } if res.HasUnattributable() { - r.log.Error.Printf("rating: window [%s,%s) has %d UNATTRIBUTABLE billing_event rows (NULL auth_id/model_id) — these cannot be rated; the interceptor's billing gate should reject them before metering, so a nonzero count means revenue is leaking upstream", + r.log.Error.Printf("rating: window [%s,%s) has %d UNATTRIBUTABLE billing_event rows (NULL auth_id/resource_id/model_id — a NULL resource_id can't name the deployment/org for E2 billing) — these cannot be rated; the interceptor's billing gate should reject them before metering, so a nonzero count means revenue is leaking upstream", windowStart.Format(time.RFC3339), windowEnd.Format(time.RFC3339), res.UnattributableEvents) } if res.HasUnpriced() { @@ -162,7 +164,7 @@ func (r *Rater) Run(ctx context.Context, windowStart, windowEnd time.Time, windo } if res.HasAnomaly() { - r.log.Error.Printf("rating: window [%s,%s) rated %d events into %d rollups, total=%s USD; %d UNPRICED events dropped (backfill prices and re-rate), %d UNATTRIBUTABLE rows skipped (NULL auth_id/model_id — upstream billing-gate leak), %d AMBIGUOUS-BASE events dropped (ft: id spanning multiple base_models — fix base_model propagation and re-rate)", + r.log.Error.Printf("rating: window [%s,%s) rated %d events into %d rollups, total=%s USD; %d UNPRICED events dropped (backfill prices and re-rate), %d UNATTRIBUTABLE rows skipped (NULL auth_id/resource_id/model_id — upstream billing-gate leak), %d AMBIGUOUS-BASE events dropped (ft: id spanning multiple base_models — fix base_model propagation and re-rate)", windowStart.Format(time.RFC3339), windowEnd.Format(time.RFC3339), res.EventsRated, res.RollupsWritten, res.TotalCost, res.UnpricedEvents, res.UnattributableEvents, res.AmbiguousBaseEvents) } else { diff --git a/internal/rating/rater_test.go b/internal/rating/rater_test.go index c828d45..be75101 100644 --- a/internal/rating/rater_test.go +++ b/internal/rating/rater_test.go @@ -30,6 +30,7 @@ type oracleStore struct { type rollupKey struct { authID string + resourceID string modelID string windowStart time.Time } @@ -64,7 +65,7 @@ func (s *oracleStore) resolveWindow(start, end time.Time) (map[rollupKey]oracleR if e.At.Before(start) || !e.At.Before(end) { continue } - if e.AuthID == "" || e.ModelID == "" { + if e.AuthID == "" || e.ResourceID == "" || e.ModelID == "" { an.UnattributableEvents++ continue } @@ -80,7 +81,7 @@ func (s *oracleStore) resolveWindow(start, end time.Time) (map[rollupKey]oracleR rate := resolved.Quantized() cost := rateExact(e, rate) hour := e.At.UTC().Truncate(time.Hour) - k := rollupKey{authID: e.AuthID, modelID: e.ModelID, windowStart: hour} + k := rollupKey{authID: e.AuthID, resourceID: e.ResourceID, modelID: e.ModelID, windowStart: hour} ru := out[k] ru.prompt += e.PromptTokens ru.cached += e.CachedTokens @@ -166,8 +167,8 @@ func testLogger() *logging.Logger { return logging.New(logging.ERROR) } func TestRater_MultiEventAggregation(t *testing.T) { at := mustTime("2026-06-08T10:15:00Z") events := []RatedEvent{ - {AuthID: "a", ModelID: "m", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, - {AuthID: "a", ModelID: "m", PromptTokens: 10, CachedTokens: 0, CompletionTokens: 5, At: at.Add(20 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 10, CachedTokens: 0, CompletionTokens: 5, At: at.Add(20 * time.Minute)}, } store := newOracleStore(bookM(), events) r := New(store, bookM(), testLogger()) @@ -185,7 +186,7 @@ func TestRater_MultiEventAggregation(t *testing.T) { if res.TotalCost != "0.000799000" { t.Fatalf("total = %s, want 0.000799000", res.TotalCost) } - k := rollupKey{authID: "a", modelID: "m", windowStart: mustTime("2026-06-08T10:00:00Z")} + k := rollupKey{authID: "a", resourceID: "r", modelID: "m", windowStart: mustTime("2026-06-08T10:00:00Z")} ru := store.table[k] if ru.prompt != 110 || ru.cached != 30 || ru.completion != 55 || ru.billable != 80 { t.Fatalf("token sums = p%d c%d comp%d bill%d, want 110/30/55/80", ru.prompt, ru.cached, ru.completion, ru.billable) @@ -200,7 +201,7 @@ func TestRater_MultiEventAggregation(t *testing.T) { func TestRater_IdempotentRerunNoDoubling(t *testing.T) { at := mustTime("2026-06-08T10:15:00Z") events := []RatedEvent{ - {AuthID: "a", ModelID: "m", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, } store := newOracleStore(bookM(), events) r := New(store, bookM(), testLogger()) @@ -244,7 +245,7 @@ func TestRater_ReRateDeletesSupersededRollup(t *testing.T) { // Run A: a single-base ft: rollup — CLEAN, bills. store := newOracleStore(book, []RatedEvent{ - {AuthID: "a", ModelID: "ft:dupe", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "ft:dupe", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, }) r := New(store, book, testLogger()) resA, err := r.Run(context.Background(), ws, we, false) @@ -254,7 +255,7 @@ func TestRater_ReRateDeletesSupersededRollup(t *testing.T) { if resA.RollupsWritten != 1 || resA.ReconciledDeletions != 0 { t.Fatalf("run A: rollups=%d deletions=%d, want 1/0", resA.RollupsWritten, resA.ReconciledDeletions) } - dupeKey := rollupKey{authID: "a", modelID: "ft:dupe", windowStart: ws} + dupeKey := rollupKey{authID: "a", resourceID: "r", modelID: "ft:dupe", windowStart: ws} if _, ok := store.table[dupeKey]; !ok { t.Fatal("run A: clean ft:dupe rollup must exist before re-rate") } @@ -262,7 +263,7 @@ func TestRater_ReRateDeletesSupersededRollup(t *testing.T) { // Mutate data: a SECOND, distinct base_model arrives for the SAME ft: id in the same // window → ambiguous. Run B excludes it from priced. store.events = append(store.events, - RatedEvent{AuthID: "a", ModelID: "ft:dupe", BaseModel: "expensive/base", PromptTokens: 1000, At: at.Add(5 * time.Minute)}) + RatedEvent{AuthID: "a", ResourceID: "r", ModelID: "ft:dupe", BaseModel: "expensive/base", PromptTokens: 1000, At: at.Add(5 * time.Minute)}) resB, err := r.Run(context.Background(), ws, we, false) if err != nil { t.Fatal(err) @@ -299,8 +300,8 @@ func TestRater_ReRateSupersedesOneKeepsAnotherSameWindow(t *testing.T) { ws, we := mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z") // Run A: TWO clean single-base ft: rollups (ft:keep and ft:gone). store := newOracleStore(book, []RatedEvent{ - {AuthID: "a", ModelID: "ft:keep", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, - {AuthID: "a", ModelID: "ft:gone", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "ft:keep", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "ft:gone", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, }) r := New(store, book, testLogger()) if _, err := r.Run(context.Background(), ws, we, false); err != nil { @@ -311,7 +312,7 @@ func TestRater_ReRateSupersedesOneKeepsAnotherSameWindow(t *testing.T) { } // Run B: ft:gone gains a second base (ambiguous → superseded); ft:keep unchanged. store.events = append(store.events, - RatedEvent{AuthID: "a", ModelID: "ft:gone", BaseModel: "expensive/base", PromptTokens: 1000, At: at.Add(5 * time.Minute)}) + RatedEvent{AuthID: "a", ResourceID: "r", ModelID: "ft:gone", BaseModel: "expensive/base", PromptTokens: 1000, At: at.Add(5 * time.Minute)}) resB, err := r.Run(context.Background(), ws, we, false) if err != nil { t.Fatal(err) @@ -319,10 +320,10 @@ func TestRater_ReRateSupersedesOneKeepsAnotherSameWindow(t *testing.T) { if resB.ReconciledDeletions != 1 { t.Fatalf("run B: deletions = %d, want 1 (only ft:gone superseded)", resB.ReconciledDeletions) } - if _, gone := store.table[rollupKey{authID: "a", modelID: "ft:gone", windowStart: ws}]; gone { + if _, gone := store.table[rollupKey{authID: "a", resourceID: "r", modelID: "ft:gone", windowStart: ws}]; gone { t.Fatal("ft:gone rollup survived — it became ambiguous and must be deleted") } - if _, keep := store.table[rollupKey{authID: "a", modelID: "ft:keep", windowStart: ws}]; !keep { + if _, keep := store.table[rollupKey{authID: "a", resourceID: "r", modelID: "ft:keep", windowStart: ws}]; !keep { t.Fatal("ft:keep rollup was deleted — a surviving co-window rollup must be UPDATED, not clobbered") } if len(store.table) != 1 { @@ -338,7 +339,7 @@ func TestRater_ReRateDeletesVanishedRollup(t *testing.T) { at := mustTime("2026-06-08T10:15:00Z") ws, we := mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z") store := newOracleStore(bookM(), []RatedEvent{ - {AuthID: "a", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, }) r := New(store, bookM(), testLogger()) if _, err := r.Run(context.Background(), ws, we, false); err != nil { @@ -367,8 +368,8 @@ func TestRater_ReRateIdenticalDataIsNoOp(t *testing.T) { at := mustTime("2026-06-08T10:15:00Z") ws, we := mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z") store := newOracleStore(bookM(), []RatedEvent{ - {AuthID: "a", ModelID: "m", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, - {AuthID: "b", ModelID: "m", PromptTokens: 200, At: at.Add(10 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, + {AuthID: "b", ResourceID: "r", ModelID: "m", PromptTokens: 200, At: at.Add(10 * time.Minute)}, }) r := New(store, bookM(), testLogger()) @@ -394,7 +395,7 @@ func TestRater_ReRateIdenticalDataIsNoOp(t *testing.T) { // [ws,we): the reconcile's window predicate is half-open and hour-scoped. otherHour := mustTime("2026-06-08T12:00:00Z") store.events = append(store.events, - RatedEvent{AuthID: "a", ModelID: "m", PromptTokens: 5, At: otherHour.Add(5 * time.Minute)}) + RatedEvent{AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 5, At: otherHour.Add(5 * time.Minute)}) if _, err := r.Run(context.Background(), otherHour, otherHour.Add(time.Hour), false); err != nil { t.Fatal(err) } @@ -406,7 +407,7 @@ func TestRater_ReRateIdenticalDataIsNoOp(t *testing.T) { if resFirst.ReconciledDeletions != 0 { t.Fatalf("re-rate of [10:00,11:00) deleted %d rows, want 0 (must not touch the 12:00 rollup)", resFirst.ReconciledDeletions) } - otherKey := rollupKey{authID: "a", modelID: "m", windowStart: otherHour} + otherKey := rollupKey{authID: "a", resourceID: "r", modelID: "m", windowStart: otherHour} if _, ok := store.table[otherKey]; !ok { t.Fatal("a re-rate of the first window deleted an out-of-window (12:00) rollup — the window predicate leaked") } @@ -432,7 +433,7 @@ func supersededReconcileStore() (*oracleStore, time.Time, time.Time) { at := mustTime("2026-06-08T10:15:00Z") ws, we := mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z") store := newOracleStore(bookM(), []RatedEvent{ - {AuthID: "a", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, }) return store, ws, we } @@ -538,7 +539,7 @@ func TestRater_LateArrivalRatedByTrailingWindow(t *testing.T) { } store.events = append(store.events, RatedEvent{ - AuthID: "a", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: hourH.Add(15 * time.Minute), + AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: hourH.Add(15 * time.Minute), }) res2, err := r.Run(context.Background(), hourH.Add(-23*time.Hour), hourH.Add(2*time.Hour), false) @@ -548,7 +549,7 @@ func TestRater_LateArrivalRatedByTrailingWindow(t *testing.T) { if res2.EventsRated != 1 || res2.RollupsWritten != 1 { t.Fatalf("run2 rated=%d rollups=%d, want 1/1 (late event caught by trailing window)", res2.EventsRated, res2.RollupsWritten) } - k := rollupKey{authID: "a", modelID: "m", windowStart: hourH} + k := rollupKey{authID: "a", resourceID: "r", modelID: "m", windowStart: hourH} if _, ok := store.table[k]; !ok { t.Fatal("no rollup for hour H — the late-drained event was lost") } @@ -560,8 +561,8 @@ func TestRater_LateArrivalRatedByTrailingWindow(t *testing.T) { func TestRater_MissingPriceFailsLoudNotZero(t *testing.T) { at := mustTime("2026-06-08T10:15:00Z") events := []RatedEvent{ - {AuthID: "a", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // priced - {AuthID: "a", ModelID: "unpriced", PromptTokens: 100, CompletionTokens: 50, At: at}, // NO price + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // priced + {AuthID: "a", ResourceID: "r", ModelID: "unpriced", PromptTokens: 100, CompletionTokens: 50, At: at}, // NO price } store := newOracleStore(bookM(), events) r := New(store, bookM(), testLogger()) @@ -585,14 +586,17 @@ func TestRater_MissingPriceFailsLoudNotZero(t *testing.T) { } } -// TestRater_UnattributableCountedNotSilent: rows with NULL auth_id and/or model_id -// must NOT be rated, MUST be counted, and MUST trigger the loud anomaly / exit-2 path. +// TestRater_UnattributableCountedNotSilent: rows with NULL auth_id, NULL resource_id, +// and/or NULL model_id must NOT be rated, MUST be counted, and MUST trigger the loud +// anomaly / exit-2 path. resource_id joins the key columns (E2): a row that can't name +// its deployment/org is unattributable exactly like a NULL auth_id/model_id. func TestRater_UnattributableCountedNotSilent(t *testing.T) { at := mustTime("2026-06-08T10:15:00Z") events := []RatedEvent{ - {AuthID: "a", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // ok - {AuthID: "", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // NULL auth_id - {AuthID: "a", ModelID: "", PromptTokens: 100, CompletionTokens: 50, At: at}, // NULL model_id + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // ok + {AuthID: "", ResourceID: "r", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // NULL auth_id + {AuthID: "a", ResourceID: "", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // NULL resource_id + {AuthID: "a", ResourceID: "r", ModelID: "", PromptTokens: 100, CompletionTokens: 50, At: at}, // NULL model_id } store := newOracleStore(bookM(), events) r := New(store, bookM(), testLogger()) @@ -600,8 +604,8 @@ func TestRater_UnattributableCountedNotSilent(t *testing.T) { if err != nil { t.Fatal(err) } - if res.UnattributableEvents != 2 { - t.Fatalf("unattributable = %d, want 2", res.UnattributableEvents) + if res.UnattributableEvents != 3 { + t.Fatalf("unattributable = %d, want 3 (NULL auth_id, NULL resource_id, NULL model_id)", res.UnattributableEvents) } if res.EventsRated != 1 || res.RollupsWritten != 1 || len(store.table) != 1 { t.Fatalf("rated=%d rollups=%d table=%d, want 1/1/1", res.EventsRated, res.RollupsWritten, len(store.table)) @@ -611,6 +615,82 @@ func TestRater_UnattributableCountedNotSilent(t *testing.T) { } } +// TestRater_DistinctDeploymentsBillSeparately (E2 grain — the negative invariant): +// TWO deployments (distinct resource_id) of the SAME model by the SAME auth in the +// SAME hour produce TWO distinct rated_usage rows, NOT one summed row. resource_id is +// part of the grain precisely because the two deployments may bill to different orgs +// (resource_id→org_id), so collapsing them would mis-attribute revenue. The two rows +// share (auth_id, model_id, window_start) and differ ONLY in resource_id. +func TestRater_DistinctDeploymentsBillSeparately(t *testing.T) { + at := mustTime("2026-06-08T10:15:00Z") + ws := mustTime("2026-06-08T10:00:00Z") + events := []RatedEvent{ + {AuthID: "a", ResourceID: "deploy-1", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, + {AuthID: "a", ResourceID: "deploy-2", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at.Add(5 * time.Minute)}, + } + store := newOracleStore(bookM(), events) + r := New(store, bookM(), testLogger()) + res, err := r.Run(context.Background(), ws, mustTime("2026-06-08T11:00:00Z"), false) + if err != nil { + t.Fatal(err) + } + // TWO rollups, one per deployment — NOT one summed row. + if res.EventsRated != 2 || res.RollupsWritten != 2 || len(store.table) != 2 { + t.Fatalf("rated=%d rollups=%d table=%d, want 2/2/2 (distinct resource_id must NOT collapse into one row)", + res.EventsRated, res.RollupsWritten, len(store.table)) + } + // Each deployment has its OWN row under the same (auth, model, hour). + for _, rid := range []string{"deploy-1", "deploy-2"} { + k := rollupKey{authID: "a", resourceID: rid, modelID: "m", windowStart: ws} + ru, ok := store.table[k] + if !ok { + t.Fatalf("no rollup for deployment %q — distinct deployments must each bill", rid) + } + if ru.eventCount != 1 { + t.Fatalf("deployment %q event_count = %d, want 1 (one event per deployment)", rid, ru.eventCount) + } + } +} + +// TestRater_NullResourceIdIsUnattributable (fail-closed attribution): an event with a +// NULL resource_id CANNOT name its deployment/org (E2 resolves the org via +// resource_id→org_id), so it must be counted UNATTRIBUTABLE and EXCLUDED from billing — +// never $0-billed, never billed to a NULL org. It pins the partition invariant with +// resource_id in the mix: rated + unpriced + unattributable + ambiguous == total. +func TestRater_NullResourceIdIsUnattributable(t *testing.T) { + at := mustTime("2026-06-08T10:15:00Z") + events := []RatedEvent{ + {AuthID: "a", ResourceID: "r", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // attributable + {AuthID: "a", ResourceID: "", ModelID: "m", PromptTokens: 100, CompletionTokens: 50, At: at}, // NULL resource_id → unattributable + } + store := newOracleStore(bookM(), events) + r := New(store, bookM(), testLogger()) + res, err := r.Run(context.Background(), mustTime("2026-06-08T10:00:00Z"), mustTime("2026-06-08T11:00:00Z"), false) + if err != nil { + t.Fatal(err) + } + if res.UnattributableEvents != 1 { + t.Fatalf("unattributable = %d, want 1 (the NULL-resource_id event must be counted, never billed)", res.UnattributableEvents) + } + if res.EventsRated != 1 || res.RollupsWritten != 1 || len(store.table) != 1 { + t.Fatalf("rated=%d rollups=%d table=%d, want 1/1/1 (no row for the unattributable event)", res.EventsRated, res.RollupsWritten, len(store.table)) + } + // No rollup carries an empty resource_id — the fail-closed row is never written. + for k := range store.table { + if k.resourceID == "" { + t.Fatal("a rollup exists with an empty resource_id — a row that can't name its deployment/org must NEVER be billed") + } + } + if !res.HasUnattributable() || !res.HasAnomaly() { + t.Fatal("HasUnattributable/HasAnomaly = false, want true (NULL resource_id must drive exit-nonzero)") + } + // PARTITION with resource_id in the mix. + if got := res.EventsRated + res.UnpricedEvents + res.UnattributableEvents + res.AmbiguousBaseEvents; got != int64(len(events)) { + t.Fatalf("rated(%d)+unpriced(%d)+unattr(%d)+ambiguous(%d) = %d, want %d", + res.EventsRated, res.UnpricedEvents, res.UnattributableEvents, res.AmbiguousBaseEvents, got, len(events)) + } +} + // TestRater_DerivedFineTuneRatedViaPolicy: end-to-end, a fine-tune (derived_from a // base) is rated at base × premium — exercising derivation in the run. func TestRater_DerivedFineTuneRatedViaPolicy(t *testing.T) { @@ -620,7 +700,7 @@ func TestRater_DerivedFineTuneRatedViaPolicy(t *testing.T) { PolicyMultiplier, MustDec("1.5"), Dec{}, ) events := []RatedEvent{ - {AuthID: "a", ModelID: "ft", PromptTokens: 1000, At: mustTime("2026-06-08T10:15:00Z")}, + {AuthID: "a", ResourceID: "r", ModelID: "ft", PromptTokens: 1000, At: mustTime("2026-06-08T10:15:00Z")}, } store := newOracleStore(book, events) r := New(store, book, testLogger()) @@ -646,7 +726,7 @@ func TestRater_FineTunePricesViaBaseModelOnEvent(t *testing.T) { PolicyMultiplier, MustDec("1.5"), Dec{}, ) events := []RatedEvent{{ - AuthID: "a", + AuthID: "a", ResourceID: "r", ModelID: "ft:9f8e7d6c5b4a", // a checkpoint id the file does not list BaseModel: "meta-llama/Llama-3.1-8B-Instruct", PromptTokens: 1000, @@ -667,7 +747,7 @@ func TestRater_FineTunePricesViaBaseModelOnEvent(t *testing.T) { } // The applied rate frozen on the row is the derived (base × premium) rate. hour := mustTime("2026-06-08T10:00:00Z") - row := store.table[rollupKey{"a", "ft:9f8e7d6c5b4a", hour}] + row := store.table[rollupKey{"a", "r", "ft:9f8e7d6c5b4a", hour}] if row.appliedRate.Prompt.String() != "0.000006000" { t.Fatalf("ft applied prompt rate = %s, want 0.000006000 (0.000004 × 1.5)", row.appliedRate.Prompt) } @@ -685,7 +765,7 @@ func TestRater_FineTuneWithoutBaseModelFailsLoud(t *testing.T) { nil, PolicyMultiplier, MustDec("1.5"), Dec{}, ) events := []RatedEvent{{ - AuthID: "a", + AuthID: "a", ResourceID: "r", ModelID: "ft:9f8e7d6c5b4a", BaseModel: "", // THE BUG: base_model never propagated to the event PromptTokens: 1000, @@ -727,14 +807,14 @@ func TestRater_FineTuneAmbiguousBaseModelFailsLoud(t *testing.T) { at := mustTime("2026-06-08T10:15:00Z") events := []RatedEvent{ // SAME ft: model_id, TWO different base_models in the same window → ambiguous. - {AuthID: "a", ModelID: "ft:dupe", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, - {AuthID: "a", ModelID: "ft:dupe", BaseModel: "expensive/base", PromptTokens: 1000, At: at.Add(5 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "ft:dupe", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "ft:dupe", BaseModel: "expensive/base", PromptTokens: 1000, At: at.Add(5 * time.Minute)}, // THE LEGITIMATE CASE the gate must NOT trip: the same ft: id with the SAME base // across MULTIPLE events is one clean rollup (COUNT(DISTINCT base_model)=1), and // MUST still rate normally alongside the ambiguous one. Two events prove the gate // keys on DISTINCT bases, not on event count. - {AuthID: "a", ModelID: "ft:clean", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, - {AuthID: "a", ModelID: "ft:clean", BaseModel: "cheap/base", PromptTokens: 1000, At: at.Add(7 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "ft:clean", BaseModel: "cheap/base", PromptTokens: 1000, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "ft:clean", BaseModel: "cheap/base", PromptTokens: 1000, At: at.Add(7 * time.Minute)}, } store := newOracleStore(book, events) r := New(store, book, testLogger()) @@ -790,9 +870,9 @@ func TestRater_RollupCostSelfAudits(t *testing.T) { ) at := mustTime("2026-06-08T10:15:00Z") events := []RatedEvent{ - {AuthID: "a", ModelID: "base", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, - {AuthID: "a", ModelID: "base", PromptTokens: 200, CachedTokens: 0, CompletionTokens: 10, At: at.Add(10 * time.Minute)}, - {AuthID: "a", ModelID: "ft", PromptTokens: 100, CachedTokens: 0, CompletionTokens: 0, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "base", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "base", PromptTokens: 200, CachedTokens: 0, CompletionTokens: 10, At: at.Add(10 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "ft", PromptTokens: 100, CachedTokens: 0, CompletionTokens: 0, At: at}, } store := newOracleStore(book, events) r := New(store, book, testLogger()) @@ -824,8 +904,8 @@ func TestRater_AppliedRateStoredOnRow(t *testing.T) { ) at := mustTime("2026-06-08T10:15:00Z") events := []RatedEvent{ - {AuthID: "a", ModelID: "base", PromptTokens: 10, At: at}, - {AuthID: "a", ModelID: "ft", PromptTokens: 10, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "base", PromptTokens: 10, At: at}, + {AuthID: "a", ResourceID: "r", ModelID: "ft", PromptTokens: 10, At: at}, } store := newOracleStore(book, events) r := New(store, book, testLogger()) @@ -833,11 +913,11 @@ func TestRater_AppliedRateStoredOnRow(t *testing.T) { t.Fatal(err) } hour := mustTime("2026-06-08T10:00:00Z") - baseRow := store.table[rollupKey{"a", "base", hour}] + baseRow := store.table[rollupKey{"a", "r", "base", hour}] if baseRow.appliedRate.Prompt.String() != "0.000004000" { t.Fatalf("base applied prompt rate = %s, want 0.000004000", baseRow.appliedRate.Prompt) } - ftRow := store.table[rollupKey{"a", "ft", hour}] + ftRow := store.table[rollupKey{"a", "r", "ft", hour}] // premium applied: 0.000004 × 1.5 = 0.000006 if ftRow.appliedRate.Prompt.String() != "0.000006000" { t.Fatalf("ft applied prompt rate = %s, want 0.000006000 (premium frozen on row)", ftRow.appliedRate.Prompt) @@ -869,12 +949,12 @@ func TestOracleModel_SelfConsistent(t *testing.T) { ) hour := mustTime("2026-06-08T10:00:00Z") events := []RatedEvent{ - {AuthID: "a", ModelID: "b", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: hour.Add(5 * time.Minute)}, - {AuthID: "a", ModelID: "b", PromptTokens: 200, CachedTokens: 0, CompletionTokens: 10, At: hour.Add(40 * time.Minute)}, - {AuthID: "a", ModelID: "f", PromptTokens: 100, CachedTokens: 0, CompletionTokens: 0, At: hour.Add(15 * time.Minute)}, - {AuthID: "b", ModelID: "b", PromptTokens: 1000, CachedTokens: 1000, CompletionTokens: 0, At: hour.Add(20 * time.Minute)}, // all-cached - {AuthID: "a", ModelID: "unpriced", PromptTokens: 9, At: hour.Add(1 * time.Minute)}, // unpriced - {AuthID: "", ModelID: "b", PromptTokens: 9, At: hour.Add(2 * time.Minute)}, // unattributable + {AuthID: "a", ResourceID: "r", ModelID: "b", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: hour.Add(5 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "b", PromptTokens: 200, CachedTokens: 0, CompletionTokens: 10, At: hour.Add(40 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "f", PromptTokens: 100, CachedTokens: 0, CompletionTokens: 0, At: hour.Add(15 * time.Minute)}, + {AuthID: "b", ResourceID: "r", ModelID: "b", PromptTokens: 1000, CachedTokens: 1000, CompletionTokens: 0, At: hour.Add(20 * time.Minute)}, // all-cached + {AuthID: "a", ResourceID: "r", ModelID: "unpriced", PromptTokens: 9, At: hour.Add(1 * time.Minute)}, // unpriced + {AuthID: "", ModelID: "b", PromptTokens: 9, At: hour.Add(2 * time.Minute)}, // unattributable } store := newOracleStore(book, events) r := New(store, book, testLogger()) @@ -909,7 +989,7 @@ func TestOracleModel_SelfConsistent(t *testing.T) { t.Fatalf("rollups = %d, want %d", res.RollupsWritten, len(want)) } for k, w := range want { - rk := rollupKey{authID: k.auth, modelID: k.model, windowStart: hour} + rk := rollupKey{authID: k.auth, resourceID: "r", modelID: k.model, windowStart: hour} got := store.table[rk].cost if got.String() != w.String() { t.Errorf("rollup (%s,%s) cost = %s, oracle wants %s", k.auth, k.model, got, w) diff --git a/internal/rating/store.go b/internal/rating/store.go index e0daef0..9f6d133 100644 --- a/internal/rating/store.go +++ b/internal/rating/store.go @@ -21,7 +21,7 @@ import ( // store PROJECTS the book's already-premium-applied per-token rates into a // transient (TEMP) price table for the window, then rates the whole window in SQL — // resolves the effective rate, computes per-event cost, sums it per (auth_id, -// model_id, hour) into rated_usage idempotently, AND counts the fail-loud anomalies, +// resource_id, model_id, hour) into rated_usage idempotently, AND counts the fail-loud anomalies, // all in one snapshot so the rollups and the anomaly counts always agree on what // "priced" means. type Store interface { @@ -197,9 +197,9 @@ CREATE TEMP TABLE rating_derived ( // so rollup keys can never disagree across sessions and re-rates can't overlap. // // IDEMPOTENCY IS RECONCILE, NOT UPSERT-ONLY: a re-run of a window makes rated_usage -// EXACTLY what the latest run says. ON CONFLICT (auth_id, model_id, window_start) DO -// UPDATE replaces a surviving rollup's sums/cost/applied-rates with the freshly -// recomputed ones (the surrogate id is DETERMINISTIC — md5 of the LENGTH-PREFIXED +// EXACTLY what the latest run says. ON CONFLICT (auth_id, resource_id, model_id, +// window_start) DO UPDATE replaces a surviving rollup's sums/cost/applied-rates with +// the freshly recomputed ones (the surrogate id is DETERMINISTIC — md5 of the LENGTH-PREFIXED // natural key, injective, so no '|' in a field can collide two keys — so a re-run // regenerates the SAME id). AND the `deleted` CTE removes any in-window rated_usage // row this run did NOT reproduce in priced (it became ambiguous/unpriced, or its @@ -218,6 +218,11 @@ const rateWindowSQL = ` WITH ev AS ( SELECT auth_id, + -- resource_id: the deployment id (E2 customer attribution — billing resolves + -- the org via resource_id→org_id). It is part of the rated_usage grain: a NULL + -- resource_id row CANNOT name its deployment/org, so it is unattributable + -- (counted, never billed to a NULL org) — see the unattributable filter below. + resource_id, -- billing_event stores the engine-reported model NAME in its model column; -- that name IS phoebe's stable price key, model_id. A NULL model is -- unattributable. @@ -236,6 +241,7 @@ WITH ev AS ( resolved AS ( SELECT ev.auth_id, + ev.resource_id, ev.model_id, ev.base_model, ev.ev_ts, @@ -271,7 +277,8 @@ resolved AS ( -- fine-tune — never a literal 'ft:%' that could drift from the constant. AND ev.model_id LIKE $3 ), --- grouped: the per-(auth_id, model_id, hour) rollup BEFORE the ft-uniqueness gate. +-- grouped: the per-(auth_id, resource_id, model_id, hour) rollup BEFORE the +-- ft-uniqueness gate. -- It carries ambiguous_base = COUNT(DISTINCT base_model among DERIVED-priced rows) > 1. -- E3 mints ft: as a globally-unique uuid4, so one ft: model_id -- can NEVER legitimately carry two different base_models. If it does, the derived rates @@ -282,6 +289,7 @@ resolved AS ( grouped AS ( SELECT auth_id, + resource_id, model_id, -- Session-TZ-independent hour bucket; see the statement comment. date_trunc('hour', ev_ts AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS window_start, @@ -310,16 +318,20 @@ grouped AS ( COUNT(DISTINCT base_model) FILTER (WHERE via_derived) > 1 AS ambiguous_base FROM resolved WHERE prompt_price IS NOT NULL -- priced only - AND auth_id IS NOT NULL -- attributable only - AND model_id IS NOT NULL - GROUP BY auth_id, model_id, date_trunc('hour', ev_ts AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' + AND auth_id IS NOT NULL -- attributable only + AND model_id IS NOT NULL + -- resource_id is a NON-NULL key column AND the E2 customer-attribution key. A + -- NULL resource_id row can't name its deployment/org, so it is NEVER billed; it + -- is excluded here and COUNTED as unattributable below (fail closed). + AND resource_id IS NOT NULL + GROUP BY auth_id, resource_id, model_id, date_trunc('hour', ev_ts AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' ), priced AS ( SELECT * FROM grouped WHERE NOT ambiguous_base ), -- RECONCILE (re-rate deletes superseded rollups): a rated_usage row whose --- (auth_id, model_id, window_start) falls IN this run's window but is NOT in the --- current priced set is STALE — it billed CLEAN in a prior run, but the latest run +-- (auth_id, resource_id, model_id, window_start) falls IN this run's window but is +-- NOT in the current priced set is STALE — it billed CLEAN in a prior run, but the latest run -- now excludes it (it became ambiguous-base, or unpriced, or its events vanished). -- "What the latest run says is what bills," so it is DELETED, atomically with the -- upsert below, in the SAME snapshot. Without this, an upsert-only re-run leaves the @@ -340,6 +352,7 @@ deleted AS ( AND NOT EXISTS ( SELECT 1 FROM priced p WHERE p.auth_id = ru.auth_id + AND p.resource_id = ru.resource_id AND p.model_id = ru.model_id AND p.window_start = ru.window_start ) @@ -347,7 +360,7 @@ deleted AS ( ), upserted AS ( INSERT INTO rated_usage ( - id, auth_id, model_id, window_start, window_end, + id, auth_id, resource_id, model_id, window_start, window_end, prompt_tokens, cached_tokens, completion_tokens, billable_prompt_tokens, cost, applied_prompt_rate, applied_cached_rate, applied_completion_rate, event_count @@ -355,14 +368,17 @@ upserted AS ( SELECT -- DETERMINISTIC 32-char hex surrogate: md5 of the natural key, so re-rating -- regenerates the SAME id. The fields are LENGTH-PREFIXED (len || ':' || value) - -- so the encoding is INJECTIVE — a '|' inside auth_id or model_id can never - -- shift the boundary and collide two different keys onto one id (e.g. auth 'a|b' - -- + model 'c' vs auth 'a' + model 'b|c'). epoch (a bounded integer, no - -- separator hazard) keeps the hash input session-TZ-independent. + -- so the encoding is INJECTIVE — a '|' inside auth_id, resource_id or model_id + -- can never shift the boundary and collide two different keys onto one id (e.g. + -- auth 'a|b' + resource 'c' vs auth 'a' + resource 'b|c'). The field ORDER is + -- FIXED and MUST equal the unique key (auth_id, resource_id, model_id, + -- window_start); epoch (a bounded integer, no separator hazard) keeps the hash + -- input session-TZ-independent. md5(length(auth_id)::text || ':' || auth_id + || '|' || length(resource_id)::text || ':' || resource_id || '|' || length(model_id)::text || ':' || model_id || '|' || extract(epoch FROM window_start)::bigint::text), - auth_id, model_id, window_start, window_end, + auth_id, resource_id, model_id, window_start, window_end, prompt_tokens, cached_tokens, completion_tokens, billable_prompt_tokens, cost, applied_prompt_rate, applied_cached_rate, applied_completion_rate, event_count @@ -374,8 +390,8 @@ upserted AS ( -- raters over overlapping windows (Atlas CronJob concurrencyPolicy: Forbid), so -- the cross-rater hazard is unreachable and no delete-lock-ordering machinery is -- added here. See cmd/rater's package doc for the single-flight contract. - ORDER BY auth_id, model_id, window_start - ON CONFLICT (auth_id, model_id, window_start) DO UPDATE SET + ORDER BY auth_id, resource_id, model_id, window_start + ON CONFLICT (auth_id, resource_id, model_id, window_start) DO UPDATE SET window_end = EXCLUDED.window_end, prompt_tokens = EXCLUDED.prompt_tokens, cached_tokens = EXCLUDED.cached_tokens, @@ -401,12 +417,16 @@ SELECT -- counted ONLY as unattributable (the more specific signal), never also as -- unpriced, so the counts partition the in-window rows: -- events_rated + unpriced + unattributable + ambiguous_base == total in-window events. + -- The unpriced count requires FULL attribution (auth_id, resource_id, model_id all + -- NON-NULL) for exactly this exclusivity: a NULL-resource_id row that is also + -- unpriced must be counted ONLY as unattributable, never double-counted here. (SELECT COUNT(*)::bigint FROM resolved - WHERE prompt_price IS NULL - AND auth_id IS NOT NULL - AND model_id IS NOT NULL) AS unpriced_events, + WHERE prompt_price IS NULL + AND auth_id IS NOT NULL + AND resource_id IS NOT NULL + AND model_id IS NOT NULL) AS unpriced_events, (SELECT COUNT(*)::bigint FROM ev - WHERE auth_id IS NULL OR model_id IS NULL) AS unattributable_events, + WHERE auth_id IS NULL OR resource_id IS NULL OR model_id IS NULL) AS unattributable_events, -- AMBIGUOUS-BASE events: the EVENT count under ambiguous rollups (a single ft: -- model_id resolving through >1 distinct base_model in one window — the E3 -- ft-uniqueness violation). These rollups are NOT upserted (excluded from priced), diff --git a/internal/rating/store_integration_test.go b/internal/rating/store_integration_test.go index 0cf7a62..79db256 100644 --- a/internal/rating/store_integration_test.go +++ b/internal/rating/store_integration_test.go @@ -34,6 +34,9 @@ const schemaDDL = ` CREATE TABLE billing_event ( request_id VARCHAR(255) PRIMARY KEY, auth_id VARCHAR(64), + -- resource_id (the deployment id) is NULLABLE here, mirroring migration 0001: the + -- rater fails closed on a NULL (counts it unattributable), it does not reject it. + resource_id VARCHAR(64), model VARCHAR(255), base_model VARCHAR(255), prompt_tokens INTEGER NOT NULL DEFAULT 0, @@ -49,6 +52,7 @@ CREATE INDEX billing_event_rating_instant_ix CREATE TABLE rated_usage ( id VARCHAR(32) PRIMARY KEY, auth_id VARCHAR(64) NOT NULL, + resource_id VARCHAR(64) NOT NULL, model_id VARCHAR(255) NOT NULL, window_start TIMESTAMPTZ NOT NULL, window_end TIMESTAMPTZ NOT NULL, @@ -62,13 +66,15 @@ CREATE TABLE rated_usage ( applied_completion_rate NUMERIC(20,9) NOT NULL DEFAULT 0, event_count BIGINT NOT NULL, rated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT rated_usage_auth_model_window_uq UNIQUE (auth_id, model_id, window_start) + CONSTRAINT rated_usage_auth_resource_model_window_uq UNIQUE (auth_id, resource_id, model_id, window_start) ); -- Mirror the production indexes (migrations/0002_rating.sql): the auth-leading index --- for billing queries, and the window_start-leading index the reconcile DELETE needs --- (it filters window_start alone; every auth-leading index leaves it trailing). +-- for billing queries, the resource_id-leading index for E2 per-deployment reads, and +-- the window_start-leading index the reconcile DELETE needs (it filters window_start +-- alone; every auth/resource-leading index leaves it trailing). CREATE INDEX rated_usage_auth_id_window_start_ix ON rated_usage (auth_id, window_start); +CREATE INDEX rated_usage_resource_id_window_start_ix ON rated_usage (resource_id, window_start); CREATE INDEX rated_usage_window_start_ix ON rated_usage (window_start);` // conformanceBook is the fixture price book shared by the conformance tests: base @@ -103,18 +109,19 @@ func TestIntegration_RateWindow_ConformsToOracle(t *testing.T) { hour := mustTime("2026-06-08T10:00:00Z") book := conformanceBook() - // Events: priced base, priced derived, unpriced, unattributable. + // Events: priced base, priced derived, unpriced, unattributable. Each priced/unpriced + // event carries a resource_id (E2 grain); the unattributable one has none. events := []RatedEvent{ - {AuthID: "a", ModelID: "b", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: hour.Add(5 * time.Minute)}, - {AuthID: "a", ModelID: "f", PromptTokens: 100, CachedTokens: 0, CompletionTokens: 0, At: hour.Add(15 * time.Minute)}, - {AuthID: "a", ModelID: "unpriced", PromptTokens: 9, At: hour.Add(1 * time.Minute)}, - {AuthID: "", ModelID: "b", PromptTokens: 9, At: hour.Add(2 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "b", PromptTokens: 100, CachedTokens: 30, CompletionTokens: 50, At: hour.Add(5 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "f", PromptTokens: 100, CachedTokens: 0, CompletionTokens: 0, At: hour.Add(15 * time.Minute)}, + {AuthID: "a", ResourceID: "r", ModelID: "unpriced", PromptTokens: 9, At: hour.Add(1 * time.Minute)}, + {AuthID: "", ResourceID: "r", ModelID: "b", PromptTokens: 9, At: hour.Add(2 * time.Minute)}, } for i, e := range events { _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) - VALUES ($1,$2,$3,$4,$5,$6,$7)`, - fmt.Sprintf("req-%d", i), nullableStr(e.AuthID), nullableStr(e.ModelID), + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8)`, + fmt.Sprintf("req-%d", i), nullableStr(e.AuthID), nullableStr(e.ResourceID), nullableStr(e.ModelID), e.PromptTokens, e.CachedTokens, e.CompletionTokens, e.At) if err != nil { t.Fatalf("seed event %d: %v", i, err) @@ -177,6 +184,18 @@ func TestIntegration_RateWindow_ConformsToOracle(t *testing.T) { } } + // E2 ATTRIBUTION: every written rollup carries the event's resource_id (the + // deployment id billing resolves the org from). The two priced rollups were seeded + // with resource_id 'r'. + var nWithResource, nTotal int + if err := db.QueryRowContext(ctx, `SELECT COUNT(*) FILTER (WHERE resource_id = 'r'), COUNT(*) FROM rated_usage`). + Scan(&nWithResource, &nTotal); err != nil { + t.Fatalf("read resource_id: %v", err) + } + if nWithResource != nTotal || nTotal != 2 { + t.Fatalf("rated_usage resource_id: %d of %d rows carry 'r', want all 2 (E2 attribution must be on every row)", nWithResource, nTotal) + } + // Deterministic surrogate ids: capture before the re-run. idsBefore := readRatedUsageIDs(t, db) @@ -200,10 +219,97 @@ func TestIntegration_RateWindow_ConformsToOracle(t *testing.T) { } } +// TestIntegration_ResourceIDGrainAndFailClosed runs the REAL SQL to pin the E2 +// resource_id grain in Postgres: +// - TWO deployments (distinct resource_id) of the SAME model by the SAME auth in the +// SAME hour produce TWO distinct rated_usage rows (NOT one summed row) — they may +// bill to different orgs, so collapsing them would mis-attribute revenue; +// - a NULL-resource_id event is UNATTRIBUTABLE: counted, never written (a row that +// can't name its deployment/org must never be billed). This is the live-Postgres +// proof of the fail-closed attribution partition. +func TestIntegration_ResourceIDGrainAndFailClosed(t *testing.T) { + dsn := os.Getenv("PHOEBE_TEST_DATABASE_URL") + if dsn == "" { + t.Skip("PHOEBE_TEST_DATABASE_URL not set; skipping live-Postgres conformance") + } + ctx := context.Background() + db, err := sql.Open("pgx", dsn) + if err != nil { + t.Fatalf("open: %v", err) + } + defer db.Close() + + const sch = "phoebe_rating_resource_it" + exec(t, db, "DROP SCHEMA IF EXISTS "+sch+" CASCADE") + exec(t, db, "CREATE SCHEMA "+sch) + exec(t, db, "SET search_path TO "+sch) + defer func() { exec(t, db, "DROP SCHEMA IF EXISTS "+sch+" CASCADE") }() + exec(t, db, schemaDDL) + + hour := mustTime("2026-06-08T10:00:00Z") + book := newTestBook( + map[string]Rate3{"b": rate3("0.000005", "0", "0")}, + nil, PolicyIdentity, Dec{}, Dec{}, + ) + + // Two deployments of model "b" by auth "a" in the same hour, plus a NULL-resource_id + // event that must fail closed. + if _, err := db.ExecContext(ctx, + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, prompt_tokens, completion_tokens, event_ts) + VALUES ('d1','a','deploy-1','b',100,0,$1), + ('d2','a','deploy-2','b',100,0,$1), + ('dnull','a',NULL,'b',100,0,$1)`, hour.Add(5*time.Minute)); err != nil { + t.Fatalf("seed: %v", err) + } + + store := NewPostgresStore(db) + res, err := store.RateWindow(ctx, book, hour, hour.Add(time.Hour)) + if err != nil { + t.Fatalf("RateWindow: %v", err) + } + + // Two deployments → two rollups; the NULL-resource_id event is unattributable. + if res.RollupsWritten != 2 || res.EventsRated != 2 { + t.Fatalf("rollups/events = %d/%d, want 2/2 (distinct deployments bill separately)", res.RollupsWritten, res.EventsRated) + } + if res.UnattributableEvents != 1 { + t.Fatalf("unattributable = %d, want 1 (the NULL-resource_id event must be counted, never billed)", res.UnattributableEvents) + } + // PARTITION holds with resource_id in the mix. + if got := res.EventsRated + res.UnpricedEvents + res.UnattributableEvents + res.AmbiguousBaseEvents; got != 3 { + t.Fatalf("rated+unpriced+unattr+ambiguous = %d, want 3 (all seeded events)", got) + } + + // Exactly two rows, one per deployment; NONE with a NULL/empty resource_id. + var nRows, nNull int + if err := db.QueryRowContext(ctx, + `SELECT COUNT(*), COUNT(*) FILTER (WHERE resource_id IS NULL OR resource_id = '') FROM rated_usage`). + Scan(&nRows, &nNull); err != nil { + t.Fatalf("count rows: %v", err) + } + if nRows != 2 { + t.Fatalf("rated_usage rows = %d, want 2 (one per deployment)", nRows) + } + if nNull != 0 { + t.Fatalf("rated_usage has %d NULL/empty-resource_id rows — a row that can't name its deployment/org must NEVER be written", nNull) + } + for _, rid := range []string{"deploy-1", "deploy-2"} { + var n int + if err := db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM rated_usage WHERE auth_id='a' AND resource_id=$1 AND model_id='b' AND window_start=$2`, + rid, hour).Scan(&n); err != nil { + t.Fatalf("count %s: %v", rid, err) + } + if n != 1 { + t.Fatalf("deployment %s rollups = %d, want exactly 1", rid, n) + } + } +} + // readRatedUsageIDs returns natural-key → id for every rated_usage row. func readRatedUsageIDs(t *testing.T, db *sql.DB) map[string]string { t.Helper() - rows, err := db.Query(`SELECT auth_id || '|' || model_id || '|' || extract(epoch FROM window_start)::bigint::text, id FROM rated_usage`) + rows, err := db.Query(`SELECT auth_id || '|' || resource_id || '|' || model_id || '|' || extract(epoch FROM window_start)::bigint::text, id FROM rated_usage`) if err != nil { t.Fatalf("read rated_usage ids: %v", err) } @@ -271,8 +377,8 @@ func TestConformance_PremiumQuantizedBeforeBilling(t *testing.T) { } for i, e := range events { if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) - VALUES ($1,$2,$3,$4,$5,$6,$7)`, + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) + VALUES ($1,$2,'r',$3,$4,$5,$6,$7)`, fmt.Sprintf("q-req-%d", i), e.AuthID, e.ModelID, e.PromptTokens, e.CachedTokens, e.CompletionTokens, e.At); err != nil { t.Fatalf("seed event %d: %v", i, err) @@ -361,8 +467,8 @@ func TestConformance_OracleQuantizesBeforeMultiply_OnResidue(t *testing.T) { } for i, e := range events { if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) - VALUES ($1,$2,$3,$4,$5,$6,$7)`, + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) + VALUES ($1,$2,'r',$3,$4,$5,$6,$7)`, fmt.Sprintf("res-req-%d", i), e.AuthID, e.ModelID, e.PromptTokens, e.CachedTokens, e.CompletionTokens, e.At); err != nil { t.Fatalf("seed event %d: %v", i, err) @@ -458,8 +564,8 @@ func TestIntegration_FineTunePricesViaBaseModel(t *testing.T) { } for _, s := range seeds { if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, base_model, prompt_tokens, completion_tokens, event_ts) - VALUES ($1,'a',$2,$3,$4,0,$5)`, + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, base_model, prompt_tokens, completion_tokens, event_ts) + VALUES ($1,'a','r',$2,$3,$4,0,$5)`, s.req, s.model, nullableStr(s.baseModel), s.prompt, hour.Add(5*time.Minute)); err != nil { t.Fatalf("seed %s: %v", s.req, err) } @@ -553,8 +659,8 @@ func TestIntegration_FineTuneAmbiguousBaseModelFailsLoud(t *testing.T) { } for _, s := range seeds { if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, base_model, prompt_tokens, completion_tokens, event_ts) - VALUES ($1,'a',$2,$3,$4,0,$5)`, + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, base_model, prompt_tokens, completion_tokens, event_ts) + VALUES ($1,'a','r',$2,$3,$4,0,$5)`, s.req, s.model, nullableStr(s.baseModel), s.prompt, hour.Add(5*time.Minute)); err != nil { t.Fatalf("seed %s: %v", s.req, err) } @@ -632,9 +738,9 @@ func TestIntegration_ReRateReconciles(t *testing.T) { // Run A: a single-base ft: rollup (ft:dupe) PLUS a co-window survivor (ft:keep). // Both clean → billed. ft:keep must survive run B's reconcile untouched. if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, base_model, prompt_tokens, completion_tokens, event_ts) - VALUES ('rc-1','a','ft:dupe','cheap/base',1000,0,$1), - ('rc-keep','a','ft:keep','cheap/base',1000,0,$1)`, hour.Add(5*time.Minute)); err != nil { + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, base_model, prompt_tokens, completion_tokens, event_ts) + VALUES ('rc-1','a','r','ft:dupe','cheap/base',1000,0,$1), + ('rc-keep','a','r','ft:keep','cheap/base',1000,0,$1)`, hour.Add(5*time.Minute)); err != nil { t.Fatalf("seed rc-1/rc-keep: %v", err) } resA, err := store.RateWindow(ctx, book, hour, hour.Add(time.Hour)) @@ -654,8 +760,8 @@ func TestIntegration_ReRateReconciles(t *testing.T) { // Mutate: a SECOND base_model for the SAME ft: id → ambiguous. if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, base_model, prompt_tokens, completion_tokens, event_ts) - VALUES ('rc-2','a','ft:dupe','expensive/base',1000,0,$1)`, hour.Add(10*time.Minute)); err != nil { + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, base_model, prompt_tokens, completion_tokens, event_ts) + VALUES ('rc-2','a','r','ft:dupe','expensive/base',1000,0,$1)`, hour.Add(10*time.Minute)); err != nil { t.Fatalf("seed rc-2: %v", err) } resB, err := store.RateWindow(ctx, book, hour, hour.Add(time.Hour)) @@ -729,8 +835,8 @@ func TestIntegration_ReRateReconcileLeavesOtherWindowsUntouched(t *testing.T) { // Seed a clean rollup in BOTH the 10:00 and 12:00 hours; rate each window. if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, prompt_tokens, completion_tokens, event_ts) - VALUES ('w10','a','b',100,0,$1), ('w12','a','b',100,0,$2)`, + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, prompt_tokens, completion_tokens, event_ts) + VALUES ('w10','a','r','b',100,0,$1), ('w12','a','r','b',100,0,$2)`, hour10.Add(5*time.Minute), hour12.Add(5*time.Minute)); err != nil { t.Fatalf("seed windows: %v", err) } @@ -814,8 +920,8 @@ func TestIntegration_OneHopFineTuneCannotDeriveFromFineTune(t *testing.T) { } for _, s := range seeds { if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, base_model, prompt_tokens, completion_tokens, event_ts) - VALUES ($1,'a',$2,$3,1000,0,$4)`, + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, base_model, prompt_tokens, completion_tokens, event_ts) + VALUES ($1,'a','r',$2,$3,1000,0,$4)`, s.req, s.model, s.baseModel, hour.Add(5*time.Minute)); err != nil { t.Fatalf("seed %s: %v", s.req, err) } @@ -870,8 +976,8 @@ func TestIntegration_UTCBucketing_SessionTZIndependent(t *testing.T) { nil, PolicyIdentity, Dec{}, Dec{}, ) if _, err := db.ExecContext(ctx, - `INSERT INTO billing_event (request_id, auth_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) - VALUES ('tz-req-0','a','b',100,0,50,$1)`, hour.Add(30*time.Minute)); err != nil { + `INSERT INTO billing_event (request_id, auth_id, resource_id, model, prompt_tokens, cached_tokens, completion_tokens, event_ts) + VALUES ('tz-req-0','a','r','b',100,0,50,$1)`, hour.Add(30*time.Minute)); err != nil { t.Fatalf("seed event: %v", err) } @@ -991,12 +1097,13 @@ func TestIntegration_ReconcileDeleteCanUseWindowStartIndex(t *testing.T) { // only prove the index CAN serve the predicate under seqscan-off, not that the // planner prefers it at default cost, so a large population would back nothing. exec(t, db, `INSERT INTO rated_usage - (id, auth_id, model_id, window_start, window_end, + (id, auth_id, resource_id, model_id, window_start, window_end, prompt_tokens, cached_tokens, completion_tokens, billable_prompt_tokens, cost, applied_prompt_rate, applied_cached_rate, applied_completion_rate, event_count) SELECT md5(a::text || ':' || h::text), 'auth' || a::text, + 'deploy' || a::text, 'm', '2026-01-01T00:00:00Z'::timestamptz + (h || ' hours')::interval, '2026-01-01T01:00:00Z'::timestamptz + (h || ' hours')::interval, @@ -1013,7 +1120,7 @@ func TestIntegration_ReconcileDeleteCanUseWindowStartIndex(t *testing.T) { // must still go through the index rather than seqscanning the whole table. const reconcileDelete = `EXPLAIN WITH priced AS ( - SELECT auth_id, model_id, window_start FROM rated_usage WHERE false + SELECT auth_id, resource_id, model_id, window_start FROM rated_usage WHERE false ) DELETE FROM rated_usage ru WHERE ru.window_start >= '2026-01-01T04:00:00Z' @@ -1021,6 +1128,7 @@ func TestIntegration_ReconcileDeleteCanUseWindowStartIndex(t *testing.T) { AND NOT EXISTS ( SELECT 1 FROM priced p WHERE p.auth_id = ru.auth_id + AND p.resource_id = ru.resource_id AND p.model_id = ru.model_id AND p.window_start = ru.window_start )` diff --git a/internal/rating/store_test.go b/internal/rating/store_test.go index 924aef3..a055d9b 100644 --- a/internal/rating/store_test.go +++ b/internal/rating/store_test.go @@ -110,20 +110,24 @@ func TestRateWindowSQL_Shape(t *testing.T) { "applied_prompt_rate", "applied_cached_rate", "applied_completion_rate", - // priced + attributable filter (never $0-bill unpriced/unattributable) + // priced + attributable filter (never $0-bill unpriced/unattributable). A NULL + // resource_id can't name the deployment/org (E2) → excluded + counted, never billed. "WHERE prompt_price IS NOT NULL", - "AND auth_id IS NOT NULL", - "AND model_id IS NOT NULL", + "AND auth_id IS NOT NULL", + "AND model_id IS NOT NULL", + "AND resource_id IS NOT NULL", // session-TZ-independent hour bucket "date_trunc('hour', ev_ts AT TIME ZONE 'UTC') AT TIME ZONE 'UTC'", // deterministic natural-key surrogate id (re-runs regenerate the same id), - // LENGTH-PREFIXED so a '|' in a field can never collide two keys + // LENGTH-PREFIXED so a '|' in a field can never collide two keys; resource_id is + // part of the key, in fixed order (auth_id, resource_id, model_id, window_start) "md5(length(auth_id)::text || ':' || auth_id", + "|| '|' || length(resource_id)::text || ':' || resource_id", "|| '|' || length(model_id)::text || ':' || model_id", // deterministic lock order across concurrent raters (no ABBA deadlock) - "ORDER BY auth_id, model_id, window_start", + "ORDER BY auth_id, resource_id, model_id, window_start", // idempotent upsert on the natural key - "ON CONFLICT (auth_id, model_id, window_start) DO UPDATE SET", + "ON CONFLICT (auth_id, resource_id, model_id, window_start) DO UPDATE SET", // RE-RATE RECONCILES (FIX 2): the `deleted` CTE removes any in-window rollup this // run did NOT reproduce in priced, atomically with the upsert — so a superseded // rollup cannot keep billing at its stale cost. Window-scoped + NOT EXISTS priced. @@ -132,10 +136,15 @@ func TestRateWindowSQL_Shape(t *testing.T) { "ru.window_start < $2", "NOT EXISTS (", "FROM priced p", + // the reconcile anti-join keys on the FULL grain incl. resource_id, so it matches + // the new unique constraint exactly (a deployment that fell out is reconciled) + "p.resource_id = ru.resource_id", "AS reconciled_deletions", - // the anomaly counts ride the SAME statement (one snapshot) as the upsert + // the anomaly counts ride the SAME statement (one snapshot) as the upsert; a NULL + // resource_id is counted UNATTRIBUTABLE (fail closed), never billed "AS unpriced_events", - "AS unattributable_events", + "OR resource_id IS NULL OR model_id IS NULL) AS unattributable_events", + "AND resource_id IS NOT NULL", // the E3 ft-uniqueness gate: an ft: rollup spanning >1 base_model is split out "COUNT(DISTINCT base_model) FILTER (WHERE via_derived) > 1 AS ambiguous_base", "WHERE NOT ambiguous_base", diff --git a/migrations/0002_rating.sql b/migrations/0002_rating.sql index 4a3d80b..7ae4eeb 100644 --- a/migrations/0002_rating.sql +++ b/migrations/0002_rating.sql @@ -10,9 +10,12 @@ -- each rated_usage row. See config/prices.example.yaml and internal/rating. -- -- ONE table here: --- rated_usage — the rollup. Per-(auth_id, model_id, hour) cost, idempotently --- upserted, carrying the applied per-token rates so the row is --- self-auditing and immutable ("we never reprice served traffic"). +-- rated_usage — the rollup. Per-(auth_id, resource_id, model_id, hour) cost, +-- idempotently upserted, carrying the applied per-token rates so the +-- row is self-auditing and immutable ("we never reprice served +-- traffic"). resource_id (the deployment id) is part of the grain for +-- E2 customer attribution: billing resolves the org via +-- resource_id→org_id. -- -- MONEY UNIT (read before touching any number here): -- All money is stored as NUMERIC(20,9) — exact base-10 decimal, 9 fractional @@ -31,16 +34,29 @@ -- migrations/atlas/c2f1a3b4d5e6_add_rating.py and migrations/README.md. Keep the -- two in sync. --- rated_usage: the per-(auth_id, model_id, hour) cost rollup. +-- rated_usage: the per-(auth_id, resource_id, model_id, hour) cost rollup. -- -- Grain is HOURLY (window_start truncated to the hour), matching Atlas's --- hourly_usage_record. The natural key (auth_id, model_id, window_start) is UNIQUE --- so re-rating a window UPSERTS the same rows instead of duplicating them — re-runs --- are idempotent and never double-count. +-- hourly_usage_record. The natural key (auth_id, resource_id, model_id, window_start) +-- is UNIQUE so re-rating a window UPSERTS the same rows instead of duplicating them — +-- re-runs are idempotent and never double-count. +-- +-- resource_id IS PART OF THE GRAIN, NOT REDUNDANT WITH model_id (E2 customer +-- attribution): billing identifies the customer org via resource_id→org_id. model_id +-- stays because price resolution is per-MODEL — one deployment can serve multiple +-- models/adapters at different rates, and collapsing them would sum traffic priced +-- differently. Two deployments of the SAME model by the SAME auth in one hour are +-- TWO rows (correct — they may bill to different orgs). resource_id is NON-NULL here +-- though NULLABLE on billing_event: a row that can't name its deployment/org cannot +-- be billed, so the rater EXCLUDES a NULL-resource_id event from this rollup and +-- COUNTS it as unattributable (fail closed) — see internal/rating/store.go. CREATE TABLE rated_usage ( id VARCHAR(32) NOT NULL, -- surrogate; natural key is the unique constraint below auth_id VARCHAR(64) NOT NULL, + -- resource_id: the deployment id (E2 customer attribution). NON-NULL — the rater + -- never writes a row it can't attribute to a deployment/org. + resource_id VARCHAR(64) NOT NULL, model_id VARCHAR(255) NOT NULL, -- [window_start, window_end) is the hour this rollup covers. @@ -79,15 +95,22 @@ CREATE TABLE rated_usage ( rated_at TIMESTAMPTZ NOT NULL DEFAULT now(), CONSTRAINT pk_rated_usage PRIMARY KEY (id), - -- Idempotency key: one rollup row per (auth_id, model_id, hour). - CONSTRAINT rated_usage_auth_model_window_uq - UNIQUE (auth_id, model_id, window_start) + -- Idempotency key: one rollup row per (auth_id, resource_id, model_id, hour). + CONSTRAINT rated_usage_auth_resource_model_window_uq + UNIQUE (auth_id, resource_id, model_id, window_start) ); -- Per-API-key billing queries scan by auth_id over a time window. CREATE INDEX rated_usage_auth_id_window_start_ix ON rated_usage (auth_id, window_start); +-- E2 customer attribution reads rated_usage by DEPLOYMENT over a time window (resolve +-- the org via resource_id→org_id, then sum that deployment's cost). resource_id leads +-- so a per-deployment time-range query is a tight index slice rather than a scan over +-- an auth-leading index where resource_id is only a trailing column. +CREATE INDEX rated_usage_resource_id_window_start_ix + ON rated_usage (resource_id, window_start); + -- The reconcile DELETE (re-rate convergence, the `deleted` CTE in -- internal/rating/store.go) filters rated_usage on window_start ALONE -- (window_start >= $1 AND window_start < $2), then anti-joins priced. Every other diff --git a/migrations/README.md b/migrations/README.md index 2de3bb0..66126cc 100644 --- a/migrations/README.md +++ b/migrations/README.md @@ -5,8 +5,12 @@ This directory holds the schema for phoebe's billing tables, which live in the - `billing_event` — the system-of-record table that phoebe's Postgres drainer (`cmd/drainer`) writes raw, pre-rating metering records into. -- `rated_usage` — the **rating (E1)** (revenue) rollup: per-(auth_id, model_id, - hour) cost, carrying the applied per-token rates frozen onto each row. **Money is +- `rated_usage` — the **rating (E1)** (revenue) rollup: per-(auth_id, resource_id, + model_id, hour) cost, carrying the applied per-token rates frozen onto each row. + `resource_id` (the deployment id) is part of the grain for **E2 customer + attribution** (billing resolves the org via `resource_id`→`org_id`); a + NULL-`resource_id` event fails closed (counted unattributable, never billed). + **Money is stored as `NUMERIC(20,9)` — exact decimal, never float and never an integer micro/nano scalar — and ALL money math happens in SQL, not Go.** diff --git a/migrations/atlas/c2f1a3b4d5e6_add_rating.py b/migrations/atlas/c2f1a3b4d5e6_add_rating.py index f06a8ed..7db004c 100644 --- a/migrations/atlas/c2f1a3b4d5e6_add_rating.py +++ b/migrations/atlas/c2f1a3b4d5e6_add_rating.py @@ -21,10 +21,14 @@ micro/nano scalar. The rater projects the YAML prices into a transient TEMP table and computes AND sums per-event cost in one SQL statement; Go never holds a money total. The fine-tune premium is applied in exact decimal at projection. - * rated_usage is the per-(auth_id, model_id, hour) rollup, idempotently upserted on - the natural key. It carries the APPLIED per-token rates (applied_prompt_rate / - applied_cached_rate / applied_completion_rate) so the row is self-auditing and - immutable: "we never reprice traffic you've already served" holds by construction. + * rated_usage is the per-(auth_id, resource_id, model_id, hour) rollup, idempotently + upserted on the natural key. It carries the APPLIED per-token rates + (applied_prompt_rate / applied_cached_rate / applied_completion_rate) so the row is + self-auditing and immutable: "we never reprice traffic you've already served" holds + by construction. resource_id (the deployment id) is part of the grain for E2 + customer attribution — billing resolves the org via resource_id→org_id. It is + NON-NULL here though NULLABLE on billing_event: the rater fails closed, excluding a + NULL-resource_id event from the rollup and counting it as unattributable. CLEAN REWRITE (no prod data): an earlier draft of this revision created model_price + derivation_policy. E1 moved prices to a YAML file, so those tables are gone and @@ -49,11 +53,16 @@ def upgrade(): - # --- rated_usage: the per-(auth_id, model_id, hour) cost rollup --- + # --- rated_usage: the per-(auth_id, resource_id, model_id, hour) cost rollup --- + # resource_id (the deployment id) is part of the grain for E2 customer attribution + # (billing resolves the org via resource_id→org_id). NON-NULL here though NULLABLE + # on billing_event: the rater fails closed — a NULL-resource_id event is excluded + # from the rollup and counted as unattributable, never billed to a NULL org. op.create_table( "rated_usage", sa.Column("id", sa.Unicode(length=32), nullable=False), sa.Column("auth_id", sa.Unicode(length=64), nullable=False), + sa.Column("resource_id", sa.Unicode(length=64), nullable=False), sa.Column("model_id", sa.Unicode(length=255), nullable=False), sa.Column("window_start", sa.DateTime(timezone=True), nullable=False), sa.Column("window_end", sa.DateTime(timezone=True), nullable=False), @@ -81,12 +90,13 @@ def upgrade(): server_default=sa.func.now(), ), sa.PrimaryKeyConstraint("id", name=op.f("pk_rated_usage")), - # Idempotency key: one rollup row per (auth_id, model_id, hour). + # Idempotency key: one rollup row per (auth_id, resource_id, model_id, hour). sa.UniqueConstraint( "auth_id", + "resource_id", "model_id", "window_start", - name=op.f("rated_usage_auth_model_window_uq"), + name=op.f("rated_usage_auth_resource_model_window_uq"), ), ) op.create_index( @@ -96,6 +106,18 @@ def upgrade(): unique=False, ) + # E2 customer attribution reads rated_usage by DEPLOYMENT over a time window + # (resolve the org via resource_id→org_id, then sum that deployment's cost). A + # resource_id-leading index makes a per-deployment time-range query a tight slice + # rather than a scan over the auth-leading index where resource_id only trails. + # Mirrors migrations/0002_rating.sql. + op.create_index( + "rated_usage_resource_id_window_start_ix", + "rated_usage", + ["resource_id", "window_start"], + unique=False, + ) + # The reconcile DELETE (re-rate convergence, the `deleted` CTE in # internal/rating/store.go) filters rated_usage on window_start ALONE # (window_start >= $1 AND window_start < $2), then anti-joins priced. Every other @@ -156,6 +178,11 @@ def downgrade(): op.drop_index( "rated_usage_window_start_ix", table_name="rated_usage", if_exists=True ) + op.drop_index( + "rated_usage_resource_id_window_start_ix", + table_name="rated_usage", + if_exists=True, + ) op.drop_index( "rated_usage_auth_id_window_start_ix", table_name="rated_usage", if_exists=True )