Skip to content

Commit 5bd90e7

Browse files
fix: restore compression timing durability
1 parent 3fc0bba commit 5bd90e7

8 files changed

Lines changed: 128 additions & 28 deletions

File tree

lib/compress/message.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { appendProtectedTools } from "./protected-content"
88
import {
99
allocateBlockId,
1010
consumeCompressionDuration,
11+
getCompressionMessageKey,
1112
allocateRunId,
1213
applyCompressionState,
1314
wrapCompressedSummary,
@@ -53,7 +54,6 @@ export function createCompressMessageTool(ctx: ToolContext): ReturnType<typeof t
5354
typeof (toolCtx as unknown as { callID?: unknown }).callID === "string"
5455
? (toolCtx as unknown as { callID: string }).callID
5556
: undefined
56-
const durationMs = consumeCompressionDuration(ctx.state, callId)
5757

5858
const { rawMessages, searchContext } = await prepareSession(
5959
ctx,
@@ -96,6 +96,11 @@ export function createCompressMessageTool(ctx: ToolContext): ReturnType<typeof t
9696
})
9797
}
9898

99+
const durationMs = consumeCompressionDuration(
100+
ctx.state,
101+
getCompressionMessageKey(toolCtx.sessionID, toolCtx.messageID),
102+
callId,
103+
)
99104
const runId = allocateRunId(ctx.state)
100105

101106
for (const { plan, summaryWithTools } of preparedPlans) {

lib/compress/range.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
COMPRESSED_BLOCK_HEADER,
1818
allocateBlockId,
1919
consumeCompressionDuration,
20+
getCompressionMessageKey,
2021
allocateRunId,
2122
applyCompressionState,
2223
wrapCompressedSummary,
@@ -64,7 +65,6 @@ export function createCompressRangeTool(ctx: ToolContext): ReturnType<typeof too
6465
typeof (toolCtx as unknown as { callID?: unknown }).callID === "string"
6566
? (toolCtx as unknown as { callID: string }).callID
6667
: undefined
67-
const durationMs = consumeCompressionDuration(ctx.state, callId)
6868

6969
const { rawMessages, searchContext } = await prepareSession(
7070
ctx,
@@ -137,6 +137,11 @@ export function createCompressRangeTool(ctx: ToolContext): ReturnType<typeof too
137137
})
138138
}
139139

140+
const durationMs = consumeCompressionDuration(
141+
ctx.state,
142+
getCompressionMessageKey(toolCtx.sessionID, toolCtx.messageID),
143+
callId,
144+
)
140145
const runId = allocateRunId(ctx.state)
141146

