Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 85 additions & 20 deletions cmd/rater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,47 @@
// (the E3 ft-uniqueness violation: a uuid4 checkpoint id cannot carry two bases,
// so this is broken base_model propagation, never a priceable rollup). Any of
// these means revenue is leaking upstream. Distinct code so a CronJob can alert
// on "lost revenue / lost data" separately from "job broke".
// on "lost revenue / lost data" separately from "job broke". A ROUTINE run (the
// default trailing-hours window, no --since/--until) that RECONCILE-DELETED a
// previously-billed rated_usage row ALSO exits 2: on a routine cadence a prior
// bill vanishing means data was lost / an upstream regression dropped events —
// page someone (see exitCode and the reconcile-exit contract below).
//
// Re-running any window is safe and idempotent (rollups upsert on the natural
// key), so cron overlap or manual re-rating never double-counts.
//
// THE RECONCILE-DELETE EXIT CONTRACT (option (c): loud on routine, quiet on
// backfill). A run that DELETES a previously-billed rated_usage row
// (ReconciledDeletions > 0) is a REVENUE CHANGE — a customer's prior bill just got
// rewritten. Whether that is alarming depends on WHY the window was chosen:
// - DEFAULT trailing-hours window (no --since/--until): a routine cron run should
// NOT be rewriting prior bills. If it does, events that billed before have
// vanished from billing_event (data loss) or an upstream regression now drops
// them (e.g. base_model stopped propagating → rollups went unpriced/ambiguous).
// That is an ERROR + exit 2 — treat it like the other anomalies and page.
// - EXPLICIT --since/--until window (operator backfill, e.g. after a late price
// fix): convergence is exactly what the operator asked for — INFO + exit 0.
//
// The gate is ReconciledDeletions > 0 && !windowExplicit. The semantics are
// UNCHANGED either way ("what the latest run says is what bills" — store.go always
// reconciles); this is purely the observability/exit contract. The decision lives
// HERE, in cmd/rater, because only here is it known whether the window was explicit;
// it is deliberately NOT folded into Result.HasAnomaly() (that would make explicit
// backfills exit nonzero too).
//
// SINGLE-FLIGHT ASSUMPTION (deployment contract, enforced OUTSIDE this repo). The
// rater MUST run single-flight: no two rater processes rating overlapping windows
// concurrently. The reconcile DELETE (the `deleted` CTE in store.go) and the
// ordered upsert are deadlock-safe ONLY under single-flight — the upsert's
// ORDER BY prevents a rater self-deadlocking against its own rows, but two
// concurrent raters could still take row locks in opposing orders across the
// DELETE/INSERT pair (a classic ABBA hazard). The deployment MUST forbid
// concurrency: the Atlas CronJob sets `concurrencyPolicy: Forbid` so a slow run is
// never overlapped by the next tick. That manifest lives in the Atlas repo, not
// here; this rater does NOT add delete-lock-ordering machinery because single-flight
// makes the hazard unreachable. If you ever run the rater outside that CronJob,
// preserve single-flight some other way (an advisory lock, a queue) before doing so.
//
// THE DEFAULT WINDOW IS THE TRAILING N COMPLETE HOURS, [floor(now)-N*1h,
// floor(now)), N = rateTrailingHours (default 24). It is deliberately NOT just the
// last hour: events can be DRAINED LATE (a Valkey outage recovered from the WAL)
Expand Down Expand Up @@ -92,6 +128,26 @@ const (
exitAnomaly = 2 // window rated but something leaked: unpriced and/or unattributable
)

