From 962c19cd79d52db45eeabd396d6c390a6dfc6249 Mon Sep 17 00:00:00 2001 From: Menci Date: Sat, 20 Jun 2026 00:40:30 +0800 Subject: [PATCH 1/4] feat(protocols,gateway): add input_cache_write_1h dimension and per-tier pricing override Adds two cross-provider concepts the BillingDimension-based shape did not capture: - `input_cache_write_1h` is a new disjoint billing dimension for Anthropic's extended-cache-ttl-2025-04-11 1-hour cache writes. `cache_creation` on the Messages response splits writes into `ephemeral_5m_input_tokens` (existing `input_cache_write`) and `ephemeral_1h_input_tokens` (this new dimension). `unitPriceForDimension` falls back 1h -> 5m -> input. - `ModelPricing.tiers` carries per-service-tier overrides (Anthropic `usage.speed`, OpenAI `usage.service_tier`). `resolveEffectivePricing` folds a tier override into a flat snapshot before any unit-price lookup. `UsageRecord` and the SQL usage tables grow a `tier` column that is part of bucket identity, so distinct tiers for the same model/hour aggregate as separate buckets with distinct unit-price snapshots. Migration 0034 recreates `usage` and `usage_requests` to add the `tier` column and to widen the dimension CHECK list to include the new dimension. Existing rows backfill with `tier = NULL`, which `resolveEffectivePricing` treats as base pricing - historical aggregations compute identically. 
--- .../0034_usage_per_ttl_and_tier.sql | 49 ++++++++++++++++++ packages/gateway/src/app-control_test.ts | 5 ++ .../src/control-plane/data-transfer/routes.ts | 4 ++ .../data-transfer/routes_test.ts | 2 + packages/gateway/src/control-plane/schemas.ts | 1 + .../control-plane/token-usage/aggregate.ts | 9 ++-- .../token-usage/aggregate_test.ts | 1 + .../control-plane/token-usage/routes_test.ts | 1 + .../src/data-plane/shared/telemetry/usage.ts | 5 +- packages/gateway/src/repo/memory.ts | 13 +++-- packages/gateway/src/repo/sql.ts | 50 +++++++++++-------- packages/gateway/src/repo/types.ts | 25 +++++++--- packages/protocols/src/common/models.ts | 43 ++++++++++++---- packages/provider-custom/src/fetch-models.ts | 6 +-- packages/provider/src/model-config.ts | 6 +-- 15 files changed, 165 insertions(+), 55 deletions(-) create mode 100644 packages/gateway/migrations/0034_usage_per_ttl_and_tier.sql diff --git a/packages/gateway/migrations/0034_usage_per_ttl_and_tier.sql b/packages/gateway/migrations/0034_usage_per_ttl_and_tier.sql new file mode 100644 index 000000000..b35b9eda2 --- /dev/null +++ b/packages/gateway/migrations/0034_usage_per_ttl_and_tier.sql @@ -0,0 +1,49 @@ +-- Add `tier` (Anthropic `usage.speed`, OpenAI `usage.service_tier`) to usage +-- and usage_requests, and `input_cache_write_1h` to the dimension CHECK list. +-- Existing rows backfill with `tier = NULL` so historical aggregations compute +-- identically. SQLite cannot extend a CHECK constraint or a UNIQUE INDEX in +-- place over a new column, so both tables are rebuilt. 
+ +CREATE TABLE usage_new ( + key_id TEXT NOT NULL, + model TEXT NOT NULL, + upstream TEXT, + model_key TEXT NOT NULL, + hour TEXT NOT NULL, + tier TEXT, + dimension TEXT NOT NULL CHECK (dimension IN ( + 'input', 'input_cache_read', 'input_cache_write', 'input_cache_write_1h', 'input_image', 'output', 'output_image' + )), + tokens INTEGER NOT NULL DEFAULT 0, + unit_price REAL +); + +INSERT INTO usage_new (key_id, model, upstream, model_key, hour, tier, dimension, tokens, unit_price) + SELECT key_id, model, upstream, model_key, hour, NULL, dimension, tokens, unit_price FROM usage; + +DROP TABLE usage; +ALTER TABLE usage_new RENAME TO usage; + +CREATE UNIQUE INDEX idx_usage_dimension_identity + ON usage (key_id, model, COALESCE(upstream, ''), model_key, hour, COALESCE(tier, ''), dimension); +CREATE INDEX idx_usage_dimension_hour ON usage (hour); + +CREATE TABLE usage_requests_new ( + key_id TEXT NOT NULL, + model TEXT NOT NULL, + upstream TEXT, + model_key TEXT NOT NULL, + hour TEXT NOT NULL, + tier TEXT, + requests INTEGER NOT NULL DEFAULT 0 +); + +INSERT INTO usage_requests_new (key_id, model, upstream, model_key, hour, tier, requests) + SELECT key_id, model, upstream, model_key, hour, NULL, requests FROM usage_requests; + +DROP TABLE usage_requests; +ALTER TABLE usage_requests_new RENAME TO usage_requests; + +CREATE UNIQUE INDEX idx_usage_requests_identity + ON usage_requests (key_id, model, COALESCE(upstream, ''), model_key, hour, COALESCE(tier, '')); +CREATE INDEX idx_usage_requests_hour ON usage_requests (hour); diff --git a/packages/gateway/src/app-control_test.ts b/packages/gateway/src/app-control_test.ts index e97456c4c..dec01ede8 100644 --- a/packages/gateway/src/app-control_test.ts +++ b/packages/gateway/src/app-control_test.ts @@ -108,6 +108,7 @@ test('/api/token-usage scopes to the actor\'s keys when called with an API key', upstream: null, modelKey: 'claude-sonnet-4', hour: '2026-03-15T10', + tier: null, requests: 2, tokens: { input: 10, output: 5, 
input_cache_read: 4, input_cache_write: 1 }, cost: null, @@ -118,6 +119,7 @@ test('/api/token-usage scopes to the actor\'s keys when called with an API key', upstream: null, modelKey: 'gpt-5', hour: '2026-03-15T11', + tier: null, requests: 1, tokens: { input: 20, output: 8, input_cache_read: 6, input_cache_write: 2 }, cost: null, @@ -155,6 +157,7 @@ test('/api/token-usage in self-by-key mode includes per-key metadata for the act upstream: null, modelKey: 'gpt-5', hour: '2026-03-16T10', + tier: null, requests: 1, tokens: { input: 20, output: 8 }, cost: null, @@ -182,6 +185,7 @@ test('/api/token-usage all-by-user view aggregates across keys per user', async upstream: null, modelKey: 'gpt-5', hour: '2026-03-15T10', + tier: null, requests: 1, tokens: { input: 10, output: 5 }, cost: null, @@ -213,6 +217,7 @@ test('/api/token-usage merges Claude variants into backend base model records', keyId: apiKey.id, hour: '2026-03-17T10', upstream: 'copilot:1', + tier: null, requests: 1, tokens: { input: 10, output: 5, input_cache_read: 2, input_cache_write: 1 }, }; diff --git a/packages/gateway/src/control-plane/data-transfer/routes.ts b/packages/gateway/src/control-plane/data-transfer/routes.ts index 760e0ec5a..9148df49f 100644 --- a/packages/gateway/src/control-plane/data-transfer/routes.ts +++ b/packages/gateway/src/control-plane/data-transfer/routes.ts @@ -402,6 +402,9 @@ const parseUsageRecords = (value: unknown): { type: 'ok'; records: UsageRecord[] if (typeof record.upstream === 'string' && isLegacyUpstreamIdentity(record.upstream)) { return { type: 'invalid', index: i, error: 'upstream must use a raw upstream id, not a legacy provider-prefixed identity' }; } + if (record.tier !== undefined && record.tier !== null && typeof record.tier !== 'string') { + return { type: 'invalid', index: i, error: 'record has invalid tier (must be a string or null)' }; + } const tokensResult = parseImportedTokens(record.tokens); if (tokensResult.type === 'invalid') return { type: 'invalid', 
index: i, error: 'record has invalid token dimension counts' }; const costResult = parseImportedCost(record.cost); @@ -412,6 +415,7 @@ const parseUsageRecords = (value: unknown): { type: 'ok'; records: UsageRecord[] upstream: record.upstream as string | null, modelKey: record.modelKey, hour: record.hour, + tier: (record.tier as string | null | undefined) ?? null, requests: record.requests, tokens: tokensResult.tokens, cost: costResult.cost, diff --git a/packages/gateway/src/control-plane/data-transfer/routes_test.ts b/packages/gateway/src/control-plane/data-transfer/routes_test.ts index 70f108c03..d43d7a975 100644 --- a/packages/gateway/src/control-plane/data-transfer/routes_test.ts +++ b/packages/gateway/src/control-plane/data-transfer/routes_test.ts @@ -177,6 +177,7 @@ const USAGE_1: UsageRecord = { upstream: 'up_copilot_a', modelKey: 'claude-opus-4.7', hour: '2026-01-01T10', + tier: null, requests: 5, tokens: { input: 1000, output: 500, input_cache_read: 120, input_cache_write: 80 }, cost: null, @@ -188,6 +189,7 @@ const USAGE_2: UsageRecord = { upstream: 'up_azure_a', modelKey: 'gpt-prod', hour: '2026-01-01T11', + tier: null, requests: 3, tokens: { input: 2000, output: 800, input_cache_read: 200, input_cache_write: 50 }, cost: null, diff --git a/packages/gateway/src/control-plane/schemas.ts b/packages/gateway/src/control-plane/schemas.ts index 4b14b133e..b13dab2cd 100644 --- a/packages/gateway/src/control-plane/schemas.ts +++ b/packages/gateway/src/control-plane/schemas.ts @@ -74,6 +74,7 @@ const upstreamModelSchema = z.object({ output: z.number().optional(), input_cache_read: z.number().optional(), input_cache_write: z.number().optional(), + input_cache_write_1h: z.number().optional(), input_image: z.number().optional(), output_image: z.number().optional(), }).optional(), diff --git a/packages/gateway/src/control-plane/token-usage/aggregate.ts b/packages/gateway/src/control-plane/token-usage/aggregate.ts index 2ba67ff74..01b2d2f34 100644 --- 
a/packages/gateway/src/control-plane/token-usage/aggregate.ts +++ b/packages/gateway/src/control-plane/token-usage/aggregate.ts @@ -1,5 +1,5 @@ import type { UsageRecord } from '../../repo/types.ts'; -import { BILLING_DIMENSIONS, type BillingDimension, unitPriceForDimension } from '@floway-dev/protocols/common'; +import { BILLING_DIMENSIONS, type BillingDimension, resolveEffectivePricing, unitPriceForDimension } from '@floway-dev/protocols/common'; export interface DisplayUsageRecord { keyId: string; @@ -22,13 +22,16 @@ export interface DisplayUsageByUserRecord { // Cost is pure addition over the dimension rows: Σ tokens × unit_price / 1e6. // No subtraction is needed because the counts are disjoint and each dimension -// already carries its own resolved unit price snapshot. +// already carries its own resolved unit price snapshot. The bucket's tier +// folds into pricing first so per-tier overrides (Anthropic fast mode, +// OpenAI priority/flex) replace base rates before the dimension lookup. const recordCostUsd = (record: UsageRecord): number => { + const effective = resolveEffectivePricing(record.cost, record.tier); let total = 0; for (const dimension of BILLING_DIMENSIONS) { const tokens = record.tokens[dimension] ?? 
0; if (tokens === 0) continue; - const unitPrice = unitPriceForDimension(record.cost, dimension); + const unitPrice = unitPriceForDimension(effective, dimension); if (unitPrice !== null) total += tokens * unitPrice; } return total / 1e6; diff --git a/packages/gateway/src/control-plane/token-usage/aggregate_test.ts b/packages/gateway/src/control-plane/token-usage/aggregate_test.ts index 04d62e46a..69128eea9 100644 --- a/packages/gateway/src/control-plane/token-usage/aggregate_test.ts +++ b/packages/gateway/src/control-plane/token-usage/aggregate_test.ts @@ -14,6 +14,7 @@ const baseRecord = (overrides: Partial): UsageRecord => ({ model: 'claude-opus-4-7', upstream: 'up_copilot', modelKey: 'claude-opus-4-7', + tier: null, requests: 1, tokens: { input: 100, output: 50 }, cost: opus47Pricing, diff --git a/packages/gateway/src/control-plane/token-usage/routes_test.ts b/packages/gateway/src/control-plane/token-usage/routes_test.ts index b911e2011..51c5ed85e 100644 --- a/packages/gateway/src/control-plane/token-usage/routes_test.ts +++ b/packages/gateway/src/control-plane/token-usage/routes_test.ts @@ -16,6 +16,7 @@ const seedUsage = async ( upstream: 'up_test', modelKey: model, hour, + tier: null, requests, tokens: { input: 100, output: 50 }, cost: null, diff --git a/packages/gateway/src/data-plane/shared/telemetry/usage.ts b/packages/gateway/src/data-plane/shared/telemetry/usage.ts index 99f232f47..9fb30ce2a 100644 --- a/packages/gateway/src/data-plane/shared/telemetry/usage.ts +++ b/packages/gateway/src/data-plane/shared/telemetry/usage.ts @@ -7,13 +7,15 @@ import type { TelemetryModelIdentity } from '@floway-dev/provider'; export const hasTokenUsage = (usage: TokenUsage): boolean => BILLING_DIMENSIONS.some(dimension => (usage[dimension] ?? 0) > 0); // Drop zero / undefined dimensions so a usage map only carries the dimensions -// actually billed. +// actually billed. 
`tier` (a non-numeric service-tier marker) survives the +// filter so per-tier pricing overrides resolve at recording time. export const tokenUsage = (counts: TokenUsage): TokenUsage => { const out: TokenUsage = {}; for (const dimension of BILLING_DIMENSIONS) { const value = counts[dimension] ?? 0; if (value > 0) out[dimension] = value; } + if (counts.tier != null) out.tier = counts.tier; return out; }; @@ -82,6 +84,7 @@ export const recordTokenUsage = async (keyId: string, modelIdentity: TelemetryMo upstream: modelIdentity.upstream, modelKey: modelIdentity.modelKey, hour: currentHour(), + tier: usage.tier ?? null, requests: 1, tokens: usage, cost: modelIdentity.cost, diff --git a/packages/gateway/src/repo/memory.ts b/packages/gateway/src/repo/memory.ts index 81bbd8e45..89088e84d 100644 --- a/packages/gateway/src/repo/memory.ts +++ b/packages/gateway/src/repo/memory.ts @@ -43,7 +43,7 @@ import { serializeStoredState } from './upstream-json.ts'; import { latencyBucketForMs } from '../shared/performance-histogram.ts'; import { generateSessionToken } from '../shared/session-tokens.ts'; import { assertWebSearchProviderName } from '../shared/web-search-providers.ts'; -import { BILLING_DIMENSIONS, type BillingDimension, type ModelPricing, unitPriceForDimension } from '@floway-dev/protocols/common'; +import { BILLING_DIMENSIONS, type BillingDimension, type ModelPricing, resolveEffectivePricing, unitPriceForDimension } from '@floway-dev/protocols/common'; import type { UpstreamModel, UpstreamRecord } from '@floway-dev/provider'; const SEED_ADMIN_USER: User = { @@ -230,6 +230,7 @@ interface UsageBucketIdentity { upstream: string | null; modelKey: string; hour: string; + tier: string | null; } interface UsageBucketState extends UsageBucketIdentity { @@ -242,13 +243,14 @@ class MemoryUsageRepo implements UsageRepo { private store = new Map(); private key(r: UsageBucketIdentity): string { - return [r.keyId, r.model, r.upstream ?? 
'', r.modelKey, r.hour].join('\0'); + return [r.keyId, r.model, r.upstream ?? '', r.modelKey, r.hour, r.tier ?? ''].join('\0'); } private dimensionEntries(record: UsageRecord): { dimension: BillingDimension; tokens: number; unitPrice: number | null }[] { + const effective = resolveEffectivePricing(record.cost, record.tier); return BILLING_DIMENSIONS.flatMap(dimension => { const tokens = record.tokens[dimension] ?? 0; - return tokens > 0 ? [{ dimension, tokens, unitPrice: unitPriceForDimension(record.cost, dimension) }] : []; + return tokens > 0 ? [{ dimension, tokens, unitPrice: unitPriceForDimension(effective, dimension) }] : []; }); } @@ -261,14 +263,14 @@ class MemoryUsageRepo implements UsageRepo { const unitPrice = state.unitPrices[dimension]; if (unitPrice !== undefined) (cost ??= {})[dimension] = unitPrice; } - return { keyId: state.keyId, model: state.model, upstream: state.upstream ?? null, modelKey: state.modelKey, hour: state.hour, requests: state.requests, tokens, cost }; + return { keyId: state.keyId, model: state.model, upstream: state.upstream ?? null, modelKey: state.modelKey, hour: state.hour, tier: state.tier, requests: state.requests, tokens, cost }; } private bucket(record: UsageRecord): UsageBucketState { const k = this.key(record); let state = this.store.get(k); if (!state) { - state = { keyId: record.keyId, model: record.model, upstream: record.upstream ?? null, modelKey: record.modelKey, hour: record.hour, tokens: {}, unitPrices: {}, requests: 0 }; + state = { keyId: record.keyId, model: record.model, upstream: record.upstream ?? null, modelKey: record.modelKey, hour: record.hour, tier: record.tier, tokens: {}, unitPrices: {}, requests: 0 }; this.store.set(k, state); } return state; @@ -308,6 +310,7 @@ class MemoryUsageRepo implements UsageRepo { upstream: record.upstream ?? 
null, modelKey: record.modelKey, hour: record.hour, + tier: record.tier, tokens: {}, unitPrices: {}, requests: record.requests, diff --git a/packages/gateway/src/repo/sql.ts b/packages/gateway/src/repo/sql.ts index f14d31337..cd15d3d70 100644 --- a/packages/gateway/src/repo/sql.ts +++ b/packages/gateway/src/repo/sql.ts @@ -39,7 +39,7 @@ import { latencyBucketForMs } from '../shared/performance-histogram.ts'; import { generateSessionToken } from '../shared/session-tokens.ts'; import { assertWebSearchProviderName } from '../shared/web-search-providers.ts'; import type { SqlDatabase, SqlPreparedStatement, SqlResult } from '@floway-dev/platform'; -import { BILLING_DIMENSIONS, type BillingDimension, type ModelPricing, unitPriceForDimension } from '@floway-dev/protocols/common'; +import { BILLING_DIMENSIONS, type BillingDimension, type ModelPricing, resolveEffectivePricing, unitPriceForDimension } from '@floway-dev/protocols/common'; import type { ProxyFallbackEntry, UpstreamModel, UpstreamProviderKind, UpstreamRecord } from '@floway-dev/provider'; const runStatements = async (db: SqlDatabase, statements: SqlPreparedStatement[]): Promise => { @@ -369,33 +369,36 @@ class SqlSessionsRepo implements SessionsRepo { } } -const dimensionRows = (record: UsageRecord): { dimension: BillingDimension; tokens: number; unitPrice: number | null }[] => - BILLING_DIMENSIONS.flatMap(dimension => { +const dimensionRows = (record: UsageRecord): { dimension: BillingDimension; tokens: number; unitPrice: number | null }[] => { + const effective = resolveEffectivePricing(record.cost, record.tier); + return BILLING_DIMENSIONS.flatMap(dimension => { const tokens = record.tokens[dimension] ?? 0; - return tokens > 0 ? [{ dimension, tokens, unitPrice: unitPriceForDimension(record.cost, dimension) }] : []; + return tokens > 0 ? 
[{ dimension, tokens, unitPrice: unitPriceForDimension(effective, dimension) }] : []; }); +}; class SqlUsageRepo implements UsageRepo { constructor(private db: SqlDatabase) {} async record(record: UsageRecord): Promise { const upstream = record.upstream ?? null; + const tier = record.tier; const statements: SqlPreparedStatement[] = dimensionRows(record).map(row => this.db .prepare( - `INSERT INTO usage (key_id, model, upstream, model_key, hour, dimension, tokens, unit_price) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `INSERT INTO usage (key_id, model, upstream, model_key, hour, tier, dimension, tokens, unit_price) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO UPDATE SET tokens = tokens + excluded.tokens, unit_price = COALESCE(unit_price, excluded.unit_price)`, ) - .bind(record.keyId, record.model, upstream, record.modelKey, record.hour, row.dimension, row.tokens, row.unitPrice)); + .bind(record.keyId, record.model, upstream, record.modelKey, record.hour, tier, row.dimension, row.tokens, row.unitPrice)); statements.push( this.db .prepare( - `INSERT INTO usage_requests (key_id, model, upstream, model_key, hour, requests) VALUES (?, ?, ?, ?, ?, ?) + `INSERT INTO usage_requests (key_id, model, upstream, model_key, hour, tier, requests) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO UPDATE SET requests = requests + excluded.requests`, ) - .bind(record.keyId, record.model, upstream, record.modelKey, record.hour, record.requests), + .bind(record.keyId, record.model, upstream, record.modelKey, record.hour, tier, record.requests), ); await runStatements(this.db, statements); } @@ -405,11 +408,11 @@ class SqlUsageRepo implements UsageRepo { const binds = opts.keyId ? 
[opts.keyId, opts.start, opts.end] : [opts.start, opts.end]; const [{ results: dimensions }, { results: requests }] = await Promise.all([ this.db - .prepare(`SELECT key_id, model, upstream, model_key, hour, dimension, tokens, unit_price FROM usage WHERE ${dimensionWhere}`) + .prepare(`SELECT key_id, model, upstream, model_key, hour, tier, dimension, tokens, unit_price FROM usage WHERE ${dimensionWhere}`) .bind(...binds) .all(), this.db - .prepare(`SELECT key_id, model, upstream, model_key, hour, requests FROM usage_requests WHERE ${dimensionWhere}`) + .prepare(`SELECT key_id, model, upstream, model_key, hour, tier, requests FROM usage_requests WHERE ${dimensionWhere}`) .bind(...binds) .all(), ]); @@ -418,32 +421,33 @@ class SqlUsageRepo implements UsageRepo { async listAll(): Promise { const [{ results: dimensions }, { results: requests }] = await Promise.all([ - this.db.prepare('SELECT key_id, model, upstream, model_key, hour, dimension, tokens, unit_price FROM usage').all(), - this.db.prepare('SELECT key_id, model, upstream, model_key, hour, requests FROM usage_requests').all(), + this.db.prepare('SELECT key_id, model, upstream, model_key, hour, tier, dimension, tokens, unit_price FROM usage').all(), + this.db.prepare('SELECT key_id, model, upstream, model_key, hour, tier, requests FROM usage_requests').all(), ]); return assembleUsageRecords(dimensions, requests); } async set(record: UsageRecord): Promise { const upstream = record.upstream ?? null; + const tier = record.tier; // Replacement upsert: clear the bucket's existing dimension rows first so // dimensions absent from the new record do not linger. const statements: SqlPreparedStatement[] = [ this.db - .prepare("DELETE FROM usage WHERE key_id = ? AND model = ? AND COALESCE(upstream, '') = COALESCE(?, '') AND model_key = ? AND hour = ?") - .bind(record.keyId, record.model, upstream, record.modelKey, record.hour), + .prepare("DELETE FROM usage WHERE key_id = ? AND model = ? 
AND COALESCE(upstream, '') = COALESCE(?, '') AND model_key = ? AND hour = ? AND COALESCE(tier, '') = COALESCE(?, '')") + .bind(record.keyId, record.model, upstream, record.modelKey, record.hour, tier), ...dimensionRows(record).map(row => this.db - .prepare('INSERT INTO usage (key_id, model, upstream, model_key, hour, dimension, tokens, unit_price) VALUES (?, ?, ?, ?, ?, ?, ?, ?)') - .bind(record.keyId, record.model, upstream, record.modelKey, record.hour, row.dimension, row.tokens, row.unitPrice)), + .prepare('INSERT INTO usage (key_id, model, upstream, model_key, hour, tier, dimension, tokens, unit_price) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)') + .bind(record.keyId, record.model, upstream, record.modelKey, record.hour, tier, row.dimension, row.tokens, row.unitPrice)), ]; statements.push( this.db .prepare( - `INSERT INTO usage_requests (key_id, model, upstream, model_key, hour, requests) VALUES (?, ?, ?, ?, ?, ?) + `INSERT INTO usage_requests (key_id, model, upstream, model_key, hour, tier, requests) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO UPDATE SET requests = excluded.requests`, ) - .bind(record.keyId, record.model, upstream, record.modelKey, record.hour, record.requests), + .bind(record.keyId, record.model, upstream, record.modelKey, record.hour, tier, record.requests), ); await runStatements(this.db, statements); } @@ -459,6 +463,7 @@ interface UsageDimensionRow { upstream: string | null; model_key: string; hour: string; + tier: string | null; dimension: string; tokens: number; unit_price: number | null; @@ -470,11 +475,12 @@ interface UsageRequestRow { upstream: string | null; model_key: string; hour: string; + tier: string | null; requests: number; } -const usageBucketKey = (row: { key_id: string; model: string; upstream: string | null; model_key: string; hour: string }): string => - [row.key_id, row.model, row.upstream ?? 
'', row.model_key, row.hour].join('\0'); +const usageBucketKey = (row: { key_id: string; model: string; upstream: string | null; model_key: string; hour: string; tier: string | null }): string => + [row.key_id, row.model, row.upstream ?? '', row.model_key, row.hour, row.tier ?? ''].join('\0'); // Reassemble per-bucket UsageRecords from the two narrow tables. The dimension // rows carry the disjoint counts and the per-dimension unit_price snapshot, @@ -483,11 +489,11 @@ const usageBucketKey = (row: { key_id: string; model: string; upstream: string | const assembleUsageRecords = (dimensions: readonly UsageDimensionRow[], requests: readonly UsageRequestRow[]): UsageRecord[] => { const byBucket = new Map(); - const ensureRecord = (row: { key_id: string; model: string; upstream: string | null; model_key: string; hour: string }): UsageRecord => { + const ensureRecord = (row: { key_id: string; model: string; upstream: string | null; model_key: string; hour: string; tier: string | null }): UsageRecord => { const key = usageBucketKey(row); let record = byBucket.get(key); if (!record) { - record = { keyId: row.key_id, model: row.model, upstream: row.upstream, modelKey: row.model_key, hour: row.hour, requests: 0, tokens: {}, cost: null }; + record = { keyId: row.key_id, model: row.model, upstream: row.upstream, modelKey: row.model_key, hour: row.hour, tier: row.tier, requests: 0, tokens: {}, cost: null }; byBucket.set(key, record); } return record; diff --git a/packages/gateway/src/repo/types.ts b/packages/gateway/src/repo/types.ts index 6c002deba..15dffb6a7 100644 --- a/packages/gateway/src/repo/types.ts +++ b/packages/gateway/src/repo/types.ts @@ -43,19 +43,30 @@ export interface UsageRecord { upstream: string | null; modelKey: string; hour: string; + // Service tier the upstream stamped on this bucket (Anthropic `speed`, + // OpenAI `service_tier`). null = the base / default tier. 
Distinct tiers + // for the same (keyId, model, upstream, modelKey, hour) are stored as + // separate buckets so per-tier pricing overrides apply correctly. + tier: string | null; requests: number; // Disjoint per-dimension token counts for this bucket (see TokenUsage). tokens: TokenUsage; // Pricing snapshot taken at write time. null means the provider did not // resolve pricing for this model (Custom upstreams, unknown Copilot // public id, etc.). The repo derives per-dimension unit prices from it via - // unitPriceForDimension; aggregation treats a null snapshot as cost 0. + // unitPriceForDimension after `resolveEffectivePricing(cost, tier)` folds + // in the bucket's tier override; aggregation treats a null snapshot as + // cost 0. cost: ModelPricing | null; } // Disjoint per-dimension token counts. Absent keys mean zero for that -// dimension. No key's count overlaps another's. -export type TokenUsage = Partial>; +// dimension. No key's count overlaps another's. `tier` is the upstream- +// reported service-tier marker (Anthropic `usage.speed`, OpenAI +// `usage.service_tier`) that selects an override against `cost.tiers` +// before any per-dimension unit-price lookup; absent / null = the model's +// base pricing applies. +export type TokenUsage = Partial> & { tier?: string | null }; export type SearchUsageAction = 'search' | 'fetch_page'; @@ -137,10 +148,10 @@ export interface SessionsRepo { } export interface UsageRepo { - // Additive upsert: on (keyId, model, upstream, modelKey, hour) conflict, - // token counts are summed. cost is COALESCED — the first write within a - // bucket establishes the pricing snapshot for that row, later writes that - // share the bucket keep the original snapshot. + // Additive upsert: on (keyId, model, upstream, modelKey, hour, tier) + // conflict, token counts are summed. 
cost is COALESCED — the first write + // within a bucket establishes the pricing snapshot for that row, later + // writes that share the bucket keep the original snapshot. record(record: UsageRecord): Promise; query(opts: { keyId?: string; start: string; end: string }): Promise; listAll(): Promise; diff --git a/packages/protocols/src/common/models.ts b/packages/protocols/src/common/models.ts index 9634cc05e..bd96f87ff 100644 --- a/packages/protocols/src/common/models.ts +++ b/packages/protocols/src/common/models.ts @@ -1,30 +1,42 @@ // Disjoint billing dimensions a single request can be charged on. Every count // keyed by these is non-overlapping: a prompt token is counted under exactly -// one of `input`, `input_cache_read`, `input_cache_write`, or `input_image`, -// never several at once. +// one of `input`, `input_cache_read`, `input_cache_write`, +// `input_cache_write_1h`, or `input_image`, never several at once. // // Convention borrowed from models.dev and LiteLLM: bare `input`/`output` mean // the text modality AND act as the fallback rate for any modality without a // dedicated rate; the `_image` variants are the image modality. There are no // image cache dimensions on purpose — a live probe of Azure gpt-image-2 -// confirmed its usage object never emits cached fields. -export type BillingDimension = 'input' | 'input_cache_read' | 'input_cache_write' | 'input_image' | 'output' | 'output_image'; +// confirmed its usage object never emits cached fields. The 5m vs 1h cache +// write split is the wire-level shape on Anthropic's side (see MessagesUsage +// `cache_creation`); from this table's perspective each is just another +// disjoint dimension with its own unit price. +export type BillingDimension = 'input' | 'input_cache_read' | 'input_cache_write' | 'input_cache_write_1h' | 'input_image' | 'output' | 'output_image'; // Iteration form of BillingDimension; the type union is the source of truth. 
-export const BILLING_DIMENSIONS: readonly BillingDimension[] = ['input', 'input_cache_read', 'input_cache_write', 'input_image', 'output', 'output_image']; +export const BILLING_DIMENSIONS: readonly BillingDimension[] = ['input', 'input_cache_read', 'input_cache_write', 'input_cache_write_1h', 'input_image', 'output', 'output_image']; // Per-model pricing in USD per million tokens, aligned with the sst/models.dev // `Cost` schema (https://github.com/sst/models.dev/blob/main/packages/core/src/schema.ts). // Keys are billing dimensions: bare `input`/`output` are the text/fallback rate // and `_image` keys are the image modality. Every key is optional; an absent key // falls back per `unitPriceForDimension` (modality → bare, cached → uncached). -export type ModelPricing = Partial>; +// +// `tiers` carries per-request service-tier overrides (Anthropic fast mode, +// OpenAI priority/flex). Each tier key is the wire-value the upstream stamps +// on the usage object (`fast`, `priority`, `flex`, ...). Resolve through +// `resolveEffectivePricing(pricing, usage.tier)` before any unit-price lookup. +export interface ModelPricing extends Partial> { + tiers?: Record>>; +} // Resolve the USD-per-million-tokens unit price for one dimension against a // pricing snapshot, applying the LiteLLM-style fallback chain: a modality with -// no dedicated rate falls back to the bare text rate, and cached input falls -// back to uncached input. Returns null when even the fallback base is absent -// (or the whole snapshot is null), which aggregation treats as cost 0. +// no dedicated rate falls back to the bare text rate, cached input falls back +// to uncached input, and the 1-hour cache write falls back to the 5-minute +// cache write before reaching uncached input. Returns null when even the +// fallback base is absent (or the whole snapshot is null), which aggregation +// treats as cost 0. 
export const unitPriceForDimension = (pricing: ModelPricing | null, dimension: BillingDimension): number | null => { if (!pricing) return null; switch (dimension) { @@ -34,6 +46,8 @@ export const unitPriceForDimension = (pricing: ModelPricing | null, dimension: B return pricing.input_cache_read ?? pricing.input ?? null; case 'input_cache_write': return pricing.input_cache_write ?? pricing.input ?? null; + case 'input_cache_write_1h': + return pricing.input_cache_write_1h ?? pricing.input_cache_write ?? pricing.input ?? null; case 'input_image': return pricing.input_image ?? pricing.input ?? null; case 'output': @@ -43,6 +57,17 @@ export const unitPriceForDimension = (pricing: ModelPricing | null, dimension: B } }; +// Fold the per-tier override (if any) into a flat ModelPricing snapshot, so +// every downstream `unitPriceForDimension` call sees one self-contained map. +// An unknown or absent tier returns the base snapshot unchanged (without +// `tiers`). +export const resolveEffectivePricing = (pricing: ModelPricing | null, tier: string | null | undefined): ModelPricing | null => { + if (!pricing) return null; + const { tiers, ...base } = pricing; + const override = tier != null ? tiers?.[tier] : undefined; + return override ? { ...base, ...override } : base; +}; + // High-level endpoint-family discriminator. A model belongs to exactly one // kind; cross-cutting features (vision, function calling, structured // outputs) are orthogonal and modeled separately when needed. 
diff --git a/packages/provider-custom/src/fetch-models.ts b/packages/provider-custom/src/fetch-models.ts index ab9ddefa5..40d567740 100644 --- a/packages/provider-custom/src/fetch-models.ts +++ b/packages/provider-custom/src/fetch-models.ts @@ -11,7 +11,7 @@ import type { CustomUpstreamConfig } from './config.ts'; import { customFetchModels } from './fetch.ts'; -import type { ModelKind, ModelPricing } from '@floway-dev/protocols/common'; +import { BILLING_DIMENSIONS, type ModelKind, type ModelPricing } from '@floway-dev/protocols/common'; import { fetchUpstreamModels, type Fetcher } from '@floway-dev/provider'; export interface CustomRawModel { @@ -58,14 +58,12 @@ const parseLimits = (value: unknown): CustomRawModel['limits'] => { return Object.keys(limits).length > 0 ? limits : undefined; }; -const PRICING_DIMENSIONS: readonly (keyof ModelPricing)[] = ['input', 'input_cache_read', 'input_cache_write', 'input_image', 'output', 'output_image']; - const parseCost = (value: unknown): ModelPricing | undefined => { // Admit any subset of billing dimensions advertised on the upstream's // /v1/models cost block; drop the whole block when none are present. 
if (!isRecord(value)) return undefined; const cost: ModelPricing = {}; - for (const dimension of PRICING_DIMENSIONS) { + for (const dimension of BILLING_DIMENSIONS) { const rate = optionalNumberField(value[dimension]); if (rate !== undefined) cost[dimension] = rate; } diff --git a/packages/provider/src/model-config.ts b/packages/provider/src/model-config.ts index c3804052a..da1692d40 100644 --- a/packages/provider/src/model-config.ts +++ b/packages/provider/src/model-config.ts @@ -1,5 +1,5 @@ import { isKnownFlagId } from './flags.ts'; -import type { ModelEndpointKey, ModelEndpoints, ModelKind, ModelPricing } from '@floway-dev/protocols/common'; +import { BILLING_DIMENSIONS, type ModelEndpointKey, type ModelEndpoints, type ModelKind, type ModelPricing } from '@floway-dev/protocols/common'; import { kindForEndpoints } from '@floway-dev/protocols/common'; export interface UpstreamModelLimits { @@ -120,13 +120,11 @@ const nonNegativeNumberField = (value: unknown, label: string): number => { return value; }; -const PRICING_DIMENSIONS: readonly (keyof ModelPricing)[] = ['input', 'input_cache_read', 'input_cache_write', 'input_image', 'output', 'output_image']; - export const pricingField = (value: unknown, label: string): ModelPricing | undefined => { const record = optionalMetadataRecord(value, label); if (!record) return undefined; const pricing: ModelPricing = {}; - for (const dimension of PRICING_DIMENSIONS) { + for (const dimension of BILLING_DIMENSIONS) { if (record[dimension] !== undefined) pricing[dimension] = nonNegativeNumberField(record[dimension], `${label}.${dimension}`); } return Object.keys(pricing).length > 0 ? 
pricing : undefined; From 845112538861ece1eeb54d5a228f74c28df29273 Mon Sep 17 00:00:00 2001 From: Menci Date: Sat, 20 Jun 2026 00:51:26 +0800 Subject: [PATCH 2/4] feat(gateway): parse per-TTL cache writes and service tier across protocol shapes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Messages: when the upstream emits `usage.cache_creation` (the structured sub-object from extended-cache-ttl-2025-04-11), split the per-TTL counts onto `input_cache_write` (5m) and `input_cache_write_1h` (1h). Fall back to the flat `cache_creation_input_tokens` when the sub-object is absent. Capture `usage.speed: 'fast'` as `tier: 'fast'`; standard tier is left unset so base-tier rows aggregate with historical no-tier rows. - Responses: surface the top-level `response.service_tier` as the bucket `tier`. Drop `default` and `auto` (both denote base pricing) so we don't needlessly split base-tier buckets. The WebSocket path now reads service_tier from the response object too — matching the HTTP path. - Chat Completions: same as Responses but reading the top-level `chunk.service_tier` (chat.completion[.chunk]). Protocol types grow `MessagesUsage.cache_creation`, `MessagesUsage.speed`, `ResponsesResult.service_tier`, `ChatCompletionsResult.service_tier`, and `ChatCompletionsStreamEvent.service_tier`. Dashboard usage page folds `input_cache_write_1h` into the existing Cache Write column so the per-TTL split shows up correctly in totals, prefill, and per-bucket detail rows. The model editor exposes a dedicated 1-hour cache write input field so operators on custom upstreams can price it. Tests cover the per-TTL split (sub-object takes precedence over the rolled-up flat field), fallback to the flat field, speed=fast extraction, and per-tier override applying during cost compute including the new input_cache_write_1h dimension fallback chain (1h -> 5m -> input). 
--- apps/web/src/api/types.ts | 3 +- .../components/upstream-edit/ModelEditor.vue | 11 +- apps/web/src/pages/dashboard/usage.vue | 15 +- .../token-usage/aggregate_test.ts | 39 ++++ .../llm/chat-completions/respond.ts | 12 +- .../llm/messages/events/reassemble.ts | 2 + .../src/data-plane/llm/messages/respond.ts | 27 ++- .../data-plane/llm/messages/respond_test.ts | 173 +++++++++--------- .../src/data-plane/llm/responses/respond.ts | 21 ++- .../src/data-plane/llm/responses/websocket.ts | 13 +- .../src/data-plane/shared/telemetry/usage.ts | 11 +- .../protocols/src/chat-completions/index.ts | 6 + packages/protocols/src/common/models_test.ts | 55 ++++++ packages/protocols/src/messages/index.ts | 16 ++ packages/protocols/src/responses/index.ts | 5 + 15 files changed, 276 insertions(+), 133 deletions(-) create mode 100644 packages/protocols/src/common/models_test.ts diff --git a/apps/web/src/api/types.ts b/apps/web/src/api/types.ts index 6b91b7ea7..069233d2f 100644 --- a/apps/web/src/api/types.ts +++ b/apps/web/src/api/types.ts @@ -22,7 +22,8 @@ export interface ModelEndpoints { export type ModelEndpointKey = keyof ModelEndpoints; // USD per million tokens, keyed by billing dimension. 
-export type ModelPricing = Partial>; +export type BillingDimension = 'input' | 'input_cache_read' | 'input_cache_write' | 'input_cache_write_1h' | 'input_image' | 'output' | 'output_image'; +export type ModelPricing = Partial>; export interface UpstreamModelConfig { upstreamModelId: string; diff --git a/apps/web/src/components/upstream-edit/ModelEditor.vue b/apps/web/src/components/upstream-edit/ModelEditor.vue index 3d8e18ec7..a4af2eb32 100644 --- a/apps/web/src/components/upstream-edit/ModelEditor.vue +++ b/apps/web/src/components/upstream-edit/ModelEditor.vue @@ -4,7 +4,7 @@ import { computed } from 'vue'; import EndpointsField from './EndpointsField.vue'; import FlagOverridesEditor from './FlagOverridesEditor.vue'; import { configOf, defaultEndpointsForKind, publicIdOf, titleFor, type Row } from './modelRows.ts'; -import type { FlagDef, ModelKind, ModelPricing, UpstreamModelConfig, UpstreamProviderKind } from '../../api/types.ts'; +import type { BillingDimension, FlagDef, ModelKind, ModelPricing, UpstreamModelConfig, UpstreamProviderKind } from '../../api/types.ts'; import { Button, Input, Select, Switch } from '@floway-dev/ui'; const props = defineProps<{ @@ -37,14 +37,15 @@ const kindOptions: { value: ModelKind; label: string }[] = [ const PRICING_LABELS: Record = { input: 'Input ($/MTok)', input_cache_read: 'Cache Read ($/MTok)', - input_cache_write: 'Cache Write ($/MTok)', + input_cache_write: 'Cache Write 5m ($/MTok)', + input_cache_write_1h: 'Cache Write 1h ($/MTok)', input_image: 'Image Input ($/MTok)', output: 'Output ($/MTok)', output_image: 'Image Output ($/MTok)', }; -const PRICING_BY_KIND: Record = { - chat: ['input', 'input_cache_read', 'input_cache_write', 'output'], +const PRICING_BY_KIND: Record = { + chat: ['input', 'input_cache_read', 'input_cache_write', 'input_cache_write_1h', 'output'], embedding: ['input'], image: ['input', 'input_image', 'output', 'output_image'], }; @@ -81,7 +82,7 @@ const updateLimit = ( patch({ limits: 
Object.keys(limits).length > 0 ? limits : undefined }); }; -const updateCost = (key: keyof ModelPricing, raw: string | number | null | undefined) => { +const updateCost = (key: BillingDimension, raw: string | number | null | undefined) => { if (!config.value) return; const cost = { ...(config.value.cost ?? {}) } as Record; const num = parseOptionalNumber(raw); diff --git a/apps/web/src/pages/dashboard/usage.vue b/apps/web/src/pages/dashboard/usage.vue index ff4909763..2e206639f 100644 --- a/apps/web/src/pages/dashboard/usage.vue +++ b/apps/web/src/pages/dashboard/usage.vue @@ -6,6 +6,7 @@ import { defineBasicLoader } from 'unplugin-vue-router/data-loaders/basic'; import { computed, ref, watch } from 'vue'; import { callApi, useApi, type ApiClient } from '../../api/client.ts'; +import type { BillingDimension } from '../../api/types.ts'; import ChartCanvas from '../../components/charts/ChartCanvas.vue'; import { bucketKeyForUtcHour, chartColor, chartFont, chartXAxisTick, dashboardBuckets, dashboardRangeQuery, type DashboardRange } from '../../components/charts/dashboard-chart.ts'; import UsageSummaryMetric from '../../components/usage/UsageSummaryMetric.vue'; @@ -13,8 +14,6 @@ import { useModelsStore } from '../../composables/useModels.ts'; import { useAuthStore } from '../../stores/auth.ts'; import { OverlayScrollbars, Spinner } from '@floway-dev/ui'; -type BillingDimension = 'input' | 'input_cache_read' | 'input_cache_write' | 'input_image' | 'output' | 'output_image'; - interface DisplayUsageRecord { keyId: string; keyName?: string; @@ -190,7 +189,7 @@ const tokenSummary = computed(() => { input += dim(r, 'input'); output += dim(r, 'output'); cacheRead += dim(r, 'input_cache_read'); - cacheCreation += dim(r, 'input_cache_write'); + cacheCreation += dim(r, 'input_cache_write') + dim(r, 'input_cache_write_1h'); inputImage += dim(r, 'input_image'); outputImage += dim(r, 'output_image'); } @@ -240,12 +239,12 @@ const metricValue = (r: DisplayUsageRecord, metric: 
Metric): number => { switch (metric) { case 'requests': return r.requests; case 'cost': return r.cost; - case 'total': return dim(r, 'input') + dim(r, 'output') + dim(r, 'input_cache_read') + dim(r, 'input_cache_write') + dim(r, 'input_image') + dim(r, 'output_image'); - case 'input': return dim(r, 'input') + dim(r, 'input_cache_read') + dim(r, 'input_cache_write') + dim(r, 'input_image'); + case 'total': return dim(r, 'input') + dim(r, 'output') + dim(r, 'input_cache_read') + dim(r, 'input_cache_write') + dim(r, 'input_cache_write_1h') + dim(r, 'input_image') + dim(r, 'output_image'); + case 'input': return dim(r, 'input') + dim(r, 'input_cache_read') + dim(r, 'input_cache_write') + dim(r, 'input_cache_write_1h') + dim(r, 'input_image'); case 'output': return dim(r, 'output') + dim(r, 'output_image'); - case 'prefill': return dim(r, 'input') + dim(r, 'input_cache_write') + dim(r, 'input_image'); + case 'prefill': return dim(r, 'input') + dim(r, 'input_cache_write') + dim(r, 'input_cache_write_1h') + dim(r, 'input_image'); case 'cached': return dim(r, 'input_cache_read'); - case 'cacheCreation': return dim(r, 'input_cache_write'); + case 'cacheCreation': return dim(r, 'input_cache_write') + dim(r, 'input_cache_write_1h'); case 'cachedRate': case 'cacheHitRate': return 0; @@ -339,7 +338,7 @@ const aggregateTokenRecords = (records: readonly DisplayUsageRecord[], groupKey: detail.input += dim(r, 'input'); detail.output += dim(r, 'output'); detail.cacheRead += dim(r, 'input_cache_read'); - detail.cacheCreation += dim(r, 'input_cache_write'); + detail.cacheCreation += dim(r, 'input_cache_write') + dim(r, 'input_cache_write_1h'); detail.inputImage += dim(r, 'input_image'); detail.outputImage += dim(r, 'output_image'); detail.cost += r.cost; diff --git a/packages/gateway/src/control-plane/token-usage/aggregate_test.ts b/packages/gateway/src/control-plane/token-usage/aggregate_test.ts index 69128eea9..ab1b3b79e 100644 --- 
a/packages/gateway/src/control-plane/token-usage/aggregate_test.ts +++ b/packages/gateway/src/control-plane/token-usage/aggregate_test.ts @@ -84,3 +84,42 @@ test('aggregateUsageForDisplay charges image dimensions separately', () => { // 10 + 5 + 40 + 30 = $85. assertAlmostEquals(out[0].cost, 85, 1e-9); }); + +test('aggregateUsageForDisplay applies the per-tier override when the bucket carries a tier', () => { + // Opus 4.8 standard: $5 input / $25 output. Fast: $10 / $50. + const cost: ModelPricing = { + input: 5, + output: 25, + tiers: { fast: { input: 10, output: 50 } }, + }; + const fastRow = baseRecord({ tier: 'fast', cost, tokens: { input: 1_000_000, output: 1_000_000 } }); + const standardRow = baseRecord({ tier: null, cost, tokens: { input: 1_000_000, output: 1_000_000 } }); + + const fastOut = aggregateUsageForDisplay([fastRow]); + // 1M * $10 + 1M * $50 = $60. + assertAlmostEquals(fastOut[0].cost, 60, 1e-9); + + const standardOut = aggregateUsageForDisplay([standardRow]); + // 1M * $5 + 1M * $25 = $30. + assertAlmostEquals(standardOut[0].cost, 30, 1e-9); +}); + +test('aggregateUsageForDisplay leaves base pricing alone when the tier has no override entry', () => { + const cost: ModelPricing = { + input: 5, + output: 25, + tiers: { fast: { input: 10, output: 50 } }, + }; + const out = aggregateUsageForDisplay([baseRecord({ tier: 'priority', cost, tokens: { input: 1_000_000 } })]); + // Unknown tier → falls back to base $5 input. 1M * $5 = $5. + assertAlmostEquals(out[0].cost, 5, 1e-9); +}); + +test('aggregateUsageForDisplay prices the input_cache_write_1h dimension via the 1h-specific rate', () => { + const cost: ModelPricing = { input: 5, input_cache_write: 6.25, input_cache_write_1h: 10, output: 25 }; + const out = aggregateUsageForDisplay([ + baseRecord({ cost, tokens: { input_cache_write_1h: 1_000_000 } }), + ]); + // 1M * $10 = $10. 
+ assertAlmostEquals(out[0].cost, 10, 1e-9); +}); diff --git a/packages/gateway/src/data-plane/llm/chat-completions/respond.ts b/packages/gateway/src/data-plane/llm/chat-completions/respond.ts index 07e56679b..e3552b85b 100644 --- a/packages/gateway/src/data-plane/llm/chat-completions/respond.ts +++ b/packages/gateway/src/data-plane/llm/chat-completions/respond.ts @@ -3,7 +3,7 @@ import { streamSSE } from 'hono/streaming'; import { CHAT_COMPLETIONS_MISSING_TERMINAL_MESSAGE, collectChatCompletionsProtocolEventsToResult } from './events/to-result.ts'; import { chatCompletionsProtocolFrameToSSEFrame } from './events/to-sse.ts'; -import { tokenUsage } from '../../shared/telemetry/usage.ts'; +import { normalizeOpenAiServiceTier, tokenUsage } from '../../shared/telemetry/usage.ts'; import type { GatewayCtx } from '../shared/gateway-ctx.ts'; import { SourceStreamState, eventResultMetadata, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; import { type StreamCompletion, writeSSEFrames } from '../shared/stream/sse.ts'; @@ -44,7 +44,7 @@ export const respondChatCompletions = async ( try { const response = await collectChatCompletionsProtocolEventsToResult(frames); const metadata = await eventResultMetadata(result); - const usage = response.usage ? tokenUsageFromChatCompletionsUsage(response.usage) : null; + const usage = response.usage ? tokenUsageFromChatCompletionsUsage(response.usage, response.service_tier) : null; await recordUsage(ctx, metadata.modelIdentity, usage); recordPerformance(ctx, metadata.performance, state.failed); return { success: true, response: Response.json(response) }; @@ -80,14 +80,18 @@ export const respondChatCompletions = async ( // OpenAI Chat usage reports prompt_tokens inclusive of cached and // cache-creation tokens; subtract them to recover the disjoint bare input. 
-const tokenUsageFromChatCompletionsUsage = (u: NonNullable) => { +// The top-level `service_tier` echoes the actual processing tier; surface it +// as the `tier` slot so per-tier pricing overrides resolve at recording time. +const tokenUsageFromChatCompletionsUsage = (u: NonNullable, serviceTier: string | null | undefined) => { const cacheRead = u.prompt_tokens_details?.cached_tokens ?? 0; const cacheWrite = u.prompt_tokens_details?.cache_creation_input_tokens ?? 0; + const tier = normalizeOpenAiServiceTier(serviceTier); return tokenUsage({ input: u.prompt_tokens - cacheRead - cacheWrite, input_cache_read: cacheRead, input_cache_write: cacheWrite, output: u.completion_tokens, + ...(tier !== null ? { tier } : {}), }); }; @@ -118,7 +122,7 @@ const observeChatCompletionsFrames = async function* (frames: AsyncIterable if (update.cache_read_input_tokens != null) { usage.cache_read_input_tokens = update.cache_read_input_tokens; } + if (update.cache_creation != null) usage.cache_creation = update.cache_creation; if (update.service_tier != null) usage.service_tier = update.service_tier; + if (update.speed != null) usage.speed = update.speed; if (update.server_tool_use != null) { usage.server_tool_use = update.server_tool_use; } diff --git a/packages/gateway/src/data-plane/llm/messages/respond.ts b/packages/gateway/src/data-plane/llm/messages/respond.ts index d679428aa..620825e5a 100644 --- a/packages/gateway/src/data-plane/llm/messages/respond.ts +++ b/packages/gateway/src/data-plane/llm/messages/respond.ts @@ -77,14 +77,23 @@ export const respondMessages = async ( }; // Anthropic already reports disjoint token counts: input_tokens excludes the -// cache figures. Map them straight onto the billing dimensions without summing. -const tokenUsageFromMessagesUsage = (u: MessagesUsageLike) => - tokenUsage({ - input: u.input_tokens ?? 0, - input_cache_read: u.cache_read_input_tokens ?? 0, - input_cache_write: u.cache_creation_input_tokens ?? 
0, - output: u.output_tokens, - }); +// cache figures. Map them straight onto the billing dimensions without +// summing. When the upstream emits the `cache_creation` sub-object +// (extended-cache-ttl-2025-04-11), split the per-TTL counts onto the 5m and +// 1h dimensions; the flat `cache_creation_input_tokens` is the sum and is +// only consulted when the sub-object is absent. The Opus 4.6+ Messages +// usage object carries `speed: 'standard' | 'fast'`, which we surface as +// `tier: 'fast'` so per-tier pricing overrides resolve at recording time +// (standard maps to base pricing — left unset rather than echoed as +// 'standard' so old usage rows and base-tier rows aggregate together). +const tokenUsageFromMessagesUsage = (u: MessagesUsageLike) => tokenUsage({ + input: u.input_tokens ?? 0, + input_cache_read: u.cache_read_input_tokens ?? 0, + input_cache_write: u.cache_creation?.ephemeral_5m_input_tokens ?? u.cache_creation_input_tokens ?? 0, + input_cache_write_1h: u.cache_creation?.ephemeral_1h_input_tokens ?? 0, + output: u.output_tokens, + ...(u.speed === 'fast' ? { tier: 'fast' } : {}), +}); export const createMessagesStreamUsageState = () => ({ current: tokenUsage({}), @@ -102,7 +111,7 @@ export const tokenUsageFromMessagesFrame = (frame: ProtocolFrame 0; + state.gotInputFromStart ||= (state.current.input ?? 0) + (state.current.input_cache_read ?? 0) + (state.current.input_cache_write ?? 0) + (state.current.input_cache_write_1h ?? 
0) > 0; } if (event.type === 'message_delta' && event.usage) { if (!state.gotInputFromStart && event.usage.input_tokens !== undefined) { diff --git a/packages/gateway/src/data-plane/llm/messages/respond_test.ts b/packages/gateway/src/data-plane/llm/messages/respond_test.ts index e59bd40be..c7bcaee23 100644 --- a/packages/gateway/src/data-plane/llm/messages/respond_test.ts +++ b/packages/gateway/src/data-plane/llm/messages/respond_test.ts @@ -2,49 +2,40 @@ import { test } from 'vitest'; import { createMessagesStreamUsageState, tokenUsageFromMessagesFrame } from './respond.ts'; import { eventFrame } from '@floway-dev/protocols/common'; -import type { MessagesStreamEvent } from '@floway-dev/protocols/messages'; +import type { MessagesStreamEvent, MessagesUsage } from '@floway-dev/protocols/messages'; import { assertEquals } from '@floway-dev/test-utils'; const stop = () => eventFrame({ type: 'message_stop' } satisfies MessagesStreamEvent); +const messageStart = (usage: MessagesUsage) => + eventFrame({ + type: 'message_start', + message: { + id: 'msg_1', + type: 'message', + role: 'assistant', + content: [], + model: 'claude-test', + stop_reason: null, + stop_sequence: null, + usage, + }, + } satisfies MessagesStreamEvent); + +const messageDelta = (usage: NonNullable['usage']>) => + eventFrame({ type: 'message_delta', delta: {}, usage } satisfies MessagesStreamEvent); + test('Messages stream usage keeps start input and delta output', () => { const state = createMessagesStreamUsageState(); assertEquals( tokenUsageFromMessagesFrame( - eventFrame({ - type: 'message_start', - message: { - id: 'msg_1', - type: 'message', - role: 'assistant', - content: [], - model: 'claude-test', - stop_reason: null, - stop_sequence: null, - usage: { - input_tokens: 12, - output_tokens: 1, - cache_creation_input_tokens: 4, - cache_read_input_tokens: 3, - }, - }, - } satisfies MessagesStreamEvent), - state, - ), - null, - ); - assertEquals( - tokenUsageFromMessagesFrame( - eventFrame({ - 
type: 'message_delta', - delta: {}, - usage: { output_tokens: 7 }, - } satisfies MessagesStreamEvent), + messageStart({ input_tokens: 12, output_tokens: 1, cache_creation_input_tokens: 4, cache_read_input_tokens: 3 }), state, ), null, ); + assertEquals(tokenUsageFromMessagesFrame(messageDelta({ output_tokens: 7 }), state), null); assertEquals(tokenUsageFromMessagesFrame(stop(), state), { input: 12, @@ -57,43 +48,12 @@ test('Messages stream usage keeps start input and delta output', () => { test('Messages stream usage can recover input from delta', () => { const state = createMessagesStreamUsageState(); + tokenUsageFromMessagesFrame(messageStart({ input_tokens: 0, output_tokens: 0 }), state); tokenUsageFromMessagesFrame( - eventFrame({ - type: 'message_start', - message: { - id: 'msg_1', - type: 'message', - role: 'assistant', - content: [], - model: 'claude-test', - stop_reason: null, - stop_sequence: null, - usage: { input_tokens: 0, output_tokens: 0 }, - }, - } satisfies MessagesStreamEvent), - state, - ); - tokenUsageFromMessagesFrame( - eventFrame({ - type: 'message_delta', - delta: {}, - usage: { - input_tokens: 11, - output_tokens: 2, - cache_creation_input_tokens: 7, - cache_read_input_tokens: 5, - }, - } satisfies MessagesStreamEvent), - state, - ); - tokenUsageFromMessagesFrame( - eventFrame({ - type: 'message_delta', - delta: {}, - usage: { output_tokens: 6 }, - } satisfies MessagesStreamEvent), + messageDelta({ input_tokens: 11, output_tokens: 2, cache_creation_input_tokens: 7, cache_read_input_tokens: 5 }), state, ); + tokenUsageFromMessagesFrame(messageDelta({ output_tokens: 6 }), state); assertEquals(tokenUsageFromMessagesFrame(stop(), state), { input: 11, @@ -110,32 +70,77 @@ test('Messages stream usage keeps cache-only start when a later delta carries in const state = createMessagesStreamUsageState(); tokenUsageFromMessagesFrame( - eventFrame({ - type: 'message_start', - message: { - id: 'msg_1', - type: 'message', - role: 'assistant', - content: 
[], - model: 'claude-test', - stop_reason: null, - stop_sequence: null, - usage: { input_tokens: 0, output_tokens: 1, cache_read_input_tokens: 1000 }, - }, - } satisfies MessagesStreamEvent), + messageStart({ input_tokens: 0, output_tokens: 1, cache_read_input_tokens: 1000 }), state, ); + tokenUsageFromMessagesFrame(messageDelta({ input_tokens: 0, output_tokens: 50 }), state); + + assertEquals(tokenUsageFromMessagesFrame(stop(), state), { + input_cache_read: 1000, + output: 50, + }); +}); + +test('Messages stream usage splits cache_creation per-TTL when the sub-object is present', () => { + const state = createMessagesStreamUsageState(); + tokenUsageFromMessagesFrame( - eventFrame({ - type: 'message_delta', - delta: {}, - usage: { input_tokens: 0, output_tokens: 50 }, - } satisfies MessagesStreamEvent), + messageStart({ + input_tokens: 12, + output_tokens: 1, + cache_creation_input_tokens: 9, + cache_creation: { ephemeral_5m_input_tokens: 4, ephemeral_1h_input_tokens: 5 }, + cache_read_input_tokens: 3, + }), state, ); assertEquals(tokenUsageFromMessagesFrame(stop(), state), { - input_cache_read: 1000, - output: 50, + input: 12, + input_cache_read: 3, + input_cache_write: 4, + input_cache_write_1h: 5, + output: 1, + }); +}); + +test('Messages stream usage falls back to the rolled-up cache_creation when the sub-object is absent', () => { + const state = createMessagesStreamUsageState(); + + tokenUsageFromMessagesFrame( + messageStart({ input_tokens: 12, output_tokens: 1, cache_creation_input_tokens: 9, cache_read_input_tokens: 3 }), + state, + ); + + assertEquals(tokenUsageFromMessagesFrame(stop(), state), { + input: 12, + input_cache_read: 3, + input_cache_write: 9, + output: 1, }); }); + +test('Messages stream usage captures speed=fast as tier=fast', () => { + const state = createMessagesStreamUsageState(); + + tokenUsageFromMessagesFrame( + messageStart({ input_tokens: 5, output_tokens: 0, speed: 'fast' }), + state, + ); + + 
assertEquals(tokenUsageFromMessagesFrame(stop(), state), { + input: 5, + tier: 'fast', + }); +}); + +test('Messages stream usage leaves tier unset when speed is standard', () => { + const state = createMessagesStreamUsageState(); + + tokenUsageFromMessagesFrame( + messageStart({ input_tokens: 5, output_tokens: 0, speed: 'standard' }), + state, + ); + + assertEquals(tokenUsageFromMessagesFrame(stop(), state), { input: 5 }); +}); diff --git a/packages/gateway/src/data-plane/llm/responses/respond.ts b/packages/gateway/src/data-plane/llm/responses/respond.ts index 58795f9c7..394a4c29e 100644 --- a/packages/gateway/src/data-plane/llm/responses/respond.ts +++ b/packages/gateway/src/data-plane/llm/responses/respond.ts @@ -3,7 +3,7 @@ import { streamSSE } from 'hono/streaming'; import { RESPONSES_MISSING_TERMINAL_MESSAGE, collectResponsesProtocolEventsToResult } from './events/to-result.ts'; import { responsesProtocolFrameToSSEFrame } from './events/to-sse.ts'; -import { tokenUsage } from '../../shared/telemetry/usage.ts'; +import { normalizeOpenAiServiceTier, tokenUsage } from '../../shared/telemetry/usage.ts'; import type { GatewayCtx } from '../shared/gateway-ctx.ts'; import { SourceStreamState, eventResultMetadata, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; import { type StreamCompletion, writeSSEFrames } from '../shared/stream/sse.ts'; @@ -76,15 +76,20 @@ export const respondResponses = async ( // --- token usage --- // OpenAI Responses reports input_tokens inclusive of cached tokens; subtract -// the cached split to recover the disjoint bare input. -const tokenUsageFromResponsesResult = (r: ResponsesResult) => { - const u = r.usage; - if (!u) return null; - const cacheRead = u.input_tokens_details?.cached_tokens ?? 0; +// the cached split to recover the disjoint bare input. 
The top-level +// `service_tier` echoes the actual processing tier the upstream served the +// request at, which we surface as the `tier` slot so per-tier pricing +// overrides resolve at recording time. +export const tokenUsageFromResponsesResult = (response: ResponsesResult) => { + const usage = response.usage; + if (!usage) return null; + const cacheRead = usage.input_tokens_details?.cached_tokens ?? 0; + const tier = normalizeOpenAiServiceTier(response.service_tier); return tokenUsage({ - input: u.input_tokens - cacheRead, + input: usage.input_tokens - cacheRead, input_cache_read: cacheRead, - output: u.output_tokens, + output: usage.output_tokens, + ...(tier !== null ? { tier } : {}), }); }; diff --git a/packages/gateway/src/data-plane/llm/responses/websocket.ts b/packages/gateway/src/data-plane/llm/responses/websocket.ts index 4a431cbca..49ba6d39d 100644 --- a/packages/gateway/src/data-plane/llm/responses/websocket.ts +++ b/packages/gateway/src/data-plane/llm/responses/websocket.ts @@ -2,9 +2,9 @@ import type { Context } from 'hono'; import { RESPONSES_MISSING_TERMINAL_MESSAGE } from './events/to-result.ts'; import { createResponsesWsSession } from './items/store.ts'; +import { tokenUsageFromResponsesResult } from './respond.ts'; import { PreviousResponseNotFoundError } from './serve-prep.ts'; import { responsesServe } from './serve.ts'; -import { tokenUsage } from '../../shared/telemetry/usage.ts'; import { createGatewayCtxForWs, type GatewayCtx } from '../shared/gateway-ctx.ts'; import { SourceStreamState, eventResultMetadata, recordPerformance, recordUsage } from '../shared/respond.ts'; import { DOWNSTREAM_KEEP_ALIVE_INTERVAL_MS, type StreamCompletion } from '../shared/stream/sse.ts'; @@ -398,17 +398,6 @@ const serverErrorEnvelope = (error: unknown): Record => ({ code: 'internal_error', }); -const tokenUsageFromResponsesResult = (response: ResponsesResult) => { - const usage = response.usage; - if (!usage) return null; - const cacheRead = 
usage.input_tokens_details?.cached_tokens ?? 0; - return tokenUsage({ - input: usage.input_tokens - cacheRead, - input_cache_read: cacheRead, - output: usage.output_tokens, - }); -}; - const responseDoneSummary = (event: unknown) => { if (!event || typeof event !== 'object') return null; const type = (event as { type?: unknown }).type; diff --git a/packages/gateway/src/data-plane/shared/telemetry/usage.ts b/packages/gateway/src/data-plane/shared/telemetry/usage.ts index 9fb30ce2a..e4c7efdc5 100644 --- a/packages/gateway/src/data-plane/shared/telemetry/usage.ts +++ b/packages/gateway/src/data-plane/shared/telemetry/usage.ts @@ -7,8 +7,8 @@ import type { TelemetryModelIdentity } from '@floway-dev/provider'; export const hasTokenUsage = (usage: TokenUsage): boolean => BILLING_DIMENSIONS.some(dimension => (usage[dimension] ?? 0) > 0); // Drop zero / undefined dimensions so a usage map only carries the dimensions -// actually billed. `tier` (a non-numeric service-tier marker) survives the -// filter so per-tier pricing overrides resolve at recording time. +// actually billed. `tier` is a non-numeric service-tier marker, not a billing +// dimension; it survives the filter. export const tokenUsage = (counts: TokenUsage): TokenUsage => { const out: TokenUsage = {}; for (const dimension of BILLING_DIMENSIONS) { @@ -19,6 +19,13 @@ export const tokenUsage = (counts: TokenUsage): TokenUsage => { return out; }; +// OpenAI `service_tier` echoes the actual processing tier (priority, flex, +// scale, default, auto). `default` and `auto` denote base pricing — stamping +// every row with one of them would split base-tier buckets pointlessly from +// rows where the upstream omitted the field — so they collapse to null. +export const normalizeOpenAiServiceTier = (tier: string | null | undefined): string | null => + tier == null || tier === 'default' || tier === 'auto' ? 
null : tier; + export const tokenUsageFromPromptTokenResponse = (usage: unknown): TokenUsage | null => { if (!usage || typeof usage !== 'object') return null; const promptTokens = (usage as { prompt_tokens?: unknown }).prompt_tokens; diff --git a/packages/protocols/src/chat-completions/index.ts b/packages/protocols/src/chat-completions/index.ts index fabe3f32e..d6f87fb7f 100644 --- a/packages/protocols/src/chat-completions/index.ts +++ b/packages/protocols/src/chat-completions/index.ts @@ -82,6 +82,11 @@ export interface ChatCompletionsResult { created: number; model: string; choices: ChatCompletionsChoiceNonStreaming[]; + // OpenAI stamps the actual processing tier on the response object whenever + // the request set `service_tier`. Wire values mirror the request enum: + // 'auto' | 'default' | 'flex' | 'scale' | 'priority'. + // https://platform.openai.com/docs/api-reference/chat/object + service_tier?: string | null; usage?: ChatCompletionsUsage; } @@ -91,6 +96,7 @@ export interface ChatCompletionsStreamEvent { created: number; model: string; choices: ChatCompletionsChoiceStreaming[]; + service_tier?: string | null; usage?: ChatCompletionsUsage; } diff --git a/packages/protocols/src/common/models_test.ts b/packages/protocols/src/common/models_test.ts new file mode 100644 index 000000000..fe1c4275c --- /dev/null +++ b/packages/protocols/src/common/models_test.ts @@ -0,0 +1,55 @@ +import { test } from 'vitest'; + +import { resolveEffectivePricing, unitPriceForDimension, type ModelPricing } from './models.ts'; +import { assertEquals } from '../test-assert.ts'; + +test('unitPriceForDimension input_cache_write_1h falls back 1h → 5m → input', () => { + const explicit: ModelPricing = { input: 5, input_cache_write: 6.25, input_cache_write_1h: 10 }; + assertEquals(unitPriceForDimension(explicit, 'input_cache_write_1h'), 10); + + const onlyFiveMinute: ModelPricing = { input: 5, input_cache_write: 6.25 }; + assertEquals(unitPriceForDimension(onlyFiveMinute, 
'input_cache_write_1h'), 6.25); + + const onlyInput: ModelPricing = { input: 5 }; + assertEquals(unitPriceForDimension(onlyInput, 'input_cache_write_1h'), 5); + + assertEquals(unitPriceForDimension({}, 'input_cache_write_1h'), null); + assertEquals(unitPriceForDimension(null, 'input_cache_write_1h'), null); +}); + +test('resolveEffectivePricing merges a tier override into the base snapshot and strips tiers', () => { + const base: ModelPricing = { + input: 5, + input_cache_read: 0.5, + input_cache_write: 6.25, + input_cache_write_1h: 10, + output: 25, + tiers: { fast: { input: 30, output: 150, input_cache_write_1h: 60 } }, + }; + const effective = resolveEffectivePricing(base, 'fast'); + assertEquals(effective, { + input: 30, + input_cache_read: 0.5, + input_cache_write: 6.25, + input_cache_write_1h: 60, + output: 150, + }); +}); + +test('resolveEffectivePricing returns the base snapshot (sans tiers) when tier is unknown or absent', () => { + const base: ModelPricing = { + input: 5, + output: 25, + tiers: { fast: { input: 30 } }, + }; + const expected: ModelPricing = { input: 5, output: 25 }; + + assertEquals(resolveEffectivePricing(base, null), expected); + assertEquals(resolveEffectivePricing(base, undefined), expected); + assertEquals(resolveEffectivePricing(base, 'priority'), expected); +}); + +test('resolveEffectivePricing returns null when the base snapshot is null', () => { + assertEquals(resolveEffectivePricing(null, 'fast'), null); + assertEquals(resolveEffectivePricing(null, null), null); +}); diff --git a/packages/protocols/src/messages/index.ts b/packages/protocols/src/messages/index.ts index dc4c02435..034a40722 100644 --- a/packages/protocols/src/messages/index.ts +++ b/packages/protocols/src/messages/index.ts @@ -225,7 +225,18 @@ export interface MessagesUsage { output_tokens: number; cache_creation_input_tokens?: number; cache_read_input_tokens?: number; + // Per-TTL split for cache writes introduced by extended-cache-ttl-2025-04-11. 
+ // Each `ephemeral_*` field is a disjoint subset of `cache_creation_input_tokens` + // (the legacy flat field is the sum of both); upstreams that have not opted + // into the beta omit `cache_creation` entirely and emit only the flat field. + cache_creation?: { + ephemeral_5m_input_tokens?: number; + ephemeral_1h_input_tokens?: number; + }; service_tier?: 'standard' | 'priority' | 'batch'; + // Opus 4.6+ stamps the inference speed on the usage object; `fast` selects + // a per-tier pricing override (see `ModelPricing.tiers`). + speed?: 'standard' | 'fast'; server_tool_use?: MessagesUsageServerToolUse; } @@ -300,6 +311,11 @@ export interface MessagesMessageDeltaEvent { output_tokens: number; cache_creation_input_tokens?: number; cache_read_input_tokens?: number; + cache_creation?: { + ephemeral_5m_input_tokens?: number; + ephemeral_1h_input_tokens?: number; + }; + speed?: 'standard' | 'fast'; server_tool_use?: MessagesUsageServerToolUse; }; } diff --git a/packages/protocols/src/responses/index.ts b/packages/protocols/src/responses/index.ts index 907bd0e42..06643249d 100644 --- a/packages/protocols/src/responses/index.ts +++ b/packages/protocols/src/responses/index.ts @@ -395,6 +395,11 @@ export interface ResponsesResult { // never synthesizes it. incomplete_details: { reason: string } | null; error: { message: string; code: string; type?: string } | null; + // OpenAI stamps the actual processing tier on the response object whenever + // the request set `service_tier`. Wire values: 'auto' | 'default' | + // 'flex' | 'scale' | 'priority' (open-ended string here so a future tier + // round-trips verbatim). 
https://developers.openai.com/api/reference/resources/responses/methods/create + service_tier?: string | null; usage?: { input_tokens: number; output_tokens: number; From 74021dd388fd5c70a2d9f5dc955631383a3f8506 Mon Sep 17 00:00:00 2001 From: Menci Date: Sat, 20 Jun 2026 00:53:18 +0800 Subject: [PATCH 3/4] feat(codex): price flex/priority service tiers per OpenAI public rates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `tiers.flex` and `tiers.priority` overlays for every priced Codex slug so the dashboard's notional cost reflects which OpenAI service tier the request actually ran on. The gateway already captures the response's top-level `service_tier` into `TokenUsage.tier`; this commit completes the loop by giving the cost compute a per-tier rate row to look up. Tier overrides match OpenAI's public pricing (verified 2026-06-19 against https://platform.openai.com/docs/pricing): gpt-5.5 flex $2.5/$0.25/$15 priority $12.5/$1.25/$75 gpt-5.4 flex $1.25/$0.13/$7.5 priority $5/$0.5/$30 gpt-5.4-mini flex $0.375/$0.0375/$2.25 priority $1.5/$0.15/$9 `codex-auto-review` shares `gpt-5.4`'s pricing including the tier overrides. Codex CLI's `/fast` toggle writes `service_tier: "priority"` on the wire (per openai/codex's `ServiceTier::Fast.request_value()`), so operator-facing rows tagged "fast" cost out at the priority row. Cache-write rate stays unset on these entries — OpenAI charges cache creation at the same rate as input, which `unitPriceForDimension`'s fallback chain already covers. 
--- apps/web/src/pages/dashboard/usage.vue | 11 ++-- packages/provider-codex/src/models_test.ts | 10 +++- packages/provider-codex/src/pricing.ts | 58 ++++++++++++++++------ 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/apps/web/src/pages/dashboard/usage.vue b/apps/web/src/pages/dashboard/usage.vue index 2e206639f..cdb1593e2 100644 --- a/apps/web/src/pages/dashboard/usage.vue +++ b/apps/web/src/pages/dashboard/usage.vue @@ -113,6 +113,7 @@ type Metric = type Range = DashboardRange; const dim = (r: DisplayUsageRecord, k: BillingDimension): number => r.tokens[k] ?? 0; +const cacheWrite = (r: DisplayUsageRecord): number => dim(r, 'input_cache_write') + dim(r, 'input_cache_write_1h'); const api = useApi(); const auth = useAuthStore(); @@ -189,7 +190,7 @@ const tokenSummary = computed(() => { input += dim(r, 'input'); output += dim(r, 'output'); cacheRead += dim(r, 'input_cache_read'); - cacheCreation += dim(r, 'input_cache_write') + dim(r, 'input_cache_write_1h'); + cacheCreation += cacheWrite(r); inputImage += dim(r, 'input_image'); outputImage += dim(r, 'output_image'); } @@ -239,12 +240,12 @@ const metricValue = (r: DisplayUsageRecord, metric: Metric): number => { switch (metric) { case 'requests': return r.requests; case 'cost': return r.cost; - case 'total': return dim(r, 'input') + dim(r, 'output') + dim(r, 'input_cache_read') + dim(r, 'input_cache_write') + dim(r, 'input_cache_write_1h') + dim(r, 'input_image') + dim(r, 'output_image'); - case 'input': return dim(r, 'input') + dim(r, 'input_cache_read') + dim(r, 'input_cache_write') + dim(r, 'input_cache_write_1h') + dim(r, 'input_image'); + case 'total': return dim(r, 'input') + dim(r, 'output') + dim(r, 'input_cache_read') + cacheWrite(r) + dim(r, 'input_image') + dim(r, 'output_image'); + case 'input': return dim(r, 'input') + dim(r, 'input_cache_read') + cacheWrite(r) + dim(r, 'input_image'); case 'output': return dim(r, 'output') + dim(r, 'output_image'); - case 'prefill': return dim(r, 
'input') + dim(r, 'input_cache_write') + dim(r, 'input_cache_write_1h') + dim(r, 'input_image'); + case 'prefill': return dim(r, 'input') + cacheWrite(r) + dim(r, 'input_image'); case 'cached': return dim(r, 'input_cache_read'); - case 'cacheCreation': return dim(r, 'input_cache_write') + dim(r, 'input_cache_write_1h'); + case 'cacheCreation': return cacheWrite(r); case 'cachedRate': case 'cacheHitRate': return 0; diff --git a/packages/provider-codex/src/models_test.ts b/packages/provider-codex/src/models_test.ts index 64e1c4648..d5f78e88f 100644 --- a/packages/provider-codex/src/models_test.ts +++ b/packages/provider-codex/src/models_test.ts @@ -66,7 +66,15 @@ describe('codexRawToUpstreamModel', () => { test('attaches OpenAI-API-rate cost for known slugs and treats codex-auto-review as gpt-5.4', () => { const flagship = codexRawToUpstreamModel({ id: 'gpt-5.4', display_name: 'GPT-5.4', context_window: 272000, max_context_window: 1000000 }); - expect(flagship.cost).toEqual({ input: 2.5, input_cache_read: 0.25, output: 15 }); + expect(flagship.cost).toEqual({ + input: 2.5, + input_cache_read: 0.25, + output: 15, + tiers: { + flex: { input: 1.25, input_cache_read: 0.13, output: 7.5 }, + priority: { input: 5, input_cache_read: 0.5, output: 30 }, + }, + }); const review = codexRawToUpstreamModel({ id: 'codex-auto-review', display_name: 'Codex Auto Review', context_window: 272000, max_context_window: 1000000 }); expect(review.cost).toEqual(flagship.cost); }); diff --git a/packages/provider-codex/src/pricing.ts b/packages/provider-codex/src/pricing.ts index 07df17d8d..d1d0221ca 100644 --- a/packages/provider-codex/src/pricing.ts +++ b/packages/provider-codex/src/pricing.ts @@ -1,14 +1,20 @@ // Per-public-model pricing table for the Codex (ChatGPT subscription) -// provider. 
Codex itself bills as a flat-fee subscription rather than per-token, -// but the gateway tracks usage cost as if the operator were paying OpenAI's -// public API rates — that lets the dashboard surface "value consumed vs. flat -// fee" so the operator can see whether a subscription is paying off relative -// to direct API spend. Values are USD per million tokens, aligned with -// the `Cost` schema in models.dev: +// provider. Codex itself bills as a flat-fee subscription rather than +// per-token, but the gateway tracks usage cost as if the operator were paying +// OpenAI's public API rates — that lets the dashboard surface "value consumed +// vs. flat fee" so the operator can see whether a subscription is paying off +// relative to direct API spend. Values are USD per million tokens, aligned +// with the `Cost` schema in models.dev: // https://github.com/anomalyco/models.dev/blob/8e6d393c01cb42d41a92f18725eef545e7190efb/packages/core/src/schema.ts // -// Source of truth for OpenAI public API prices the table is derived from: -// https://openai.com/api/pricing/ +// Source of truth for OpenAI public API prices: https://openai.com/api/pricing/. +// +// Per-tier overrides cover the two service tiers Codex CLI exposes: `flex` +// (50% off, slower) is set via `service_tier: "flex"`; `priority` (premium +// for guaranteed latency) is what Codex CLI's `/fast` toggle writes, per +// https://github.com/openai/codex/blob/3a2712ea141a5acb243689f803f26e48ff751771/codex-rs/protocol/src/config_types.rs +// OpenAI reports the tier used in the top-level `service_tier`; the gateway +// captures it onto `TokenUsage.tier` so cost compute can pick the right row. // // Coverage: every slug surfaced by /codex/models for ChatGPT Plus today // (gpt-5.5, gpt-5.4, gpt-5.4-mini, codex-auto-review). 
New slugs the upstream @@ -19,20 +25,42 @@ import type { ModelPricing } from '@floway-dev/protocols/common'; -const GPT_5_4_PRICING: ModelPricing = { input: 2.5, input_cache_read: 0.25, output: 15 }; +const GPT_5_4_PRICING: ModelPricing = { + input: 2.5, + input_cache_read: 0.25, + output: 15, + tiers: { + flex: { input: 1.25, input_cache_read: 0.13, output: 7.5 }, + priority: { input: 5, input_cache_read: 0.5, output: 30 }, + }, +}; const CODEX_MODEL_PRICING: readonly (readonly [key: string | RegExp, pricing: ModelPricing])[] = [ - ['gpt-5.5', { input: 5, input_cache_read: 0.5, output: 30 }], + ['gpt-5.5', { + input: 5, + input_cache_read: 0.5, + output: 30, + tiers: { + flex: { input: 2.5, input_cache_read: 0.25, output: 15 }, + priority: { input: 12.5, input_cache_read: 1.25, output: 75 }, + }, + }], ['gpt-5.4', GPT_5_4_PRICING], - ['gpt-5.4-mini', { input: 0.75, input_cache_read: 0.075, output: 4.5 }], + ['gpt-5.4-mini', { + input: 0.75, + input_cache_read: 0.075, + output: 4.5, + tiers: { + flex: { input: 0.375, input_cache_read: 0.0375, output: 2.25 }, + priority: { input: 1.5, input_cache_read: 0.15, output: 9 }, + }, + }], // Internal review model gated under codex_cli_rs's auto-review feature; runs - // on the same compute as gpt-5.4 and is billed identically. + // on the same compute as gpt-5.4 and is billed identically (including tier + // overrides — auto-review honors `service_tier` the same way). ['codex-auto-review', GPT_5_4_PRICING], ]; -// Codex doesn't apply variant suffixes to model ids — the upstream's slug is -// the public id verbatim — so the modelKey persisted in `usage.model_key` -// matches the table key directly. export const pricingForCodexModelKey = (modelKey: string): ModelPricing | null => { for (const [key, pricing] of CODEX_MODEL_PRICING) { if (typeof key === 'string' ? 
modelKey === key : key.test(modelKey)) { From c634ddcb17c9fe1f1685d5d815613ce415062e4f Mon Sep 17 00:00:00 2001 From: Menci Date: Sat, 20 Jun 2026 01:02:50 +0800 Subject: [PATCH 4/4] fix(codex): recover from concurrent refresh-token rotation races Under burst load, two workers can both observe a stale access token on the same Codex upstream and both attempt a refresh. OpenAI rotates the refresh_token on every successful /oauth/token call, so exactly one racer wins; the other's request is rejected with `invalid_grant` for trying to redeem the rotated-out copy. The previous flow treated every `invalid_grant` as a dead credential and let the caller flip the account to `refresh_failed` - destroying the working credential a sibling had just rotated, and forcing the operator to re-import. On `invalid_grant`, the access-token cache now re-reads upstream state for the same `chatgpt-account-id` slot and compares the refresh_token it tried against what is now stored. If they differ, a sibling rotated and we return their freshly-minted access token (the caller treats it as a normal cache hit and skips the terminal flip). If they match, we re-raise the original error so the data-plane / control-plane caller flips the row as before. The other refresh-terminal codes - `app_session_terminated`, `invalid_refresh_token`, `invalid_client`, `unauthorized_client`, `access_denied` - bypass recovery entirely; none of them are caused by a rotation race. `CodexOAuthSessionTerminatedError` now carries the raw OAuth `error` value as a `code` field alongside the existing `upstreamMessage` so the recovery branch can single out `invalid_grant` from the catch. `REFRESH_TERMINAL_OAUTH_CODES` is broadened to the audit-aligned set (`app_session_terminated`, `invalid_grant`, `invalid_refresh_token`, `invalid_client`, `unauthorized_client`, `access_denied`) - Codex is OpenAI OAuth, so the list matches sub2api's `isNonRetryableRefreshError` verbatim. 
Sub2api's tryRecoverFromRefreshRace (backend/internal/service/oauth_refresh_api.go:173-193) is the canonical pattern; we apply it to Codex's per-account credential here. The token rotation persistence hook stays awaited; the recovery branch reads from the just-persisted state via the upstream repo and returns the sibling's cached access token directly so no second mint fires from this call site. --- .../provider-codex/src/access-token-cache.ts | 62 +++++++++++++++++-- .../src/access-token-cache_test.ts | 40 ++++++++++++ packages/provider-codex/src/auth/oauth.ts | 57 ++++++++++++----- 3 files changed, 137 insertions(+), 22 deletions(-) diff --git a/packages/provider-codex/src/access-token-cache.ts b/packages/provider-codex/src/access-token-cache.ts index ac792da92..d3b977b07 100644 --- a/packages/provider-codex/src/access-token-cache.ts +++ b/packages/provider-codex/src/access-token-cache.ts @@ -1,4 +1,4 @@ -import { refreshCodexAccessToken } from './auth/oauth.ts'; +import { CodexOAuthSessionTerminatedError, refreshCodexAccessToken } from './auth/oauth.ts'; import { readCodexUpstreamState, type CodexAccessTokenEntry, type CodexUpstreamState } from './state.ts'; import { getProviderRepo, type Fetcher } from '@floway-dev/provider'; @@ -76,14 +76,23 @@ export const invalidateCodexAccessToken = async ( accountId: string, ): Promise => { await persistAccessToken(upstreamId, accountId, null, 'invalidateCodexAccessToken'); }; -// Refresh-token rotation is deliberately not folded in here: the caller's -// CAS hook for refresh_token has to coordinate with terminal-state -// transitions and lives upstream of this function. `mintCodexAccessToken` -// below is the standard implementation of that callback. +// Reads, mints, and persists. The mint callback is responsible for routing +// the rotated refresh_token through the upstream's CAS hook; +// `mintCodexAccessToken` below is the standard implementation. 
+// +// Refresh-race recovery: when the mint throws `invalid_grant`, the +// refresh_token may either be genuinely revoked, or a sibling worker may +// have raced us, won the rotation, and left our copy stale. The cache +// distinguishes by re-reading state and comparing — see +// `recoverFromRefreshRace` below. Other terminal codes (defined in +// `REFRESH_TERMINAL_OAUTH_CODES`) signal credential death under any race +// scenario and skip recovery. Mirrors sub2api's `tryRecoverFromRefreshRace`: +// https://github.com/Wei-Shaw/sub2api/blob/49e99e9d519a55cbe6d3bd94b810978dd64ce4b8/backend/internal/service/oauth_refresh_api.go#L175 export const ensureCodexAccessToken = async ( upstreamId: string, accountId: string, mint: (refreshToken: string) => Promise, + recoveryAllowed = true, ): Promise => { const fresh = await getProviderRepo().upstreams.getById(upstreamId); if (!fresh) throw new Error(`Codex upstream ${upstreamId} not found`); @@ -93,11 +102,52 @@ export const ensureCodexAccessToken = async ( if (account.accessToken && isAccessTokenFresh(account.accessToken)) { return account.accessToken; } - const minted = await mint(account.refresh_token); + + let minted: CodexAccessTokenEntry; + try { + minted = await mint(account.refresh_token); + } catch (err) { + if (err instanceof CodexOAuthSessionTerminatedError && err.code === 'invalid_grant' && recoveryAllowed) { + const recovered = await recoverFromRefreshRace(upstreamId, accountId, account.refresh_token, mint); + if (recovered) return recovered; + } + throw err; + } await persistAccessToken(upstreamId, accountId, minted, 'ensureCodexAccessToken'); return minted; }; +// Re-read state for the same accountId slot and decide whether the +// `invalid_grant` we just saw came from a sibling rotation (return their +// freshly-minted access token) or from a genuinely dead refresh_token +// (return null so the caller re-raises). When a sibling rotated but no +// cached access token is stored — e.g. 
a concurrent `invalidateCodexAccessToken` +// cleared it — we re-enter the refresh flow once with the live RT in hand. +// The depth guard (`recoveryAllowed: false` on the re-entrant call) +// suppresses a second recovery attempt — if `invalid_grant` strikes again +// the refresh token really is dead and we want the terminal flip. +const recoverFromRefreshRace = async ( + upstreamId: string, + accountId: string, + usedRefreshToken: string, + mint: (refreshToken: string) => Promise, +): Promise => { + const reread = await getProviderRepo().upstreams.getById(upstreamId); + if (!reread) return null; + const rereadState = readCodexUpstreamState(reread.state); + const rereadAccount = rereadState.accounts.find(a => a.chatgptAccountId === accountId); + if (!rereadAccount) return null; + if (rereadAccount.state !== 'active') return null; + if (rereadAccount.refresh_token === usedRefreshToken) return null; + console.info( + `Codex refresh-race recovered for upstream ${upstreamId} account ${accountId}: sibling rotated, using their access token`, + ); + if (rereadAccount.accessToken && isAccessTokenFresh(rereadAccount.accessToken)) { + return rereadAccount.accessToken; + } + return await ensureCodexAccessToken(upstreamId, accountId, mint, false); +}; + // Mints a fresh access token via /oauth/token and routes the rotated // refresh_token through the caller's CAS hook. 
Awaiting the rotation // persistence (rather than fire-and-forget) is deliberate: under concurrent diff --git a/packages/provider-codex/src/access-token-cache_test.ts b/packages/provider-codex/src/access-token-cache_test.ts index 4b2431e57..1fb1554a4 100644 --- a/packages/provider-codex/src/access-token-cache_test.ts +++ b/packages/provider-codex/src/access-token-cache_test.ts @@ -7,6 +7,7 @@ import { putCodexAccessToken, type CodexAccessTokenEntry, } from './access-token-cache.ts'; +import { CodexOAuthSessionTerminatedError } from './auth/oauth.ts'; import type { CodexUpstreamState } from './state.ts'; import { initProviderRepo, type UpstreamRecord } from '@floway-dev/provider'; @@ -179,4 +180,43 @@ describe('ensureCodexAccessToken', () => { await expect(ensureCodexAccessToken(upstreamId, accountId, mint)).rejects.toThrow(/oauth boom/); expect(saveStateSpy).not.toHaveBeenCalled(); }); + + test('invalid_grant with a sibling rotation in flight → returns the sibling-minted access token, no persist', async () => { + // Simulate the race: between our pre-mint getById and the upstream + // rejecting our refresh_token, a sibling worker won the rotation and + // CAS-wrote rt_v2 + at_sibling. Re-read on recovery observes the new + // pair scoped to the same accountId; we should return it instead of + // destroying a working credential. 
+ const siblingEntry: CodexAccessTokenEntry = { token: 'at_sibling', expiresAt: farFutureMs, refreshedAt: 'sibling' }; + getByIdSpy.mockImplementationOnce(async () => current).mockImplementationOnce(async () => { + current = makeRecord({ accounts: [{ ...baseAccount, refresh_token: 'rt_v2', accessToken: siblingEntry }] }); + return current; + }); + const mint = vi.fn().mockRejectedValue(new CodexOAuthSessionTerminatedError({ code: 'invalid_grant', message: 'replayed' })); + + const out = await ensureCodexAccessToken(upstreamId, accountId, mint); + expect(out).toEqual(siblingEntry); + expect(mint).toHaveBeenCalledTimes(1); + expect(saveStateSpy).not.toHaveBeenCalled(); + }); + + test('invalid_grant with stored RT unchanged → rethrows for the caller to flip to terminal', async () => { + // Same RT on re-read means no sibling rotated; the refresh_token really + // is dead. The cache surfaces the original error; the data-plane / control- + // plane caller is responsible for the terminal-state flip. + const mint = vi.fn().mockRejectedValue(new CodexOAuthSessionTerminatedError({ code: 'invalid_grant', message: 'revoked' })); + await expect(ensureCodexAccessToken(upstreamId, accountId, mint)).rejects.toBeInstanceOf(CodexOAuthSessionTerminatedError); + expect(mint).toHaveBeenCalledTimes(1); + expect(saveStateSpy).not.toHaveBeenCalled(); + }); + + test('app_session_terminated never attempts race recovery — single getById, original error rethrown', async () => { + // Terminal codes other than invalid_grant signal credential death under + // any race scenario; the cache must not re-read state to second-guess + // them. Assert via the absence of a second getById call. 
+ const mint = vi.fn().mockRejectedValue(new CodexOAuthSessionTerminatedError({ code: 'app_session_terminated', message: 'gone' })); + await expect(ensureCodexAccessToken(upstreamId, accountId, mint)).rejects.toBeInstanceOf(CodexOAuthSessionTerminatedError); + expect(getByIdSpy).toHaveBeenCalledTimes(1); + expect(saveStateSpy).not.toHaveBeenCalled(); + }); }); diff --git a/packages/provider-codex/src/auth/oauth.ts b/packages/provider-codex/src/auth/oauth.ts index 2e5395435..247a844f4 100644 --- a/packages/provider-codex/src/auth/oauth.ts +++ b/packages/provider-codex/src/auth/oauth.ts @@ -17,14 +17,49 @@ export interface CodexOAuthTokens { // Terminal error: refresh_token is dead, operator must re-import. Distinct // from generic OAuth 4xx so callers can react to session-termination -// separately from a transient upstream message. +// separately from a transient upstream message. `code` carries the raw OAuth +// `error` value (`invalid_grant`, `app_session_terminated`, etc.) so the +// refresh-race recovery in the access-token cache can single out +// `invalid_grant` — the only terminal code that might mean "a sibling +// worker just rotated the refresh token, and our copy is stale" — from +// codes that signal genuine credential death under any race scenario. export class CodexOAuthSessionTerminatedError extends Error { - constructor(public readonly upstreamMessage: string) { - super(`Codex OAuth session terminated: ${upstreamMessage}`); + readonly code: string; + readonly upstreamMessage: string; + constructor(args: { code: string; message: string }) { + super(`Codex OAuth session terminated: ${args.message}`); this.name = 'CodexOAuthSessionTerminatedError'; + this.code = args.code; + this.upstreamMessage = args.message; } } +// Terminal codes accepted on the authorization-code exchange. 
`invalid_grant` +// here typically means the operator pasted a stale or wrong callback URL, +// which is recoverable by restarting the PKCE flow rather than re-importing, +// so it stays out of this set. +const EXCHANGE_TERMINAL_OAUTH_CODES: ReadonlySet = new Set([ + 'app_session_terminated', +]); + +// Terminal codes on the refresh path: every one of these signals a dead +// refresh_token that only operator re-import recovers. Aligned with +// sub2api's `isNonRetryableRefreshError`, which shares the same list across +// OpenAI/Claude/Gemini OAuth — Codex is OpenAI OAuth, so the set carries +// over verbatim: +// https://github.com/Wei-Shaw/sub2api/blob/9b270f11d7444550402cbc68bd13530457efe59e/backend/internal/service/token_refresh_service.go#L429 +// `invalid_grant` is included even though the refresh-race recovery in +// access-token-cache.ts may re-classify it when a sibling rotation is +// detected; from the OAuth wire's perspective it is still a terminal signal. +const REFRESH_TERMINAL_OAUTH_CODES: ReadonlySet = new Set([ + 'app_session_terminated', + 'invalid_grant', + 'invalid_refresh_token', + 'invalid_client', + 'unauthorized_client', + 'access_denied', +]); + const codexTokenRequest = async ( body: URLSearchParams, terminalCodes: ReadonlySet, @@ -65,7 +100,7 @@ const codexTokenRequest = async ( if (message === null && typeof root?.detail === 'string') message = root.detail as string; message ??= rawText.slice(0, 256); if (code && terminalCodes.has(code)) { - throw new CodexOAuthSessionTerminatedError(message); + throw new CodexOAuthSessionTerminatedError({ code, message }); } throw new Error(`Codex OAuth /token returned ${response.status}: ${message}`); } @@ -97,11 +132,7 @@ export const exchangeCodexAuthorizationCode = async (opts: { code: string; codeV redirect_uri: CODEX_REDIRECT_URI, code_verifier: opts.codeVerifier, }); - // Only `app_session_terminated` is terminal here — `invalid_grant` on - // exchange typically means the operator pasted a stale 
or wrong callback - // URL, which is recoverable by restarting the PKCE flow rather than - // re-importing. - return await codexTokenRequest(body, new Set(['app_session_terminated']), directFetcher); + return await codexTokenRequest(body, EXCHANGE_TERMINAL_OAUTH_CODES, directFetcher); }; // `fetcher` is required because the refresh has an associated upstream @@ -114,11 +145,5 @@ export const refreshCodexAccessToken = async (refreshToken: string, fetcher: Fet client_id: CODEX_CLIENT_ID, scope: CODEX_OAUTH_SCOPE, }); - // OAuth `invalid_grant` on the refresh path is unambiguous — the - // refresh_token has been replayed, revoked, or expired. Same recovery as - // `app_session_terminated`: the operator must re-import a fresh auth.json. - // The error text varies ("Your refresh token has already been used to - // generate a new access token", "Token is no longer valid", etc.); the code - // is the stable signal. - return await codexTokenRequest(body, new Set(['app_session_terminated', 'invalid_grant']), fetcher); + return await codexTokenRequest(body, REFRESH_TERMINAL_OAUTH_CODES, fetcher); };