142147
for (const preparedPlan of preparedPlans) {

lib/compress/state.ts

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ import type { AppliedCompressionResult, CompressionStateInput, SelectionResoluti
44

55
export const COMPRESSED_BLOCK_HEADER = "[Compressed conversation section]"
66

7+
export function getCompressionMessageKey(sessionId: string, messageId: string): string {
8+
return `${sessionId}:${messageId}`
9+
}
10+
711
export function allocateBlockId(state: SessionState): number {
812
const next = state.prune.messages.nextBlockId
913
if (!Number.isInteger(next) || next < 1) {
@@ -29,18 +33,53 @@ export function allocateRunId(state: SessionState): number {
2933
export function recordCompressionDuration(
3034
state: SessionState,
3135
callId: string,
36+
messageKey: string,
3237
durationMs: number,
3338
): void {
3439
state.compressionDurations.set(callId, durationMs)
40+
41+
const queue = state.compressionDurationQueue.get(messageKey)
42+
if (!queue) {
43+
state.compressionDurationQueue.set(messageKey, [callId])
44+
return
45+
}
46+
47+
queue.push(callId)
3548
}
3649

37-
export function consumeCompressionDuration(state: SessionState, callId?: string): number {
38-
if (!callId || !state.compressionDurations.has(callId)) {
50+
export function consumeCompressionDuration(
51+
state: SessionState,
52+
messageKey: string,
53+
callId?: string,
54+
): number {
55+
if (callId && state.compressionDurations.has(callId)) {
56+
const queue = state.compressionDurationQueue.get(messageKey)
57+
if (queue) {
58+
const next = queue.filter((id) => id !== callId)
59+
if (next.length === 0) {
60+
state.compressionDurationQueue.delete(messageKey)
61+
} else {
62+
state.compressionDurationQueue.set(messageKey, next)
63+
}
64+
}
65+
66+
const durationMs = state.compressionDurations.get(callId) || 0
67+
state.compressionDurations.delete(callId)
68+
return durationMs
69+
}
70+
71+
const queue = state.compressionDurationQueue.get(messageKey)
72+
const queuedCallId = queue?.shift()
73+
if (queue && queue.length === 0) {
74+
state.compressionDurationQueue.delete(messageKey)
75+
}
76+
77+
if (!queuedCallId) {
3978
return 0
4079
}
4180

42-
const durationMs = state.compressionDurations.get(callId) || 0
43-
state.compressionDurations.delete(callId)
81+
const durationMs = state.compressionDurations.get(queuedCallId) || 0
82+
state.compressionDurations.delete(queuedCallId)
4483
return durationMs
4584
}
4685

lib/hooks.ts

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
} from "./messages"
1717
import { renderSystemPrompt, type PromptStore } from "./prompts"
1818
import { buildProtectedToolsExtension } from "./prompts/extensions/system"
19-
import { recordCompressionDuration } from "./compress/state"
19+
import { getCompressionMessageKey, recordCompressionDuration } from "./compress/state"
2020
import {
2121
applyPendingManualTrigger,
2222
handleContextCommand,
@@ -269,20 +269,12 @@ export function createTextCompleteHandler() {
269269

270270
export function createEventHandler(state: SessionState, logger: Logger) {
271271
return async (input: { event: any }) => {
272-
if (!state.sessionId) {
273-
return
274-
}
275-
276272
if (input.event.type !== "message.part.updated") {
277273
return
278274
}
279275

280276
const part = input.event.properties?.part
281-
if (
282-
part?.sessionID !== state.sessionId ||
283-
part.type !== "tool" ||
284-
part.tool !== "compress"
285-
) {
277+
if (part?.type !== "tool" || part.tool !== "compress") {
286278
return
287279
}
288280

@@ -325,7 +317,12 @@ export function createEventHandler(state: SessionState, logger: Logger) {
325317

326318
state.compressionStarts.delete(part.callID)
327319
const durationMs = Math.max(0, runningAt - start.startedAt)
328-
recordCompressionDuration(state, part.callID, durationMs)
320+
recordCompressionDuration(
321+
state,
322+
part.callID,
323+
getCompressionMessageKey(part.sessionID, start.messageId),
324+
durationMs,
325+
)
329326

330327
logger.info("Recorded compression time", {
331328
callID: part.callID,

lib/state/state.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ export function createSessionState(): SessionState {
8383
},
8484
compressionStarts: new Map<string, CompressionStart>(),
8585
compressionDurations: new Map<string, number>(),
86+
compressionDurationQueue: new Map<string, string[]>(),
8687
toolParameters: new Map<string, ToolParameterEntry>(),
8788
subAgentResultCache: new Map<string, string>(),
8889
toolIdList: [],
@@ -118,8 +119,6 @@ export function resetSessionState(state: SessionState): void {
118119
pruneTokenCounter: 0,
119120
totalPruneTokens: 0,
120121
}
121-
state.compressionStarts.clear()
122-
state.compressionDurations.clear()
123122
state.toolParameters.clear()
124123
state.subAgentResultCache.clear()
125124
state.toolIdList = []

lib/state/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ export interface SessionState {
104104
stats: SessionStats
105105
compressionStarts: Map<string, CompressionStart>
106106
compressionDurations: Map<string, number>
107+
compressionDurationQueue: Map<string, string[]>
107108
toolParameters: Map<string, ToolParameterEntry>
108109
subAgentResultCache: Map<string, string>
109110
toolIdList: string[]

tests/compress-message.test.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,55 @@ test("compress message mode batches individual message summaries", async () => {
226226
assert.match(blocks[1]?.summary || "", /task output body/)
227227
})
228228

229+
test("compress message mode consumes queued duration without call id", async () => {
230+
const sessionID = `ses_message_compress_duration_${Date.now()}`
231+
const rawMessages = buildMessages(sessionID)
232+
const state = createSessionState()
233+
const logger = new Logger(false)
234+
state.compressionDurations.set("call-1", 225)
235+
state.compressionDurationQueue.set(`${sessionID}:msg-compress-message`, ["call-1"])
236+
237+
const tool = createCompressMessageTool({
238+
client: {
239+
session: {
240+
messages: async () => ({ data: rawMessages }),
241+
get: async () => ({ data: { parentID: null } }),
242+
},
243+
},
244+
state,
245+
logger,
246+
config: buildConfig(),
247+
prompts: {
248+
reload() {},
249+
getRuntimePrompts() {
250+
return { compressMessage: "", compressRange: "" }
251+
},
252+
},
253+
} as any)
254+
255+
await tool.execute(
256+
{
257+
topic: "Batch stale notes",
258+
content: [
259+
{
260+
messageId: "m0002",
261+
topic: "Code path note",
262+
summary: "Captured the assistant's code-path findings.",
263+
},
264+
],
265+
},
266+
{
267+
ask: async () => {},
268+
metadata: () => {},
269+
sessionID,
270+
messageID: "msg-compress-message",
271+
},
272+
)
273+
274+
const block = Array.from(state.prune.messages.blocksById.values())[0]
275+
assert.equal(block?.durationMs, 225)
276+
})
277+
229278
test("compress message mode does not partially apply when preparation fails", async () => {
230279
const sessionID = `ses_message_compress_prepare_fail_${Date.now()}`
231280
const rawMessages = buildMessages(sessionID)

tests/hooks-permission.test.ts

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
createEventHandler,
99
createTextCompleteHandler,
1010
} from "../lib/hooks"
11-
import { consumeCompressionDuration } from "../lib/compress/state"
11+
import { consumeCompressionDuration, getCompressionMessageKey } from "../lib/compress/state"
1212
import { Logger } from "../lib/logger"
1313
import { createSessionState, type WithParts } from "../lib/state"
1414

@@ -207,6 +207,10 @@ test("event hook records compress input generation duration", async () => {
207207
}
208208

209209
assert.equal(state.compressionDurations.get("call-1"), 225)
210+
assert.deepEqual(
211+
state.compressionDurationQueue.get(getCompressionMessageKey("session-1", "message-1")),
212+
["call-1"],
213+
)
210214
assert.equal(state.compressionStarts.has("call-1"), false)
211215
})
212216

@@ -301,12 +305,13 @@ test("compression duration consumption handles multiple calls in one message", a
301305
Date.now = originalNow
302306
}
303307

304-
assert.equal(consumeCompressionDuration(state, "call-1"), 225)
305-
assert.equal(consumeCompressionDuration(state, "call-2"), 310)
306-
assert.equal(consumeCompressionDuration(state, "call-1"), 0)
308+
const key = getCompressionMessageKey("session-1", "message-1")
309+
assert.equal(consumeCompressionDuration(state, key), 225)
310+
assert.equal(consumeCompressionDuration(state, key), 310)
311+
assert.equal(consumeCompressionDuration(state, key), 0)
307312
})
308313

309-
test("compression duration consumption returns zero for unknown call id", async () => {
314+
test("compression duration consumption supports explicit call id lookup", async () => {
310315
const state = createSessionState()
311316
state.sessionId = "session-1"
312317
const handler = createEventHandler(state, new Logger(false))
@@ -397,10 +402,10 @@ test("compression duration consumption returns zero for unknown call id", async
397402
Date.now = originalNow
398403
}
399404

400-
assert.equal(consumeCompressionDuration(state, "call-2"), 310)
401-
assert.equal(consumeCompressionDuration(state, "missing-call"), 0)
402-
assert.equal(consumeCompressionDuration(state, undefined), 0)
403-
assert.equal(consumeCompressionDuration(state, "call-1"), 225)
405+
const key = getCompressionMessageKey("session-1", "message-1")
406+
assert.equal(consumeCompressionDuration(state, key, "call-2"), 310)
407+
assert.equal(consumeCompressionDuration(state, key), 225)
408+
assert.equal(consumeCompressionDuration(state, key), 0)
404409
})
405410

406411
test("event hook ignores non-compress tool parts", async () => {

0 commit comments

Comments
 (0)