diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index c1ba48108f..4f7a28c401 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -1898,6 +1898,185 @@ describe("ProviderRuntimeIngestion", () => { expect(checkpoint?.checkpointRef).toBe("provider-diff:evt-turn-diff-updated"); }); + it("projects context window updates into normalized thread activities", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + harness.emit({ + type: "thread.token-usage.updated", + eventId: asEventId("evt-thread-token-usage-updated"), + provider: "codex", + createdAt: now, + threadId: asThreadId("thread-1"), + payload: { + usage: { + usedTokens: 1075, + totalProcessedTokens: 10_200, + maxTokens: 128_000, + inputTokens: 1000, + cachedInputTokens: 500, + outputTokens: 50, + reasoningOutputTokens: 25, + lastUsedTokens: 1075, + lastInputTokens: 1000, + lastCachedInputTokens: 500, + lastOutputTokens: 50, + lastReasoningOutputTokens: 25, + compactsAutomatically: true, + }, + }, + }); + + const thread = await waitForThread(harness.engine, (entry) => + entry.activities.some( + (activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated", + ), + ); + + const usageActivity = thread.activities.find( + (activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated", + ); + expect(usageActivity).toBeDefined(); + expect(usageActivity?.payload).toMatchObject({ + usedTokens: 1075, + totalProcessedTokens: 10_200, + maxTokens: 128_000, + inputTokens: 1000, + cachedInputTokens: 500, + outputTokens: 50, + reasoningOutputTokens: 25, + lastUsedTokens: 1075, + compactsAutomatically: true, + }); + }); + + it("projects Codex camelCase token usage payloads into normalized thread activities", async () => { + const 
harness = await createHarness(); + const now = new Date().toISOString(); + + harness.emit({ + type: "thread.token-usage.updated", + eventId: asEventId("evt-thread-token-usage-updated-camel"), + provider: "codex", + createdAt: now, + threadId: asThreadId("thread-1"), + payload: { + usage: { + usedTokens: 126, + totalProcessedTokens: 11_839, + maxTokens: 258_400, + inputTokens: 120, + cachedInputTokens: 0, + outputTokens: 6, + reasoningOutputTokens: 0, + lastUsedTokens: 126, + lastInputTokens: 120, + lastCachedInputTokens: 0, + lastOutputTokens: 6, + lastReasoningOutputTokens: 0, + compactsAutomatically: true, + }, + }, + }); + + const thread = await waitForThread(harness.engine, (entry) => + entry.activities.some( + (activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated", + ), + ); + + const usageActivity = thread.activities.find( + (activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated", + ); + expect(usageActivity?.payload).toMatchObject({ + usedTokens: 126, + totalProcessedTokens: 11_839, + maxTokens: 258_400, + inputTokens: 120, + cachedInputTokens: 0, + outputTokens: 6, + reasoningOutputTokens: 0, + lastUsedTokens: 126, + lastInputTokens: 120, + lastOutputTokens: 6, + compactsAutomatically: true, + }); + }); + + it("projects Claude usage snapshots with context window into normalized thread activities", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + harness.emit({ + type: "thread.token-usage.updated", + eventId: asEventId("evt-thread-token-usage-updated-claude-window"), + provider: "claudeAgent", + createdAt: now, + threadId: asThreadId("thread-1"), + payload: { + usage: { + usedTokens: 31_251, + lastUsedTokens: 31_251, + maxTokens: 200_000, + toolUses: 25, + durationMs: 43_567, + }, + }, + raw: { + source: "claude.sdk.message", + method: "claude/result/success", + payload: {}, + }, + }); + + const thread = await waitForThread(harness.engine, (entry) 
=> + entry.activities.some( + (activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated", + ), + ); + + const usageActivity = thread.activities.find( + (activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated", + ); + expect(usageActivity?.payload).toMatchObject({ + usedTokens: 31_251, + lastUsedTokens: 31_251, + maxTokens: 200_000, + toolUses: 25, + durationMs: 43_567, + }); + }); + + it("projects compacted thread state into context compaction activities", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + harness.emit({ + type: "thread.state.changed", + eventId: asEventId("evt-thread-compacted"), + provider: "codex", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-1"), + payload: { + state: "compacted", + detail: { source: "provider" }, + }, + }); + + const thread = await waitForThread(harness.engine, (entry) => + entry.activities.some( + (activity: ProviderRuntimeTestActivity) => activity.kind === "context-compaction", + ), + ); + + const activity = thread.activities.find( + (candidate: ProviderRuntimeTestActivity) => candidate.kind === "context-compaction", + ); + expect(activity?.summary).toBe("Context compacted"); + expect(activity?.tone).toBe("info"); + }); + it("projects Codex task lifecycle chunks into thread activities", async () => { const harness = await createHarness(); const now = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 3df47941af..170b5bc717 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -8,6 +8,7 @@ import { CheckpointRef, isToolLifecycleItemType, ThreadId, + type ThreadTokenUsageSnapshot, TurnId, type OrchestrationThreadActivity, type ProviderRuntimeEvent, @@ -101,6 +102,15 @@ 
function asString(value: unknown): string | undefined { return typeof value === "string" ? value : undefined; } +function buildContextWindowActivityPayload( + event: ProviderRuntimeEvent, +): ThreadTokenUsageSnapshot | undefined { + if (event.type !== "thread.token-usage.updated" || event.payload.usage.usedTokens <= 0) { + return undefined; + } + return event.payload.usage; +} + function runtimePayloadRecord(event: ProviderRuntimeEvent): Record | undefined { const payload = (event as { payload?: unknown }).payload; if (!payload || typeof payload !== "object") { @@ -409,6 +419,48 @@ function runtimeEventToActivities( ]; } + case "thread.state.changed": { + if (event.payload.state !== "compacted") { + return []; + } + + return [ + { + id: event.eventId, + createdAt: event.createdAt, + tone: "info", + kind: "context-compaction", + summary: "Context compacted", + payload: { + state: event.payload.state, + ...(event.payload.detail !== undefined ? { detail: event.payload.detail } : {}), + }, + turnId: toTurnId(event.turnId) ?? null, + ...maybeSequence, + }, + ]; + } + + case "thread.token-usage.updated": { + const payload = buildContextWindowActivityPayload(event); + if (!payload) { + return []; + } + + return [ + { + id: event.eventId, + createdAt: event.createdAt, + tone: "info", + kind: "context-window.updated", + summary: "Context window updated", + payload, + turnId: toTurnId(event.turnId) ?? 
null, + ...maybeSequence, + }, + ]; + } + case "item.updated": { if (!isToolLifecycleItemType(event.payload.itemType)) { return []; diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.test.ts b/apps/server/src/provider/Layers/ClaudeAdapter.test.ts index e23148973e..507188803c 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.test.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.test.ts @@ -1158,7 +1158,7 @@ describe("ClaudeAdapterLive", () => { return Effect.gen(function* () { const adapter = yield* ClaudeAdapter; - const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 5).pipe( + const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 6).pipe( Stream.runCollect, Effect.forkChild, ); @@ -1200,6 +1200,127 @@ describe("ClaudeAdapterLive", () => { ); }); + it.effect("emits thread token usage updates from Claude task progress", () => { + const harness = makeHarness(); + return Effect.gen(function* () { + const adapter = yield* ClaudeAdapter; + + const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 6).pipe( + Stream.runCollect, + Effect.forkChild, + ); + + yield* adapter.startSession({ + threadId: THREAD_ID, + provider: "claudeAgent", + runtimeMode: "full-access", + }); + + harness.query.emit({ + type: "system", + subtype: "task_progress", + task_id: "task-usage-1", + description: "Thinking through the patch", + usage: { + total_tokens: 321, + tool_uses: 2, + duration_ms: 654, + }, + session_id: "sdk-session-task-usage", + uuid: "task-usage-progress-1", + } as unknown as SDKMessage); + + const runtimeEvents = Array.from(yield* Fiber.join(runtimeEventsFiber)); + const usageEvent = runtimeEvents.find((event) => event.type === "thread.token-usage.updated"); + const progressEvent = runtimeEvents.find((event) => event.type === "task.progress"); + assert.equal(usageEvent?.type, "thread.token-usage.updated"); + if (usageEvent?.type === "thread.token-usage.updated") { + assert.deepEqual(usageEvent.payload, { + usage: 
{ + usedTokens: 321, + lastUsedTokens: 321, + toolUses: 2, + durationMs: 654, + }, + }); + } + assert.equal(progressEvent?.type, "task.progress"); + if (usageEvent && progressEvent) { + assert.notStrictEqual(usageEvent.eventId, progressEvent.eventId); + } + }).pipe( + Effect.provideService(Random.Random, makeDeterministicRandomService()), + Effect.provide(harness.layer), + ); + }); + + it.effect("emits Claude context window on result completion usage snapshots", () => { + const harness = makeHarness(); + return Effect.gen(function* () { + const adapter = yield* ClaudeAdapter; + + const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 7).pipe( + Stream.runCollect, + Effect.forkChild, + ); + + yield* adapter.startSession({ + threadId: THREAD_ID, + provider: "claudeAgent", + runtimeMode: "full-access", + }); + + yield* adapter.sendTurn({ + threadId: THREAD_ID, + input: "hello", + attachments: [], + }); + + harness.query.emit({ + type: "result", + subtype: "success", + is_error: false, + duration_ms: 1234, + duration_api_ms: 1200, + num_turns: 1, + result: "done", + stop_reason: "end_turn", + session_id: "sdk-session-result-usage", + usage: { + input_tokens: 4, + cache_creation_input_tokens: 2715, + cache_read_input_tokens: 21144, + output_tokens: 679, + }, + modelUsage: { + "claude-opus-4-6": { + contextWindow: 200000, + maxOutputTokens: 64000, + }, + }, + } as unknown as SDKMessage); + harness.query.finish(); + + const runtimeEvents = Array.from(yield* Fiber.join(runtimeEventsFiber)); + const usageEvent = runtimeEvents.find((event) => event.type === "thread.token-usage.updated"); + assert.equal(usageEvent?.type, "thread.token-usage.updated"); + if (usageEvent?.type === "thread.token-usage.updated") { + assert.deepEqual(usageEvent.payload, { + usage: { + usedTokens: 24542, + lastUsedTokens: 24542, + inputTokens: 23863, + outputTokens: 679, + maxTokens: 200000, + }, + }); + } + }).pipe( + Effect.provideService(Random.Random, 
makeDeterministicRandomService()), + Effect.provide(harness.layer), + ); + }); + it.effect( "emits completion only after turn result when assistant frames arrive before deltas", () => { diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.ts b/apps/server/src/provider/Layers/ClaudeAdapter.ts index acbee86053..37013682a9 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.ts @@ -29,6 +29,7 @@ import { type ProviderRuntimeTurnStatus, type ProviderSendTurnInput, type ProviderSession, + type ThreadTokenUsageSnapshot, type ProviderUserInputAnswers, type RuntimeContentStreamKind, RuntimeItemId, @@ -156,6 +157,7 @@ interface ClaudeSessionContext { }>; readonly inFlightTools: Map; turnState: ClaudeTurnState | undefined; + lastKnownContextWindow: number | undefined; lastAssistantUuid: string | undefined; lastThreadStartedId: string | undefined; stopped: boolean; @@ -259,6 +261,82 @@ function asRuntimeItemId(value: string): RuntimeItemId { return RuntimeItemId.makeUnsafe(value); } +function maxClaudeContextWindowFromModelUsage(modelUsage: unknown): number | undefined { + if (!modelUsage || typeof modelUsage !== "object") { + return undefined; + } + + let maxContextWindow: number | undefined; + for (const value of Object.values(modelUsage as Record)) { + if (!value || typeof value !== "object") { + continue; + } + const contextWindow = (value as { contextWindow?: unknown }).contextWindow; + if ( + typeof contextWindow !== "number" || + !Number.isFinite(contextWindow) || + contextWindow <= 0 + ) { + continue; + } + maxContextWindow = Math.max(maxContextWindow ?? 
0, contextWindow); + } + + return maxContextWindow; +} + +function normalizeClaudeTokenUsage( + usage: unknown, + contextWindow?: number, +): ThreadTokenUsageSnapshot | undefined { + if (!usage || typeof usage !== "object") { + return undefined; + } + + const record = usage as Record; + const directUsedTokens = + typeof record.total_tokens === "number" && Number.isFinite(record.total_tokens) + ? record.total_tokens + : undefined; + const inputTokens = + (typeof record.input_tokens === "number" && Number.isFinite(record.input_tokens) + ? record.input_tokens + : 0) + + (typeof record.cache_creation_input_tokens === "number" && + Number.isFinite(record.cache_creation_input_tokens) + ? record.cache_creation_input_tokens + : 0) + + (typeof record.cache_read_input_tokens === "number" && + Number.isFinite(record.cache_read_input_tokens) + ? record.cache_read_input_tokens + : 0); + const outputTokens = + typeof record.output_tokens === "number" && Number.isFinite(record.output_tokens) + ? record.output_tokens + : 0; + const derivedUsedTokens = inputTokens + outputTokens; + const usedTokens = directUsedTokens ?? (derivedUsedTokens > 0 ? derivedUsedTokens : undefined); + if (usedTokens === undefined || usedTokens <= 0) { + return undefined; + } + + return { + usedTokens, + lastUsedTokens: usedTokens, + ...(inputTokens > 0 ? { inputTokens } : {}), + ...(outputTokens > 0 ? { outputTokens } : {}), + ...(typeof contextWindow === "number" && Number.isFinite(contextWindow) && contextWindow > 0 + ? { maxTokens: contextWindow } + : {}), + ...(typeof record.tool_uses === "number" && Number.isFinite(record.tool_uses) + ? { toolUses: record.tool_uses } + : {}), + ...(typeof record.duration_ms === "number" && Number.isFinite(record.duration_ms) + ? 
{ durationMs: record.duration_ms } + : {}), + }; +} + function asCanonicalTurnId(value: TurnId): TurnId { return value; } @@ -1287,8 +1365,34 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { result?: SDKResultMessage, ): Effect.Effect => Effect.gen(function* () { + const resultUsage = + result?.usage && typeof result.usage === "object" ? { ...result.usage } : undefined; + const resultContextWindow = maxClaudeContextWindowFromModelUsage(result?.modelUsage); + const usageSnapshot = normalizeClaudeTokenUsage( + resultUsage, + resultContextWindow ?? context.lastKnownContextWindow, + ); + if (resultContextWindow !== undefined) { + context.lastKnownContextWindow = resultContextWindow; + } + const turnState = context.turnState; if (!turnState) { + if (usageSnapshot) { + const usageStamp = yield* makeEventStamp(); + yield* offerRuntimeEvent({ + type: "thread.token-usage.updated", + eventId: usageStamp.eventId, + provider: PROVIDER, + createdAt: usageStamp.createdAt, + threadId: context.session.threadId, + payload: { + usage: usageSnapshot, + }, + providerRefs: {}, + }); + } + const stamp = yield* makeEventStamp(); yield* offerRuntimeEvent({ type: "turn.completed", @@ -1356,6 +1460,22 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { items: [...turnState.items], }); + if (usageSnapshot) { + const usageStamp = yield* makeEventStamp(); + yield* offerRuntimeEvent({ + type: "thread.token-usage.updated", + eventId: usageStamp.eventId, + provider: PROVIDER, + createdAt: usageStamp.createdAt, + threadId: context.session.threadId, + turnId: turnState.turnId, + payload: { + usage: usageSnapshot, + }, + providerRefs: nativeProviderRefs(context), + }); + } + const stamp = yield* makeEventStamp(); yield* offerRuntimeEvent({ type: "turn.completed", @@ -1925,6 +2045,24 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { }); return; case "task_progress": + if (message.usage) { + const normalizedUsage = normalizeClaudeTokenUsage( + 
message.usage, + context.lastKnownContextWindow, + ); + if (normalizedUsage) { + const usageStamp = yield* makeEventStamp(); + yield* offerRuntimeEvent({ + ...base, + eventId: usageStamp.eventId, + createdAt: usageStamp.createdAt, + type: "thread.token-usage.updated", + payload: { + usage: normalizedUsage, + }, + }); + } + } yield* offerRuntimeEvent({ ...base, type: "task.progress", @@ -1938,6 +2076,24 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { }); return; case "task_notification": + if (message.usage) { + const normalizedUsage = normalizeClaudeTokenUsage( + message.usage, + context.lastKnownContextWindow, + ); + if (normalizedUsage) { + const usageStamp = yield* makeEventStamp(); + yield* offerRuntimeEvent({ + ...base, + eventId: usageStamp.eventId, + createdAt: usageStamp.createdAt, + type: "thread.token-usage.updated", + payload: { + usage: normalizedUsage, + }, + }); + } + } yield* offerRuntimeEvent({ ...base, type: "task.completed", @@ -2634,6 +2790,7 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { turns: [], inFlightTools, turnState: undefined, + lastKnownContextWindow: undefined, lastAssistantUuid: resumeState?.resumeSessionAt, lastThreadStartedId: undefined, stopped: false, diff --git a/apps/server/src/provider/Layers/CodexAdapter.test.ts b/apps/server/src/provider/Layers/CodexAdapter.test.ts index 31d394c3ec..14c7c6dd42 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.test.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.test.ts @@ -839,6 +839,70 @@ lifecycleLayer("CodexAdapterLive lifecycle", (it) => { assert.equal(firstEvent.value.payload.taskId, "turn-child"); }), ); + + it.effect("unwraps Codex token usage payloads for context window events", () => + Effect.gen(function* () { + const adapter = yield* CodexAdapter; + const firstEventFiber = yield* Stream.runHead(adapter.streamEvents).pipe(Effect.forkChild); + + lifecycleManager.emit("event", { + id: 
asEventId("evt-codex-thread-token-usage-updated"), + kind: "notification", + provider: "codex", + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-1"), + createdAt: new Date().toISOString(), + method: "thread/tokenUsage/updated", + payload: { + threadId: "thread-1", + turnId: "turn-1", + tokenUsage: { + total: { + inputTokens: 11_833, + cachedInputTokens: 3456, + outputTokens: 6, + reasoningOutputTokens: 0, + totalTokens: 11_839, + }, + last: { + inputTokens: 120, + cachedInputTokens: 0, + outputTokens: 6, + reasoningOutputTokens: 0, + totalTokens: 126, + }, + modelContextWindow: 258_400, + }, + }, + } satisfies ProviderEvent); + + const firstEvent = yield* Fiber.join(firstEventFiber); + assert.equal(firstEvent._tag, "Some"); + if (firstEvent._tag !== "Some") { + return; + } + assert.equal(firstEvent.value.type, "thread.token-usage.updated"); + if (firstEvent.value.type !== "thread.token-usage.updated") { + return; + } + + assert.deepEqual(firstEvent.value.payload.usage, { + usedTokens: 126, + totalProcessedTokens: 11_839, + maxTokens: 258_400, + inputTokens: 120, + cachedInputTokens: 0, + outputTokens: 6, + reasoningOutputTokens: 0, + lastUsedTokens: 126, + lastInputTokens: 120, + lastCachedInputTokens: 0, + lastOutputTokens: 6, + lastReasoningOutputTokens: 0, + compactsAutomatically: true, + }); + }), + ); }); afterAll(() => { diff --git a/apps/server/src/provider/Layers/CodexAdapter.ts b/apps/server/src/provider/Layers/CodexAdapter.ts index e2fcebe1bc..d34505582e 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.ts @@ -11,6 +11,7 @@ import { type CanonicalRequestType, type ProviderEvent, type ProviderRuntimeEvent, + type ThreadTokenUsageSnapshot, type ProviderUserInputAnswers, RuntimeItemId, RuntimeRequestId, @@ -109,6 +110,48 @@ function asNumber(value: unknown): number | undefined { return typeof value === "number" && Number.isFinite(value) ? 
value : undefined; } +function normalizeCodexTokenUsage(value: unknown): ThreadTokenUsageSnapshot | undefined { + const usage = asObject(value); + const totalUsage = asObject(usage?.total_token_usage ?? usage?.total); + const lastUsage = asObject(usage?.last_token_usage ?? usage?.last); + + const totalProcessedTokens = + asNumber(totalUsage?.total_tokens) ?? asNumber(totalUsage?.totalTokens); + const usedTokens = + asNumber(lastUsage?.total_tokens) ?? asNumber(lastUsage?.totalTokens) ?? totalProcessedTokens; + if (usedTokens === undefined || usedTokens <= 0) { + return undefined; + } + + const maxTokens = asNumber(usage?.model_context_window) ?? asNumber(usage?.modelContextWindow); + const inputTokens = asNumber(lastUsage?.input_tokens) ?? asNumber(lastUsage?.inputTokens); + const cachedInputTokens = + asNumber(lastUsage?.cached_input_tokens) ?? asNumber(lastUsage?.cachedInputTokens); + const outputTokens = asNumber(lastUsage?.output_tokens) ?? asNumber(lastUsage?.outputTokens); + const reasoningOutputTokens = + asNumber(lastUsage?.reasoning_output_tokens) ?? asNumber(lastUsage?.reasoningOutputTokens); + + return { + usedTokens, + ...(totalProcessedTokens !== undefined && totalProcessedTokens > usedTokens + ? { totalProcessedTokens } + : {}), + ...(maxTokens !== undefined ? { maxTokens } : {}), + ...(inputTokens !== undefined ? { inputTokens } : {}), + ...(cachedInputTokens !== undefined ? { cachedInputTokens } : {}), + ...(outputTokens !== undefined ? { outputTokens } : {}), + ...(reasoningOutputTokens !== undefined ? { reasoningOutputTokens } : {}), + ...(usedTokens !== undefined ? { lastUsedTokens: usedTokens } : {}), + ...(inputTokens !== undefined ? { lastInputTokens: inputTokens } : {}), + ...(cachedInputTokens !== undefined ? { lastCachedInputTokens: cachedInputTokens } : {}), + ...(outputTokens !== undefined ? { lastOutputTokens: outputTokens } : {}), + ...(reasoningOutputTokens !== undefined + ? 
{ lastReasoningOutputTokens: reasoningOutputTokens } + : {}), + compactsAutomatically: true, + }; +} + function toTurnId(value: string | undefined): TurnId | undefined { return value?.trim() ? TurnId.makeUnsafe(value) : undefined; } @@ -708,12 +751,17 @@ function mapToRuntimeEvents( } if (event.method === "thread/tokenUsage/updated") { + const tokenUsage = asObject(payload?.tokenUsage); + const normalizedUsage = normalizeCodexTokenUsage(tokenUsage ?? event.payload); + if (!normalizedUsage) { + return []; + } return [ { type: "thread.token-usage.updated", ...runtimeEventBase(event, canonicalThreadId), payload: { - usage: event.payload ?? {}, + usage: normalizedUsage, }, }, ]; diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index e628f6ea6a..690694f539 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -141,12 +141,14 @@ import { type TerminalContextDraft, type TerminalContextSelection, } from "../lib/terminalContext"; +import { deriveLatestContextWindowSnapshot } from "../lib/contextWindow"; import { shouldUseCompactComposerFooter } from "./composerFooterLayout"; import { selectThreadTerminalState, useTerminalStateStore } from "../terminalStateStore"; import { ComposerPromptEditor, type ComposerPromptEditorHandle } from "./ComposerPromptEditor"; import { PullRequestThreadDialog } from "./PullRequestThreadDialog"; import { MessagesTimeline } from "./chat/MessagesTimeline"; import { ChatHeader } from "./chat/ChatHeader"; +import { ContextWindowMeter } from "./chat/ContextWindowMeter"; import { buildExpandedImagePreview, ExpandedImagePreview } from "./chat/ExpandedImagePreview"; import { AVAILABLE_PROVIDER_OPTIONS, ProviderModelPicker } from "./chat/ProviderModelPicker"; import { ComposerCommandItem, ComposerCommandMenu } from "./chat/ComposerCommandMenu"; @@ -480,6 +482,10 @@ export default function ChatView({ threadId }: ChatViewProps) { const diffOpen = rawSearch.diff === "1"; 
const activeThreadId = activeThread?.id ?? null; const activeLatestTurn = activeThread?.latestTurn ?? null; + const activeContextWindow = useMemo( + () => deriveLatestContextWindowSnapshot(activeThread?.activities ?? []), + [activeThread?.activities], + ); const latestTurnSettled = isLatestTurnSettled(activeLatestTurn, activeThread?.session ?? null); const activeProject = projects.find((p) => p.id === activeThread?.projectId); @@ -3894,6 +3900,9 @@ export default function ChatView({ threadId }: ChatViewProps) { data-chat-composer-actions="right" className="flex shrink-0 items-center gap-2" > + {activeContextWindow ? ( + + ) : null} {isPreparingWorktree ? ( Preparing worktree... diff --git a/apps/web/src/components/chat/ContextWindowMeter.tsx b/apps/web/src/components/chat/ContextWindowMeter.tsx new file mode 100644 index 0000000000..280c262cf2 --- /dev/null +++ b/apps/web/src/components/chat/ContextWindowMeter.tsx @@ -0,0 +1,114 @@ +import { cn } from "~/lib/utils"; +import { type ContextWindowSnapshot, formatContextWindowTokens } from "~/lib/contextWindow"; +import { Popover, PopoverPopup, PopoverTrigger } from "../ui/popover"; + +function formatPercentage(value: number | null): string | null { + if (value === null || !Number.isFinite(value)) { + return null; + } + if (value < 10) { + return `${value.toFixed(1).replace(/\.0$/, "")}%`; + } + return `${Math.round(value)}%`; +} + +export function ContextWindowMeter(props: { usage: ContextWindowSnapshot }) { + const { usage } = props; + const usedPercentage = formatPercentage(usage.usedPercentage); + const normalizedPercentage = Math.max(0, Math.min(100, usage.usedPercentage ?? 0)); + const radius = 9.75; + const circumference = 2 * Math.PI * radius; + const dashOffset = circumference - (normalizedPercentage / 100) * circumference; + + return ( + + + + + + {usage.usedPercentage !== null + ? Math.round(usage.usedPercentage) + : formatContextWindowTokens(usage.usedTokens)} + + + + } + /> + +
+
+ Context window +
+ {usage.maxTokens !== null && usedPercentage ? ( +
+ {usedPercentage} + + {formatContextWindowTokens(usage.usedTokens)} + / + {formatContextWindowTokens(usage.maxTokens ?? null)} context used +
+ ) : ( +
+ {formatContextWindowTokens(usage.usedTokens)} tokens used so far +
+ )} + {(usage.totalProcessedTokens ?? null) !== null && + (usage.totalProcessedTokens ?? 0) > usage.usedTokens ? ( +
+ Total processed: {formatContextWindowTokens(usage.totalProcessedTokens ?? null)}{" "} + tokens +
+ ) : null} + {usage.compactsAutomatically ? ( +
+ Automatically compacts its context when needed. +
+ ) : null} +
+
+
+ ); +} diff --git a/apps/web/src/components/chat/MessagesTimeline.test.tsx b/apps/web/src/components/chat/MessagesTimeline.test.tsx index e694faa0f2..692438c74a 100644 --- a/apps/web/src/components/chat/MessagesTimeline.test.tsx +++ b/apps/web/src/components/chat/MessagesTimeline.test.tsx @@ -96,4 +96,48 @@ describe("MessagesTimeline", () => { expect(markup).toContain("lucide-terminal"); expect(markup).toContain("yoo what's "); }); + + it("renders context compaction entries in the normal work log", async () => { + const { MessagesTimeline } = await import("./MessagesTimeline"); + const markup = renderToStaticMarkup( + {}} + onOpenTurnDiff={() => {}} + revertTurnCountByUserMessageId={new Map()} + onRevertUserMessage={() => {}} + isRevertingCheckpoint={false} + onImageExpand={() => {}} + markdownCwd={undefined} + resolvedTheme="light" + timestampFormat="locale" + workspaceRoot={undefined} + />, + ); + + expect(markup).toContain("Context compacted"); + expect(markup).toContain("Work log"); + }); }); diff --git a/apps/web/src/lib/contextWindow.test.ts b/apps/web/src/lib/contextWindow.test.ts new file mode 100644 index 0000000000..2173c18aa7 --- /dev/null +++ b/apps/web/src/lib/contextWindow.test.ts @@ -0,0 +1,67 @@ +import { describe, expect, it } from "vitest"; +import { EventId, type OrchestrationThreadActivity, TurnId } from "@t3tools/contracts"; + +import { deriveLatestContextWindowSnapshot, formatContextWindowTokens } from "./contextWindow"; + +function makeActivity(id: string, kind: string, payload: unknown): OrchestrationThreadActivity { + return { + id: EventId.makeUnsafe(id), + tone: "info", + kind, + summary: kind, + payload, + turnId: TurnId.makeUnsafe("turn-1"), + createdAt: "2026-03-23T00:00:00.000Z", + }; +} + +describe("contextWindow", () => { + it("derives the latest valid context window snapshot", () => { + const snapshot = deriveLatestContextWindowSnapshot([ + makeActivity("activity-1", "context-window.updated", { + usedTokens: 1000, + }), + 
makeActivity("activity-2", "tool.started", {}), + makeActivity("activity-3", "context-window.updated", { + usedTokens: 14_000, + maxTokens: 258_000, + compactsAutomatically: true, + }), + ]); + + expect(snapshot).not.toBeNull(); + expect(snapshot?.usedTokens).toBe(14_000); + expect(snapshot?.totalProcessedTokens).toBeNull(); + expect(snapshot?.maxTokens).toBe(258_000); + expect(snapshot?.compactsAutomatically).toBe(true); + }); + + it("ignores malformed payloads", () => { + const snapshot = deriveLatestContextWindowSnapshot([ + makeActivity("activity-1", "context-window.updated", {}), + ]); + + expect(snapshot).toBeNull(); + }); + + it("formats compact token counts", () => { + expect(formatContextWindowTokens(999)).toBe("999"); + expect(formatContextWindowTokens(1400)).toBe("1.4k"); + expect(formatContextWindowTokens(14_000)).toBe("14k"); + expect(formatContextWindowTokens(258_000)).toBe("258k"); + }); + + it("includes total processed tokens when available", () => { + const snapshot = deriveLatestContextWindowSnapshot([ + makeActivity("activity-1", "context-window.updated", { + usedTokens: 81_659, + totalProcessedTokens: 748_126, + maxTokens: 258_400, + lastUsedTokens: 81_659, + }), + ]); + + expect(snapshot?.usedTokens).toBe(81_659); + expect(snapshot?.totalProcessedTokens).toBe(748_126); + }); +}); diff --git a/apps/web/src/lib/contextWindow.ts b/apps/web/src/lib/contextWindow.ts new file mode 100644 index 0000000000..f668135a13 --- /dev/null +++ b/apps/web/src/lib/contextWindow.ts @@ -0,0 +1,90 @@ +import type { OrchestrationThreadActivity, ThreadTokenUsageSnapshot } from "@t3tools/contracts"; + +function asRecord(value: unknown): Record | null { + return value && typeof value === "object" ? (value as Record) : null; +} + +function asFiniteNumber(value: unknown): number | null { + return typeof value === "number" && Number.isFinite(value) ? value : null; +} + +function asBoolean(value: unknown): boolean | null { + return typeof value === "boolean" ? 
value : null; +} + +type NullableContextWindowUsage = { + readonly [Key in keyof ThreadTokenUsageSnapshot]: undefined extends ThreadTokenUsageSnapshot[Key] + ? Exclude | null + : ThreadTokenUsageSnapshot[Key]; +}; + +export type ContextWindowSnapshot = NullableContextWindowUsage & { + readonly remainingTokens: number | null; + readonly usedPercentage: number | null; + readonly remainingPercentage: number | null; + readonly updatedAt: string; +}; + +export function deriveLatestContextWindowSnapshot( + activities: ReadonlyArray, +): ContextWindowSnapshot | null { + for (let index = activities.length - 1; index >= 0; index -= 1) { + const activity = activities[index]; + if (!activity || activity.kind !== "context-window.updated") { + continue; + } + + const payload = asRecord(activity.payload); + const usedTokens = asFiniteNumber(payload?.usedTokens); + if (usedTokens === null || usedTokens <= 0) { + continue; + } + + const maxTokens = asFiniteNumber(payload?.maxTokens); + const usedPercentage = + maxTokens !== null && maxTokens > 0 ? Math.min(100, (usedTokens / maxTokens) * 100) : null; + const remainingTokens = + maxTokens !== null ? Math.max(0, Math.round(maxTokens - usedTokens)) : null; + const remainingPercentage = usedPercentage !== null ? 
/**
 * Formats a token count as a compact label such as `"999"`, `"1.4k"`, `"258k"` or `"1.5m"`.
 *
 * The unit is chosen *after* rounding, so a value that rounds up to the next
 * unit is shown in that unit (`999_500` → `"1m"`, never `"1000k"`).
 *
 * @param value - Token count; `null` and non-finite numbers are rendered as `"0"`.
 * @returns The compact label. A trailing `.0` is dropped (`2_000_000` → `"2m"`).
 */
export function formatContextWindowTokens(value: number | null): string {
  if (value === null || !Number.isFinite(value)) {
    return "0";
  }

  // Plain integer while the rounded value still has fewer than four digits.
  const wholeTokens = Math.round(value);
  if (wholeTokens < 1_000) {
    return `${wholeTokens}`;
  }

  // One decimal below 10k; this also catches 999.5–999.99, which rounds to "1k".
  if (value < 10_000) {
    return `${(value / 1_000).toFixed(1).replace(/\.0$/, "")}k`;
  }

  // Whole thousands, unless rounding reaches 1000k, which belongs to the "m" unit.
  const wholeThousands = Math.round(value / 1_000);
  if (wholeThousands < 1_000) {
    return `${wholeThousands}k`;
  }

  return `${(value / 1_000_000).toFixed(1).replace(/\.0$/, "")}m`;
}
"context-window.updated", + summary: "Context window updated", + tone: "info", + }), + makeActivity({ + id: "tool-1", + turnId: "turn-1", + kind: "tool.completed", + summary: "Ran command", + tone: "tool", + }), + ], + TurnId.makeUnsafe("turn-1"), + ); + + expect(entries).toHaveLength(1); + expect(entries[0]?.label).toBe("Ran command"); + }); + + it("keeps context compaction activities as normal work log entries", () => { + const entries = deriveWorkLogEntries( + [ + makeActivity({ + id: "compaction-1", + turnId: "turn-1", + kind: "context-compaction", + summary: "Context compacted", + tone: "info", + }), + ], + TurnId.makeUnsafe("turn-1"), + ); + + expect(entries).toHaveLength(1); + expect(entries[0]?.label).toBe("Context compacted"); + }); +}); + describe("hasToolActivityForTurn", () => { it("returns false when turn id is missing", () => { const activities: OrchestrationThreadActivity[] = [ diff --git a/apps/web/src/session-logic.ts b/apps/web/src/session-logic.ts index 7c3ea96e65..83a95d6313 100644 --- a/apps/web/src/session-logic.ts +++ b/apps/web/src/session-logic.ts @@ -463,6 +463,7 @@ export function deriveWorkLogEntries( .filter((activity) => (latestTurnId ? 
activity.turnId === latestTurnId : true)) .filter((activity) => activity.kind !== "tool.started") .filter((activity) => activity.kind !== "task.started" && activity.kind !== "task.completed") + .filter((activity) => activity.kind !== "context-window.updated") .filter((activity) => activity.summary !== "Checkpoint captured") .filter((activity) => !isPlanBoundaryToolActivity(activity)) .map(toDerivedWorkLogEntry); diff --git a/packages/contracts/src/providerRuntime.test.ts b/packages/contracts/src/providerRuntime.test.ts index 3dc0091add..9d9c395c3d 100644 --- a/packages/contracts/src/providerRuntime.test.ts +++ b/packages/contracts/src/providerRuntime.test.ts @@ -139,4 +139,29 @@ describe("ProviderRuntimeEvent", () => { }), ).toThrow(); }); + + it("decodes normalized thread token usage snapshots", () => { + const parsed = decodeRuntimeEvent({ + type: "thread.token-usage.updated", + eventId: "event-token-usage-1", + provider: "claudeAgent", + createdAt: "2026-02-28T00:00:04.000Z", + threadId: "thread-1", + payload: { + usage: { + usedTokens: 31251, + maxTokens: 200000, + toolUses: 25, + durationMs: 43567, + }, + }, + }); + + expect(parsed.type).toBe("thread.token-usage.updated"); + if (parsed.type !== "thread.token-usage.updated") { + throw new Error("expected thread.token-usage.updated"); + } + expect(parsed.payload.usage.maxTokens).toBe(200000); + expect(parsed.payload.usage.usedTokens).toBe(31251); + }); }); diff --git a/packages/contracts/src/providerRuntime.ts b/packages/contracts/src/providerRuntime.ts index 2fec889b6d..81231d88f6 100644 --- a/packages/contracts/src/providerRuntime.ts +++ b/packages/contracts/src/providerRuntime.ts @@ -2,7 +2,9 @@ import { Option, Schema } from "effect"; import { EventId, IsoDateTime, + NonNegativeInt, ProviderItemId, + PositiveInt, RuntimeItemId, RuntimeRequestId, RuntimeTaskId, @@ -293,8 +295,27 @@ const ThreadMetadataUpdatedPayload = Schema.Struct({ }); export type ThreadMetadataUpdatedPayload = typeof 
ThreadMetadataUpdatedPayload.Type; +export const ThreadTokenUsageSnapshot = Schema.Struct({ + usedTokens: NonNegativeInt, + totalProcessedTokens: Schema.optional(NonNegativeInt), + maxTokens: Schema.optional(PositiveInt), + inputTokens: Schema.optional(NonNegativeInt), + cachedInputTokens: Schema.optional(NonNegativeInt), + outputTokens: Schema.optional(NonNegativeInt), + reasoningOutputTokens: Schema.optional(NonNegativeInt), + lastUsedTokens: Schema.optional(NonNegativeInt), + lastInputTokens: Schema.optional(NonNegativeInt), + lastCachedInputTokens: Schema.optional(NonNegativeInt), + lastOutputTokens: Schema.optional(NonNegativeInt), + lastReasoningOutputTokens: Schema.optional(NonNegativeInt), + toolUses: Schema.optional(NonNegativeInt), + durationMs: Schema.optional(NonNegativeInt), + compactsAutomatically: Schema.optional(Schema.Boolean), +}); +export type ThreadTokenUsageSnapshot = typeof ThreadTokenUsageSnapshot.Type; + const ThreadTokenUsageUpdatedPayload = Schema.Struct({ - usage: Schema.Unknown, + usage: ThreadTokenUsageSnapshot, }); export type ThreadTokenUsageUpdatedPayload = typeof ThreadTokenUsageUpdatedPayload.Type;