Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/rater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// premium policy, and per-GPU floor rates — reads a window of raw metering rows from
// billing_event, projects the prices into a transient TEMP table, and ENTIRELY IN
// SQL computes per-event cost as exact NUMERIC, sums it, and upserts
// per-(auth_id, model_id, hour) cost rollups into rated_usage (with the applied
// per-(auth_id, resource_id, model_id, hour) cost rollups into rated_usage (with the applied
// per-token rates frozen onto each row). No money math happens in Go — the rater
// binary is orchestration only (the fine-tune premium is applied in exact decimal
// when the prices are projected, then handed to SQL as NUMERIC).
Expand Down Expand Up @@ -66,8 +66,8 @@
// last hour: events can be DRAINED LATE (a Valkey outage recovered from the WAL)
// with an event_ts in an hour that was already rated, and a last-hour-only cron
// would never revisit that hour — silent revenue loss. Re-rating closed hours is
// safe BY CONSTRUCTION: the upsert REPLACES each (auth_id, model_id, hour) bucket
// with a freshly recomputed total, so the late event is folded in and nothing
// safe BY CONSTRUCTION: the upsert REPLACES each (auth_id, resource_id, model_id, hour)
// bucket with a freshly recomputed total, so the late event is folded in and nothing
// doubles. RESIDUAL RISK, stated honestly: an event arriving MORE than N hours
// late still slips past the default window (the DESIGN.md reconciliation backstop
// is the eventual answer; until then, widen rateTrailingHours or re-rate the hour
Expand Down Expand Up @@ -228,7 +228,7 @@ func run() int {
// re-rates the last N closed hours so an event DRAINED LATE into an already-rated
// hour (Valkey outage → WAL recovery) is picked up by a later run instead of being
// lost forever. Re-rating a closed hour is safe by construction: the upsert
// REPLACES each (auth_id, model_id, hour) bucket with a recomputed total, so
// REPLACES each (auth_id, resource_id, model_id, hour) bucket with a recomputed total, so
// re-runs reconcile and never double-count. An event arriving more than N hours
// late still slips (residual risk; see the package doc — reconciliation is the
// backstop). Either flag may be given explicitly (RFC3339) and WINS over the
Expand Down
12 changes: 9 additions & 3 deletions internal/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func TestE2E_StreamedRequestBecomesMoney(t *testing.T) {

var (
nRollups int
ruAuthID, ruModelID, cost string
ruAuthID, ruResourceID, ruModelID, cost string
ruPrompt, ruCached, ruCompletion, ruBillable int64
eventCount int64 // BIGINT column
)
Expand All @@ -480,14 +480,20 @@ func TestE2E_StreamedRequestBecomesMoney(t *testing.T) {
t.Fatalf("rated_usage rows = %d, want exactly 1", nRollups)
}
if err := h.db.QueryRow(
`SELECT auth_id, model_id, prompt_tokens, cached_tokens, completion_tokens, billable_prompt_tokens, cost::text, event_count
`SELECT auth_id, resource_id, model_id, prompt_tokens, cached_tokens, completion_tokens, billable_prompt_tokens, cost::text, event_count
FROM rated_usage`).
Scan(&ruAuthID, &ruModelID, &ruPrompt, &ruCached, &ruCompletion, &ruBillable, &cost, &eventCount); err != nil {
Scan(&ruAuthID, &ruResourceID, &ruModelID, &ruPrompt, &ruCached, &ruCompletion, &ruBillable, &cost, &eventCount); err != nil {
t.Fatalf("read rated_usage: %v", err)
}
if ruAuthID != testAuthID {
t.Errorf("rated_usage.auth_id = %q, want %q (the X-Saturn-Auth-Id header value)", ruAuthID, testAuthID)
}
// E2 customer attribution: the rollup carries the deployment id the request routed
// to (X-Saturn-Resource-Id), end-to-end from the proxy through billing_event into
// the rated_usage grain — this is the key billing resolves the org from.
if ruResourceID != testResourceID {
t.Errorf("rated_usage.resource_id = %q, want %q (the X-Saturn-Resource-Id header value, for E2 org attribution)", ruResourceID, testResourceID)
}
// THE deployment-id-bug guard, at the far end of the pipe: the money is
// keyed on the engine name the upstream reported, not the id we routed on.
if ruModelID != testModelName {
Expand Down
2 changes: 1 addition & 1 deletion internal/rating/doc.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Package rating is phoebe's REVENUE path. It turns the raw token counts in
// billing_event into money: per (auth_id, model_id, hour) cost rollups in
// billing_event into money: per (auth_id, resource_id, model_id, hour) cost rollups in
// rated_usage, priced from a YAML PRICE FILE (E1) — not a DB price table.
//
// THE PRICE FILE IS THE CONTRACT (E1). An operator authors a versioned YAML file
Expand Down
9 changes: 7 additions & 2 deletions internal/rating/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ import (
// decoupled from metering.Event so the pure money math has no dependency on the
// capture/emit side and can be tested in isolation.
type RatedEvent struct {
AuthID string
ModelID string
AuthID string
// ResourceID is the deployment id (E2 customer attribution — billing resolves the
// org via resource_id→org_id). Part of the rollup grain. Empty → unattributable
// (the row can't name its deployment/org), mirroring the SQL's resource_id-IS-NULL
// handling: counted, never billed.
ResourceID string
ModelID string
// BaseModel is the HF base id a fine-tune derives from (E3), carried on the event
// from billing_event.base_model. Empty for a base model. The oracle prices an ft:
// ModelID via base x premium keyed on BaseModel — mirroring the SQL.
Expand Down
14 changes: 8 additions & 6 deletions internal/rating/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ type Result struct {
// casts these as ::bigint to avoid a silent 32-bit overflow (see store.go).
EventsRated int64
UnpricedEvents int64 // events whose model had NO resolvable price (NOT $0-billed)
UnattributableEvents int64 // in-window rows with NULL auth_id/model_id (upstream leak)
UnattributableEvents int64 // in-window rows with NULL auth_id/resource_id/model_id (upstream leak)
AmbiguousBaseEvents int64 // events under an ft: rollup spanning >1 base_model (E3 violation)
RollupsWritten int64 // distinct (auth_id, model_id, hour) rows upserted
RollupsWritten int64 // distinct (auth_id, resource_id, model_id, hour) rows upserted
ReconciledDeletions int64 // stale in-window rollups DELETED because this re-run no longer produces them
TotalCost string // sum of all rollup costs, NUMERIC as text
}
Expand All @@ -55,7 +55,9 @@ type Result struct {
func (r Result) HasUnpriced() bool { return r.UnpricedEvents > 0 }

// HasUnattributable reports whether any in-window row was skipped for a NULL
// auth_id/model_id — like HasUnpriced, a loud, exit-nonzero outcome.
// auth_id/resource_id/model_id — like HasUnpriced, a loud, exit-nonzero outcome. A
// NULL resource_id means the row can't name its deployment/org (E2), so it can't be
// billed and is counted here rather than attributed to a NULL org.
func (r Result) HasUnattributable() bool { return r.UnattributableEvents > 0 }

// HasAmbiguousBase reports whether any ft: rollup spanned more than one base_model in a
Expand All @@ -79,7 +81,7 @@ func (r Result) HasAnomaly() bool {
//
// FAIL-LOUD ON MISSING PRICE / UNATTRIBUTABLE (the fail-closed rule): an event
// whose model has no resolvable price at its time, and a row with a NULL
// auth_id/model_id, are NOT summed into any rollup — the SQL excludes them. They
// auth_id/resource_id/model_id, are NOT summed into any rollup — the SQL excludes them. They
// are COUNTED by the SAME statement that writes the rollups (one snapshot, so a
// row the drainer commits mid-run can never be excluded-but-uncounted), logged
// loudly (ERROR), and drive cmd/rater's exit-nonzero path. They never become $0
Expand Down Expand Up @@ -131,7 +133,7 @@ func (r *Rater) Run(ctx context.Context, windowStart, windowEnd time.Time, windo
windowStart.Format(time.RFC3339), windowEnd.Format(time.RFC3339), res.AmbiguousBaseEvents)
}
if res.HasUnattributable() {
r.log.Error.Printf("rating: window [%s,%s) has %d UNATTRIBUTABLE billing_event rows (NULL auth_id/model_id) — these cannot be rated; the interceptor's billing gate should reject them before metering, so a nonzero count means revenue is leaking upstream",
r.log.Error.Printf("rating: window [%s,%s) has %d UNATTRIBUTABLE billing_event rows (NULL auth_id/resource_id/model_id — a NULL resource_id can't name the deployment/org for E2 billing) — these cannot be rated; the interceptor's billing gate should reject them before metering, so a nonzero count means revenue is leaking upstream",
windowStart.Format(time.RFC3339), windowEnd.Format(time.RFC3339), res.UnattributableEvents)
}
if res.HasUnpriced() {
Expand Down Expand Up @@ -162,7 +164,7 @@ func (r *Rater) Run(ctx context.Context, windowStart, windowEnd time.Time, windo
}

if res.HasAnomaly() {
r.log.Error.Printf("rating: window [%s,%s) rated %d events into %d rollups, total=%s USD; %d UNPRICED events dropped (backfill prices and re-rate), %d UNATTRIBUTABLE rows skipped (NULL auth_id/model_id — upstream billing-gate leak), %d AMBIGUOUS-BASE events dropped (ft: id spanning multiple base_models — fix base_model propagation and re-rate)",
r.log.Error.Printf("rating: window [%s,%s) rated %d events into %d rollups, total=%s USD; %d UNPRICED events dropped (backfill prices and re-rate), %d UNATTRIBUTABLE rows skipped (NULL auth_id/resource_id/model_id — upstream billing-gate leak), %d AMBIGUOUS-BASE events dropped (ft: id spanning multiple base_models — fix base_model propagation and re-rate)",
windowStart.Format(time.RFC3339), windowEnd.Format(time.RFC3339),
res.EventsRated, res.RollupsWritten, res.TotalCost, res.UnpricedEvents, res.UnattributableEvents, res.AmbiguousBaseEvents)
} else {
Expand Down
Loading
Loading