Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified bin/opik-logger-darwin-amd64
Binary file not shown.
Binary file modified bin/opik-logger-darwin-arm64
Binary file not shown.
Binary file modified bin/opik-logger-linux-amd64
Binary file not shown.
Binary file modified bin/opik-logger-windows-amd64.exe
Binary file not shown.
161 changes: 76 additions & 85 deletions src/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,21 @@ func computeBillingSnapshot(fullEntries, turnEntries []TranscriptEntry) map[stri
staticPieces := staticPrefixPieces(fullEntries)
skillBodyNames := skillBodyNameBySHA(fullEntries)
toolNames := toolUseNames(fullEntries)
counts := countNewEvents(turnEntries, skillBodyNames, toolNames)

// Counts: one event per NEW conversation piece this turn, derived from
// the SAME layout (keys, sizing, bucketing) billing re-bills every turn.
// A separate scanner here once bucketed prompts by estimate while billing
// bucketed by measured tokens, so counts landed in one bucket and tokens
// in another. Assistant pieces are excluded: their input-lane keys would
// double-count tool calls already counted via tool_result, and their
// events are counted on the output side by attributeOutput under the
// output item keys.
counts := map[billingKey]int{}
for _, p := range conversationPieces(turnEntries, skillBodyNames, toolNames) {
if p.key.kind == kindUsage && !p.exact {
counts[p.key]++
}
}

acc := map[billingKey]*billingTier{}
totals := billingTier{}
Expand All @@ -83,7 +97,7 @@ func computeBillingSnapshot(fullEntries, turnEntries []TranscriptEntry) map[stri
pieces = reconcileToUsage(pieces, float64(call.read+call.write+call.fresh))
cutByPosition(pieces, float64(call.read), float64(call.write), acc)

attributeOutput(fullEntries[call.entryIdx:call.entryEnd], acc)
attributeOutput(fullEntries[call.entryIdx:call.entryEnd], acc, counts)

totals.cacheRead += float64(call.read)
totals.cacheCreation += float64(call.write)
Expand Down Expand Up @@ -399,13 +413,13 @@ func toolLane(name string) (string, string) {
}
}

// reconcileToUsage makes Σ pieces == total exactly. Overshoot shrinks the
// estimated pieces first (usage-derived ones are normally already exact);
// if the usage-derived pieces alone still exceed the measured total — the
// request dropped content we can't see (compaction we failed to detect,
// context editing) — they are scaled down too: the per-call exactness
// contract outranks per-piece exactness. Undershoot appends the explicit
// `unattributed` tail piece.
// reconcileToUsage makes Σ pieces == total. Overshoot shrinks only the
// estimated pieces (usage-derived ones are exact); undershoot appends the
// explicit `unattributed` tail piece. Usage-derived pieces are NEVER
// scaled, deliberately: if they alone exceed the measured prompt, the
// request dropped content we failed to detect (a truncation mechanism we
// don't parse yet), and the resulting Σ lanes > usage discrepancy is the
// signal that finds that bug — smearing it away would hide it.
func reconcileToUsage(pieces []billingPiece, total float64) []billingPiece {
sum, estSum := 0.0, 0.0
for _, p := range pieces {
Expand All @@ -414,27 +428,16 @@ func reconcileToUsage(pieces []billingPiece, total float64) []billingPiece {
estSum += p.tokens
}
}
exactSum := sum - estSum
switch {
case sum > total:
if estSum > 0 {
target := total - exactSum
if target < 0 {
target = 0
}
scale := target / estSum
for i := range pieces {
if !pieces[i].exact {
pieces[i].tokens *= scale
}
}
case sum > total && estSum > 0:
target := total - (sum - estSum)
if target < 0 {
target = 0
}
if exactSum > total {
scale := total / exactSum
for i := range pieces {
if pieces[i].exact {
pieces[i].tokens *= scale
}
scale := target / estSum
for i := range pieces {
if !pieces[i].exact {
pieces[i].tokens *= scale
}
}
case sum < total:
Expand Down Expand Up @@ -466,8 +469,12 @@ func cutByPosition(pieces []billingPiece, read, write float64, acc map[billingKe

// attributeOutput books the call's own blocks against output. callEntries is
// the contiguous span of the call's entries, so per-block attributed shares
// sum to the call's usage.output_tokens by construction.
func attributeOutput(callEntries []TranscriptEntry, acc map[billingKey]*billingTier) {
// sum to the call's usage.output_tokens by construction. Each block also
// bumps counts under the same key, so output items carry true event counts
// (blocks emitted, tool calls made) keyed identically to their tokens.
func attributeOutput(callEntries []TranscriptEntry, acc map[billingKey]*billingTier,
counts map[billingKey]int) {

parsed := ParseAssistantMessages(callEntries)
DeduplicateUsage(parsed)
for _, p := range parsed {
Expand All @@ -484,63 +491,10 @@ func attributeOutput(callEntries []TranscriptEntry, acc map[billingKey]*billingT
continue
}
tierFor(acc, key).output += float64(p.AttributedOutputTokens)
counts[key]++
}
}

// countNewEvents returns the number of NEW events this turn per usage key:
// prompts per bucket, tool calls per tool/server, files per ext, skill loads
// per skill. Additive across traces (each event counted once, in its turn),
// so plain SUM yields true counts — the same split rule as everywhere else.
func countNewEvents(turnEntries []TranscriptEntry, skillBodyNames map[string]string,
toolNames map[string]string) map[billingKey]int {

counts := map[billingKey]int{}
bump := func(lane, entity string) {
counts[billingKey{lane, entity, kindUsage}]++
}

for _, e := range turnEntries {
switch e.Type {
case "user":
if e.Message == nil || e.IsCompactSummary {
continue
}
for _, c := range e.Message.Content {
switch c.Type {
case "text":
if _, ok := skillBodyNames[sha256hex(c.Text)]; ok {
continue // loads counted via buildLoadedSkillBodies below
}
bump("user_prompts", promptBucket(tokEstimateAs(c.Text, "user_prompt")))
case "tool_result":
lane, entity := toolLane(toolNames[c.ToolUseID])
bump(lane, entity)
}
}
case "attachment":
if e.Attachment == nil || e.Attachment.Type != "file" {
continue
}
var w struct {
File struct {
Path string `json:"path,omitempty"`
} `json:"file"`
}
if json.Unmarshal(e.Attachment.Content, &w) == nil {
ext := strings.ToLower(filepath.Ext(w.File.Path))
if ext == "" {
ext = "other"
}
bump("file_attachments", ext)
}
}
}
for _, l := range buildLoadedSkillBodies(turnEntries) {
bump("skills", l.Name)
}
return counts
}

func tierFor(acc map[billingKey]*billingTier, key billingKey) *billingTier {
t, ok := acc[key]
if !ok {
Expand Down Expand Up @@ -636,7 +590,12 @@ func renderBillingSnapshot(callCount int, totals billingTier,
}

lanes := map[string]interface{}{}
laneSum := billingTier{}
for lane, t := range laneTiers {
laneSum.cacheRead += t.cacheRead
laneSum.cacheCreation += t.cacheCreation
laneSum.fresh += t.fresh
laneSum.output += t.output
obj := tierFields(t)
if items := laneItems[lane]; len(items) > 0 {
sort.Slice(items, func(i, j int) bool {
Expand All @@ -660,10 +619,42 @@ func renderBillingSnapshot(callCount int, totals billingTier,
"input": round(totals.fresh),
"output": round(totals.output),
},
"lanes": lanes,
"lanes": lanes,
"reconciliation": reconciliation(laneSum, totals),
}
}

// reconciliation reports Σ lanes minus usage per tier column. Healthy
// attribution reconciles to ~0 (the unattributed lane absorbs undershoot);
// a non-zero delta means the layout disagrees with what the API billed —
// usually a truncation mechanism we don't detect yet. It is emitted on
// every trace so inconsistencies are monitorable instead of latent.
func reconciliation(laneSum, totals billingTier) map[string]interface{} {
delta := func(got, want float64) int {
d := got - want
if d < 0 {
return -round(-d)
}
return round(d)
}
deltas := map[string]int{
"input_delta": delta(laneSum.fresh, totals.fresh),
"cache_read_delta": delta(laneSum.cacheRead, totals.cacheRead),
"cache_creation_delta": delta(laneSum.cacheCreation, totals.cacheCreation),
"output_delta": delta(laneSum.output, totals.output),
}
consistent := true
out := map[string]interface{}{}
for k, v := range deltas {
out[k] = v
if v != 0 {
consistent = false
}
}
out["consistent"] = consistent
return out
}

func round(f float64) int { return int(f + 0.5) }

func minF(a, b float64) float64 {
Expand Down
56 changes: 27 additions & 29 deletions src/billing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,41 +332,39 @@ func TestBillingCompactBoundaryTruncatesReplay(t *testing.T) {
}
}

// Safety net: when usage-derived replay pieces alone exceed the call's
// measured prompt (undetected truncation), they must be scaled down so the
// per-call exactness contract still holds — the overshoot must never land
// in the fresh-input tier.
func TestBillingExactOvershootIsClamped(t *testing.T) {
entries := []TranscriptEntry{userPromptEntry("hi")}
entries = append(entries, assistantCall(t, "m1",
&Usage{InputTokens: 20, OutputTokens: 50_000},
Content{Type: "thinking", Thinking: "redacted"},
Content{Type: "text", Text: "done"},
)...)
u2 := &Usage{InputTokens: 40, CacheReadInputTokens: 1_000, OutputTokens: 10}
entries = append(entries, assistantCall(t, "m2", u2, Content{Type: "text", Text: "ok"})...)
// cc.billing.reconciliation reports Σ lanes minus usage per tier column —
// the monitorable health flag. Healthy attribution reconciles to zero;
// when usage-derived pieces exceed the billed prompt (a truncation we
// don't detect), consistent flips false and the input delta is positive.
func TestBillingReconciliationFlag(t *testing.T) {
u1 := &Usage{InputTokens: 900, CacheCreationInputTokens: 30_000, OutputTokens: 250}
entries := []TranscriptEntry{userPromptEntry("please do the thing")}
entries = append(entries, assistantCall(t, "m1", u1,
Content{Type: "text", Text: strings.Repeat("plan ", 40)})...)

snap := computeBillingSnapshot(entries, entries)
if snap == nil {
t.Fatal("expected billing snapshot")
recon := snap["reconciliation"].(map[string]interface{})
if !recon["consistent"].(bool) {
t.Errorf("healthy turn must reconcile, got %v", recon)
}

wantRead := u2.CacheReadInputTokens
wantFresh := 20 + u2.InputTokens
wantOut := 50_000 + u2.OutputTokens
// Now an undetected truncation: a 50k-output call (thinking carries the
// usage-derived mass) replayed against a tiny billed prompt.
big := &Usage{InputTokens: 900, CacheCreationInputTokens: 30_000, OutputTokens: 50_000}
entries = []TranscriptEntry{userPromptEntry("please do the thing")}
entries = append(entries, assistantCall(t, "m1", big,
Content{Type: "thinking", Thinking: "redacted"},
Content{Type: "text", Text: "done"})...)
u2 := &Usage{InputTokens: 40, CacheReadInputTokens: 1_000, OutputTokens: 10}
entries = append(entries, assistantCall(t, "m2", u2, Content{Type: "text", Text: "ok"})...)

read, write, fresh, output, rows := billingColumnSums(snap)
closeEnough := func(got, want int) bool {
d := got - want
if d < 0 {
d = -d
}
return d <= rows
snap = computeBillingSnapshot(entries, entries)
recon = snap["reconciliation"].(map[string]interface{})
if recon["consistent"].(bool) {
t.Fatalf("exact overshoot must flip consistent=false, got %v", recon)
}
if !closeEnough(read, wantRead) || !closeEnough(write, 0) ||
!closeEnough(fresh, wantFresh) || !closeEnough(output, wantOut) {
t.Errorf("Σ lanes = read %d / write %d / fresh %d / output %d, want %d/0/%d/%d (±%d)",
read, write, fresh, output, wantRead, wantFresh, wantOut, rows)
if recon["input_delta"].(int) <= 0 {
t.Errorf("overshoot must surface as positive input_delta, got %v", recon)
}
}

Expand Down
40 changes: 40 additions & 0 deletions src/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"fmt"
"os"
"os/exec"
"strconv"
Expand Down Expand Up @@ -236,6 +237,7 @@ func postTraceMetrics(state *State) {
}

mergeMetadataCC(state.TraceID, metrics)
postReconciliationScore(state.TraceID, metrics["billing"])

var files, authored, overwritten int
if agg != nil {
Expand Down Expand Up @@ -293,3 +295,41 @@ func inferCwd() string {
}
return ""
}

// postReconciliationScore mirrors cc.billing.reconciliation as a trace
// feedback score so inconsistent traces are filterable in the UI:
// token_count_consistent = 1 when Σ lanes == API usage on every tier
// column, 0 otherwise (the per-column deltas land in the reason).
func postReconciliationScore(traceID string, billing interface{}) {
snap, ok := billing.(map[string]interface{})
if !ok {
return
}
recon, ok := snap["reconciliation"].(map[string]interface{})
if !ok {
return
}

value := 0.0
if consistent, _ := recon["consistent"].(bool); consistent {
value = 1.0
}
score := map[string]interface{}{
"id": traceID,
"name": "token_count_consistent",
"value": value,
"source": "sdk",
}
if value == 0 {
score["reason"] = fmt.Sprintf(
"Σ lanes minus API usage: input %+d, cache_read %+d, cache_creation %+d, output %+d",
recon["input_delta"], recon["cache_read_delta"],
recon["cache_creation_delta"], recon["output_delta"])
}

if err := api.Put("/traces/feedback-scores", map[string]interface{}{
"scores": []interface{}{score},
}); err != nil {
debugLog("post reconciliation score: %v", err)
}
}
Loading