Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 179 additions & 0 deletions apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1898,6 +1898,185 @@ describe("ProviderRuntimeIngestion", () => {
expect(checkpoint?.checkpointRef).toBe("provider-diff:evt-turn-diff-updated");
});

it("projects context window updates into normalized thread activities", async () => {
const harness = await createHarness();
const now = new Date().toISOString();

harness.emit({
type: "thread.token-usage.updated",
eventId: asEventId("evt-thread-token-usage-updated"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-1"),
payload: {
usage: {
usedTokens: 1075,
totalProcessedTokens: 10_200,
maxTokens: 128_000,
inputTokens: 1000,
cachedInputTokens: 500,
outputTokens: 50,
reasoningOutputTokens: 25,
lastUsedTokens: 1075,
lastInputTokens: 1000,
lastCachedInputTokens: 500,
lastOutputTokens: 50,
lastReasoningOutputTokens: 25,
compactsAutomatically: true,
},
},
});

const thread = await waitForThread(harness.engine, (entry) =>
entry.activities.some(
(activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated",
),
);

const usageActivity = thread.activities.find(
(activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated",
);
expect(usageActivity).toBeDefined();
expect(usageActivity?.payload).toMatchObject({
usedTokens: 1075,
totalProcessedTokens: 10_200,
maxTokens: 128_000,
inputTokens: 1000,
cachedInputTokens: 500,
outputTokens: 50,
reasoningOutputTokens: 25,
lastUsedTokens: 1075,
compactsAutomatically: true,
});
});

it("projects Codex camelCase token usage payloads into normalized thread activities", async () => {
const harness = await createHarness();
const now = new Date().toISOString();

harness.emit({
type: "thread.token-usage.updated",
eventId: asEventId("evt-thread-token-usage-updated-camel"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-1"),
payload: {
usage: {
usedTokens: 126,
totalProcessedTokens: 11_839,
maxTokens: 258_400,
inputTokens: 120,
cachedInputTokens: 0,
outputTokens: 6,
reasoningOutputTokens: 0,
lastUsedTokens: 126,
lastInputTokens: 120,
lastCachedInputTokens: 0,
lastOutputTokens: 6,
lastReasoningOutputTokens: 0,
compactsAutomatically: true,
},
},
});

const thread = await waitForThread(harness.engine, (entry) =>
entry.activities.some(
(activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated",
),
);

const usageActivity = thread.activities.find(
(activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated",
);
expect(usageActivity?.payload).toMatchObject({
usedTokens: 126,
totalProcessedTokens: 11_839,
maxTokens: 258_400,
inputTokens: 120,
cachedInputTokens: 0,
outputTokens: 6,
reasoningOutputTokens: 0,
lastUsedTokens: 126,
lastInputTokens: 120,
lastOutputTokens: 6,
compactsAutomatically: true,
});
});

it("projects Claude usage snapshots with context window into normalized thread activities", async () => {
const harness = await createHarness();
const now = new Date().toISOString();

harness.emit({
type: "thread.token-usage.updated",
eventId: asEventId("evt-thread-token-usage-updated-claude-window"),
provider: "claudeAgent",
createdAt: now,
threadId: asThreadId("thread-1"),
payload: {
usage: {
usedTokens: 31_251,
lastUsedTokens: 31_251,
maxTokens: 200_000,
toolUses: 25,
durationMs: 43_567,
},
},
raw: {
source: "claude.sdk.message",
method: "claude/result/success",
payload: {},
},
});

const thread = await waitForThread(harness.engine, (entry) =>
entry.activities.some(
(activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated",
),
);

const usageActivity = thread.activities.find(
(activity: ProviderRuntimeTestActivity) => activity.kind === "context-window.updated",
);
expect(usageActivity?.payload).toMatchObject({
usedTokens: 31_251,
lastUsedTokens: 31_251,
maxTokens: 200_000,
toolUses: 25,
durationMs: 43_567,
});
});

it("projects compacted thread state into context compaction activities", async () => {
const harness = await createHarness();
const now = new Date().toISOString();

harness.emit({
type: "thread.state.changed",
eventId: asEventId("evt-thread-compacted"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-1"),
payload: {
state: "compacted",
detail: { source: "provider" },
},
});

const thread = await waitForThread(harness.engine, (entry) =>
entry.activities.some(
(activity: ProviderRuntimeTestActivity) => activity.kind === "context-compaction",
),
);

const activity = thread.activities.find(
(candidate: ProviderRuntimeTestActivity) => candidate.kind === "context-compaction",
);
expect(activity?.summary).toBe("Context compacted");
expect(activity?.tone).toBe("info");
});

it("projects Codex task lifecycle chunks into thread activities", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
Expand Down
52 changes: 52 additions & 0 deletions apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
CheckpointRef,
isToolLifecycleItemType,
ThreadId,
type ThreadTokenUsageSnapshot,
TurnId,
type OrchestrationThreadActivity,
type ProviderRuntimeEvent,
Expand Down Expand Up @@ -101,6 +102,15 @@ function asString(value: unknown): string | undefined {
return typeof value === "string" ? value : undefined;
}

function buildContextWindowActivityPayload(
event: ProviderRuntimeEvent,
): ThreadTokenUsageSnapshot | undefined {
if (event.type !== "thread.token-usage.updated" || event.payload.usage.usedTokens <= 0) {
return undefined;
}
return event.payload.usage;
}

function runtimePayloadRecord(event: ProviderRuntimeEvent): Record<string, unknown> | undefined {
const payload = (event as { payload?: unknown }).payload;
if (!payload || typeof payload !== "object") {
Expand Down Expand Up @@ -409,6 +419,48 @@ function runtimeEventToActivities(
];
}

case "thread.state.changed": {
if (event.payload.state !== "compacted") {
return [];
}

return [
{
id: event.eventId,
createdAt: event.createdAt,
tone: "info",
kind: "context-compaction",
summary: "Context compacted",
payload: {
state: event.payload.state,
...(event.payload.detail !== undefined ? { detail: event.payload.detail } : {}),
},
turnId: toTurnId(event.turnId) ?? null,
...maybeSequence,
},
];
}

case "thread.token-usage.updated": {
const payload = buildContextWindowActivityPayload(event);
if (!payload) {
return [];
}

return [
{
id: event.eventId,
createdAt: event.createdAt,
tone: "info",
kind: "context-window.updated",
summary: "Context window updated",
payload,
turnId: toTurnId(event.turnId) ?? null,
...maybeSequence,
},
];
}

case "item.updated": {
if (!isToolLifecycleItemType(event.payload.itemType)) {
return [];
Expand Down
123 changes: 122 additions & 1 deletion apps/server/src/provider/Layers/ClaudeAdapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,7 @@ describe("ClaudeAdapterLive", () => {
return Effect.gen(function* () {
const adapter = yield* ClaudeAdapter;

const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 5).pipe(
const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 6).pipe(
Stream.runCollect,
Effect.forkChild,
);
Expand Down Expand Up @@ -1200,6 +1200,127 @@ describe("ClaudeAdapterLive", () => {
);
});

it.effect("emits thread token usage updates from Claude task progress", () => {
const harness = makeHarness();
return Effect.gen(function* () {
const adapter = yield* ClaudeAdapter;

const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 6).pipe(
Stream.runCollect,
Effect.forkChild,
);

yield* adapter.startSession({
threadId: THREAD_ID,
provider: "claudeAgent",
runtimeMode: "full-access",
});

harness.query.emit({
type: "system",
subtype: "task_progress",
task_id: "task-usage-1",
description: "Thinking through the patch",
usage: {
total_tokens: 321,
tool_uses: 2,
duration_ms: 654,
},
session_id: "sdk-session-task-usage",
uuid: "task-usage-progress-1",
} as unknown as SDKMessage);

const runtimeEvents = Array.from(yield* Fiber.join(runtimeEventsFiber));
const usageEvent = runtimeEvents.find((event) => event.type === "thread.token-usage.updated");
const progressEvent = runtimeEvents.find((event) => event.type === "task.progress");
assert.equal(usageEvent?.type, "thread.token-usage.updated");
if (usageEvent?.type === "thread.token-usage.updated") {
assert.deepEqual(usageEvent.payload, {
usage: {
usedTokens: 321,
lastUsedTokens: 321,
toolUses: 2,
durationMs: 654,
},
});
}
assert.equal(progressEvent?.type, "task.progress");
if (usageEvent && progressEvent) {
assert.notStrictEqual(usageEvent.eventId, progressEvent.eventId);
}
}).pipe(
Effect.provideService(Random.Random, makeDeterministicRandomService()),
Effect.provide(harness.layer),
);
});

it.effect("emits Claude context window on result completion usage snapshots", () => {
const harness = makeHarness();
return Effect.gen(function* () {
const adapter = yield* ClaudeAdapter;

const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 7).pipe(
Stream.runCollect,
Effect.forkChild,
);

yield* adapter.startSession({
threadId: THREAD_ID,
provider: "claudeAgent",
runtimeMode: "full-access",
});

yield* adapter.sendTurn({
threadId: THREAD_ID,
input: "hello",
attachments: [],
});

harness.query.emit({
type: "result",
subtype: "success",
is_error: false,
duration_ms: 1234,
duration_api_ms: 1200,
num_turns: 1,
result: "done",
stop_reason: "end_turn",
session_id: "sdk-session-result-usage",
usage: {
input_tokens: 4,
cache_creation_input_tokens: 2715,
cache_read_input_tokens: 21144,
output_tokens: 679,
},
modelUsage: {
"claude-opus-4-6": {
contextWindow: 200000,
maxOutputTokens: 64000,
},
},
} as unknown as SDKMessage);
harness.query.finish();

const runtimeEvents = Array.from(yield* Fiber.join(runtimeEventsFiber));
const usageEvent = runtimeEvents.find((event) => event.type === "thread.token-usage.updated");
assert.equal(usageEvent?.type, "thread.token-usage.updated");
if (usageEvent?.type === "thread.token-usage.updated") {
assert.deepEqual(usageEvent.payload, {
usage: {
usedTokens: 24542,
lastUsedTokens: 24542,
inputTokens: 23863,
outputTokens: 679,
maxTokens: 200000,
},
});
}
}).pipe(
Effect.provideService(Random.Random, makeDeterministicRandomService()),
Effect.provide(harness.layer),
);
});

it.effect(
"emits completion only after turn result when assistant frames arrive before deltas",
() => {
Expand Down
Loading