// exitCode maps a completed rating run to its process exit code, encoding the
// reconcile-delete contract (see the package doc). A run that rated cleanly but
// leaked an anomaly (unpriced / unattributable / ambiguous-base) ALWAYS exits
// exitAnomaly. A reconcile-delete (reconciledDeletions > 0) exits exitAnomaly TOO —
// but ONLY when the window was the default trailing-hours window (windowExplicit ==
// false): on a routine run a prior bill vanishing is alarming (data loss / upstream
// regression). When the operator chose the window explicitly (--since/--until),
// reconcile-deletes are intended convergence (a backfill) and do NOT raise the exit
// code on their own. This is the single place the routine-vs-backfill decision is
// made, because only cmd/rater knows whether the window was explicit.
func exitCode(reconciledDeletions int64, windowExplicit, hasAnomaly bool) int {
if hasAnomaly {
return exitAnomaly
}
if reconciledDeletions > 0 && !windowExplicit {
return exitAnomaly
}
return exitOK
}

func main() {
os.Exit(run())
}
Expand Down Expand Up @@ -133,7 +189,7 @@ func run() int {
return exitFatal
}

windowStart, windowEnd, err := resolveWindow(*since, *until, opts.trailingHours, time.Now())
windowStart, windowEnd, windowExplicit, err := resolveWindow(*since, *until, opts.trailingHours, time.Now())
if err != nil {
log.Error.Printf("rater: %v", err)
return exitFatal
Expand All @@ -152,21 +208,18 @@ func run() int {
defer func() { _ = store.Close() }()

r := rating.New(store, book, log)
res, err := r.Run(ctx, windowStart, windowEnd)
res, err := r.Run(ctx, windowStart, windowEnd, windowExplicit)
if err != nil {
log.Error.Printf("rater: run: %v", err)
return exitFatal
}

if res.HasAnomaly() {
// The window was rated and rollups were written, but something leaked:
// events that could not be priced, rows that could not be attributed (NULL
// auth_id/model), and/or an ft: rollup spanning multiple base_models (the E3
// ft-uniqueness violation). Non-zero exit so a CronJob surfaces the lost
// revenue / lost data.
return exitAnomaly
}
return exitOK
// Exit code encodes BOTH the leaked-anomaly signal (unpriced / unattributable /
// ambiguous-base — always nonzero) AND the reconcile-delete contract: a routine
// run (default window) that rewrote a prior bill is alarming and exits nonzero,
// while an explicit backfill (--since/--until) that did so is intended and exits
// 0. See exitCode and the package doc.
return exitCode(res.ReconciledDeletions, windowExplicit, res.HasAnomaly())
}

// resolveWindow computes [start, end). Defaults (both flags empty) rate the
Expand All @@ -180,32 +233,44 @@ func run() int {
// late still slips (residual risk; see the package doc — reconciliation is the
// backstop). Either flag may be given explicitly (RFC3339) and WINS over the
// default; both must parse, be hour-aligned, and start must be before end.
func resolveWindow(since, until string, trailingHours int, now time.Time) (time.Time, time.Time, error) {
//
// The third return is windowExplicit: TRUE if EITHER --since or --until was set, i.e.
// the operator named the window rather than taking the trailing-hours default. It is
// the single source of truth for the routine-vs-backfill distinction the
// reconcile-delete exit contract turns on (see exitCode + the package doc): a
// reconcile-delete on a routine (default) window is alarming; on an explicit backfill
// it is intended convergence.
func resolveWindow(since, until string, trailingHours int, now time.Time) (time.Time, time.Time, bool, error) {
if trailingHours < 1 {
return time.Time{}, time.Time{}, errInvalidTrailingHours(trailingHours)
return time.Time{}, time.Time{}, false, errInvalidTrailingHours(trailingHours)
}
now = now.UTC()
currentHour := now.Truncate(time.Hour)

start := currentHour.Add(-time.Duration(trailingHours) * time.Hour)
end := currentHour

// Either flag set means the operator chose the window — an explicit backfill, not
// the routine trailing-hours cadence. Captured BEFORE parsing so it reflects
// operator INTENT (a flag was passed) independent of the parsed values.
windowExplicit := since != "" || until != ""

if since != "" {
t, err := time.Parse(time.RFC3339, since)
if err != nil {
return time.Time{}, time.Time{}, errBadWindow("since", since, err)
return time.Time{}, time.Time{}, false, errBadWindow("since", since, err)
}
start = t.UTC()
}
if until != "" {
t, err := time.Parse(time.RFC3339, until)
if err != nil {
return time.Time{}, time.Time{}, errBadWindow("until", until, err)
return time.Time{}, time.Time{}, false, errBadWindow("until", until, err)
}
end = t.UTC()
}
if !start.Before(end) {
return time.Time{}, time.Time{}, errInvertedWindow(start, end)
return time.Time{}, time.Time{}, false, errInvertedWindow(start, end)
}
// Both bounds MUST be hour-aligned. The rollup buckets by date_trunc('hour')
// and the upsert REPLACES a bucket's totals (not additive — that is what makes
Expand All @@ -215,12 +280,12 @@ func resolveWindow(since, until string, trailingHours int, now time.Time) (time.
// this fences the explicit --since/--until path. Fail loud rather than snap,
// so an operator never silently re-rates hours they did not name.
if !start.Truncate(time.Hour).Equal(start) {
return time.Time{}, time.Time{}, errUnalignedWindow("since", start)
return time.Time{}, time.Time{}, false, errUnalignedWindow("since", start)
}
if !end.Truncate(time.Hour).Equal(end) {
return time.Time{}, time.Time{}, errUnalignedWindow("until", end)
return time.Time{}, time.Time{}, false, errUnalignedWindow("until", end)
}
return start, end, nil
return start, end, windowExplicit, nil
}

// raterOptions are the non-pool knobs loadConfig resolves from the settings file.
Expand Down
96 changes: 88 additions & 8 deletions cmd/rater/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func mustTime(s string) time.Time {
// so the re-rate never doubles).
func TestResolveWindow_DefaultTrailingHours(t *testing.T) {
now := mustTime("2026-06-08T10:37:42Z")
start, end, err := resolveWindow("", "", defaultRateTrailingHours, now)
start, end, explicit, err := resolveWindow("", "", defaultRateTrailingHours, now)
if err != nil {
t.Fatal(err)
}
Expand All @@ -30,22 +30,30 @@ func TestResolveWindow_DefaultTrailingHours(t *testing.T) {
if !end.Equal(mustTime("2026-06-08T10:00:00Z")) {
t.Fatalf("end = %v, want 10:00", end)
}
// The default trailing window is NOT explicit — it is the routine cadence, so a
// reconcile-delete on it must page (see the reconcile-exit contract).
if explicit {
t.Fatal("default trailing window must report windowExplicit=false")
}

// A custom N widens/narrows the trailing window; N=1 is the old last-hour-only.
start, end, err = resolveWindow("", "", 3, now)
start, end, explicit, err = resolveWindow("", "", 3, now)
if err != nil {
t.Fatal(err)
}
if !start.Equal(mustTime("2026-06-08T07:00:00Z")) || !end.Equal(mustTime("2026-06-08T10:00:00Z")) {
t.Fatalf("window = [%v,%v), want [07:00,10:00) for N=3", start, end)
}
if explicit {
t.Fatal("a custom N is still the trailing default, not an explicit window")
}
}

// TestResolveWindow_RejectsBadTrailingHours: N < 1 would rate nothing — fail loud.
func TestResolveWindow_RejectsBadTrailingHours(t *testing.T) {
now := mustTime("2026-06-08T10:37:42Z")
for _, n := range []int{0, -1} {
if _, _, err := resolveWindow("", "", n, now); err == nil {
if _, _, _, err := resolveWindow("", "", n, now); err == nil {
t.Fatalf("expected error for trailingHours=%d", n)
}
}
Expand All @@ -55,31 +63,103 @@ func TestResolveWindow_RejectsBadTrailingHours(t *testing.T) {
// trailing default).
func TestResolveWindow_ExplicitFlags(t *testing.T) {
now := mustTime("2026-06-08T10:37:42Z")
start, end, err := resolveWindow("2026-06-01T00:00:00Z", "2026-06-02T00:00:00Z", defaultRateTrailingHours, now)
start, end, explicit, err := resolveWindow("2026-06-01T00:00:00Z", "2026-06-02T00:00:00Z", defaultRateTrailingHours, now)
if err != nil {
t.Fatal(err)
}
if !start.Equal(mustTime("2026-06-01T00:00:00Z")) || !end.Equal(mustTime("2026-06-02T00:00:00Z")) {
t.Fatalf("window = [%v,%v), want the 24h day", start, end)
}
// Both flags set → an explicit operator backfill, so a reconcile-delete here is
// intended convergence (exit 0), not a page.
if !explicit {
t.Fatal("an explicit --since/--until window must report windowExplicit=true")
}
}

// TestResolveWindow_SingleFlagIsExplicit: setting EITHER flag alone (not both) still
// marks the window explicit — the operator named a bound, so a reconcile-delete on it
// is an intended backfill, not a routine-run page.
func TestResolveWindow_SingleFlagIsExplicit(t *testing.T) {
now := mustTime("2026-06-08T10:37:42Z")
// --since only (until defaults to floor(now), hour-aligned).
if _, _, explicit, err := resolveWindow("2026-06-01T00:00:00Z", "", defaultRateTrailingHours, now); err != nil || !explicit {
t.Fatalf("--since-only: explicit=%v err=%v, want explicit=true nil", explicit, err)
}
// --until only (since defaults to floor(now)-N*1h, hour-aligned).
if _, _, explicit, err := resolveWindow("", "2026-06-08T09:00:00Z", defaultRateTrailingHours, now); err != nil || !explicit {
t.Fatalf("--until-only: explicit=%v err=%v, want explicit=true nil", explicit, err)
}
}

// TestResolveWindow_Inverted rejects start >= end.
func TestResolveWindow_Inverted(t *testing.T) {
now := mustTime("2026-06-08T10:37:42Z")
if _, _, err := resolveWindow("2026-06-02T00:00:00Z", "2026-06-01T00:00:00Z", defaultRateTrailingHours, now); err == nil {
if _, _, _, err := resolveWindow("2026-06-02T00:00:00Z", "2026-06-01T00:00:00Z", defaultRateTrailingHours, now); err == nil {
t.Fatal("expected error for inverted window")
}
}

// TestResolveWindow_BadFormat rejects a non-RFC3339 value.
func TestResolveWindow_BadFormat(t *testing.T) {
now := mustTime("2026-06-08T10:37:42Z")
if _, _, err := resolveWindow("yesterday", "", defaultRateTrailingHours, now); err == nil {
if _, _, _, err := resolveWindow("yesterday", "", defaultRateTrailingHours, now); err == nil {
t.Fatal("expected error for unparseable --since")
}
}

// TestRater_RoutineRunReconcileDeleteExitsNonzero pins the EXIT-CODE half of the
// reconcile observability contract: a ROUTINE run (the default trailing-hours window —
// windowExplicit == false) that DELETED a previously-billed rated_usage row
// (ReconciledDeletions > 0) is a revenue change with no operator behind it, which
// means data was lost / an upstream regression dropped events. It MUST exit nonzero
// (exitAnomaly) so a CronJob pages — even with NO other anomaly. This pins ONLY the
// exit code (exitCode()'s gate); the matching LOG-severity half (routine → ERROR) is
// pinned separately by TestRater_RoutineReconcileDeleteLogsError in internal/rating,
// where the reconcile-delete log line is emitted. RED before FIX 1's exit-code change:
// the old exit path keyed solely on HasAnomaly(), so a reconcile-delete with no other
// anomaly returned exitOK and the page never fired.
func TestRater_RoutineRunReconcileDeleteExitsNonzero(t *testing.T) {
const (
windowExplicit = false // routine default trailing-hours window
hasAnomaly = false // ONLY a reconcile-delete; no unpriced/unattributable/ambiguous
)
if got := exitCode(1, windowExplicit, hasAnomaly); got != exitAnomaly {
t.Fatalf("routine run with a reconcile-delete: exit = %d, want exitAnomaly (%d) — a prior bill vanished on a routine cadence; page someone", got, exitAnomaly)
}
// And with MORE than one delete (count is just a signal, not a threshold).
if got := exitCode(7, windowExplicit, hasAnomaly); got != exitAnomaly {
t.Fatalf("routine run with 7 reconcile-deletes: exit = %d, want exitAnomaly (%d)", got, exitAnomaly)
}
// Sanity: a routine run with NO reconcile-delete and no anomaly is the clean path.
if got := exitCode(0, windowExplicit, hasAnomaly); got != exitOK {
t.Fatalf("clean routine run: exit = %d, want exitOK (%d)", got, exitOK)
}
}

// TestRater_BackfillReconcileDeleteExitsZero pins the quiet half: an EXPLICIT
// operator backfill (--since/--until → windowExplicit == true) that DELETED a
// previously-billed row is intended convergence ("they asked for it", e.g. after a
// late price fix), so it exits 0 (INFO, not a page) when nothing else leaked. A
// genuine anomaly still forces exitAnomaly even on a backfill — the explicit flag
// suppresses ONLY the reconcile-delete signal, never a real anomaly.
func TestRater_BackfillReconcileDeleteExitsZero(t *testing.T) {
const windowExplicit = true // operator named the window
// Reconcile-delete on an explicit backfill, nothing else leaked → exit 0.
if got := exitCode(3, windowExplicit, false); got != exitOK {
t.Fatalf("explicit backfill with a reconcile-delete: exit = %d, want exitOK (%d) — convergence the operator asked for", got, exitOK)
}
// But a real anomaly during a backfill STILL exits nonzero (the flag never
// suppresses unpriced/unattributable/ambiguous).
if got := exitCode(3, windowExplicit, true); got != exitAnomaly {
t.Fatalf("explicit backfill that ALSO leaked an anomaly: exit = %d, want exitAnomaly (%d) — --since must not mask a real anomaly", got, exitAnomaly)
}
// A clean explicit backfill (no deletes, no anomaly) is exit 0.
if got := exitCode(0, windowExplicit, false); got != exitOK {
t.Fatalf("clean explicit backfill: exit = %d, want exitOK (%d)", got, exitOK)
}
}

// TestResolveWindow_RejectsUnaligned guards the under-bill footgun: a sub-hour
// window would overwrite a complete hourly rollup with a partial sum, so a
// non-hour-aligned --since/--until must fail loud rather than rate silently.
Expand All @@ -95,13 +175,13 @@ func TestResolveWindow_RejectsUnaligned(t *testing.T) {
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if _, _, err := resolveWindow(tc.since, tc.until, defaultRateTrailingHours, now); err == nil {
if _, _, _, err := resolveWindow(tc.since, tc.until, defaultRateTrailingHours, now); err == nil {
t.Fatalf("expected error for non-hour-aligned window %s..%s", tc.since, tc.until)
}
})
}
// A fully hour-aligned explicit window is still accepted.
if _, _, err := resolveWindow("2026-06-01T00:00:00Z", "2026-06-01T03:00:00Z", defaultRateTrailingHours, now); err != nil {
if _, _, _, err := resolveWindow("2026-06-01T00:00:00Z", "2026-06-01T03:00:00Z", defaultRateTrailingHours, now); err != nil {
t.Fatalf("hour-aligned window should be accepted, got %v", err)
}
}
2 changes: 1 addition & 1 deletion internal/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (h *harness) rateEventHour(t *testing.T, book *rating.PriceBook) rating.Res
hour := evTS.UTC().Truncate(time.Hour)

rater := rating.New(rating.NewPostgresStore(h.db), book, h.log)
res, err := rater.Run(context.Background(), hour, hour.Add(time.Hour))
res, err := rater.Run(context.Background(), hour, hour.Add(time.Hour), false)
if err != nil {
t.Fatalf("rater.Run: %v", err)
}
Expand Down
Loading
Loading