diff --git a/package-lock.json b/package-lock.json index 93898052384..00f3f10f3e2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "1.2.25", "dependencies": { "@agentclientprotocol/sdk": "^0.19.0", + "@anthropic-ai/sdk": "^0.90.0", "@boxlite-ai/boxlite": "^0.4.3", "better-sqlite3": "^11.8.1", "cron-parser": "^5.5.0", @@ -59,6 +60,26 @@ "zod": "^3.25.0 || ^4.0.0" } }, + "node_modules/@anthropic-ai/sdk": { + "version": "0.90.0", + "resolved": "https://registry.npmjs.org/@anthropic-ai/sdk/-/sdk-0.90.0.tgz", + "integrity": "sha512-MzZtPabJF1b0FTDl6Z6H5ljphPwACLGP13lu8MTiB8jXaW/YXlpOp+Po2cVou3MPM5+f5toyLnul9whKCy7fBg==", + "license": "MIT", + "dependencies": { + "json-schema-to-ts": "^3.1.1" + }, + "bin": { + "anthropic-ai-sdk": "bin/cli" + }, + "peerDependencies": { + "zod": "^3.25.0 || ^4.0.0" + }, + "peerDependenciesMeta": { + "zod": { + "optional": true + } + } + }, "node_modules/@babel/helper-string-parser": { "version": "7.27.1", "resolved": "https://registry.npmjs.org/@babel/helper-string-parser/-/helper-string-parser-7.27.1.tgz", @@ -95,6 +116,15 @@ "node": ">=6.0.0" } }, + "node_modules/@babel/runtime": { + "version": "7.29.2", + "resolved": "https://registry.npmjs.org/@babel/runtime/-/runtime-7.29.2.tgz", + "integrity": "sha512-JiDShH45zKHWyGe4ZNVRrCjBz8Nh9TMmZG1kh4QTK8hCBTWBi8Da+i7s1fJw7/lYpM4ccepSNfqzZ/QvABBi5g==", + "license": "MIT", + "engines": { + "node": ">=6.9.0" + } + }, "node_modules/@babel/types": { "version": "7.29.0", "resolved": "https://registry.npmjs.org/@babel/types/-/types-7.29.0.tgz", @@ -2781,6 +2811,19 @@ "integrity": "sha512-4bV5BfR2mqfQTJm+V5tPPdf+ZpuhiIvTuAB5g8kcrXOZpTT/QwwVRWBywX1ozr6lEuPdbHxwaJlm9G6mI2sfSQ==", "dev": true }, + "node_modules/json-schema-to-ts": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/json-schema-to-ts/-/json-schema-to-ts-3.1.1.tgz", + "integrity": "sha512-+DWg8jCJG2TEnpy7kOm/7/AxaYoaRbjVB4LFZLySZlWn8exGs3A4OLJR966cVvU26N7X9TWxl+Jsw7dzAqKT6g==", + "license": 
"MIT", + "dependencies": { + "@babel/runtime": "^7.18.3", + "ts-algebra": "^2.0.0" + }, + "engines": { + "node": ">=16" + } + }, "node_modules/json-schema-traverse": { "version": "0.4.1", "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", @@ -3928,6 +3971,12 @@ "tree-kill": "cli.js" } }, + "node_modules/ts-algebra": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/ts-algebra/-/ts-algebra-2.0.0.tgz", + "integrity": "sha512-FPAhNPFMrkwz76P7cdjdmiShwMynZYN6SgOujD1urY4oNm80Ou9oMdmbR45LotcKOXoy7wSmHkRFE6Mxbrhefw==", + "license": "MIT" + }, "node_modules/ts-api-utils": { "version": "2.5.0", "resolved": "https://registry.npmjs.org/ts-api-utils/-/ts-api-utils-2.5.0.tgz", diff --git a/package.json b/package.json index 83bb614f626..3926e28e259 100644 --- a/package.json +++ b/package.json @@ -56,6 +56,7 @@ }, "dependencies": { "@agentclientprotocol/sdk": "^0.19.0", + "@anthropic-ai/sdk": "^0.90.0", "@boxlite-ai/boxlite": "^0.4.3", "better-sqlite3": "^11.8.1", "cron-parser": "^5.5.0", diff --git a/src/agent/actions-http.e2e.test.ts b/src/agent/actions-http.e2e.test.ts index 49463fe1416..1cc46c7f0f6 100644 --- a/src/agent/actions-http.e2e.test.ts +++ b/src/agent/actions-http.e2e.test.ts @@ -106,7 +106,7 @@ class StdioMcpClient { `MCP request ${method} #${id} timed out. stderr so far:\n${this.stderr}`, ), ); - }, 5000); + }, 10_000); this.pending.set(id, (res) => { clearTimeout(timer); resolve(res); diff --git a/src/agent/context-compressor.test.ts b/src/agent/context-compressor.test.ts new file mode 100644 index 00000000000..6dd7c26d5dd --- /dev/null +++ b/src/agent/context-compressor.test.ts @@ -0,0 +1,85 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import { + ContextCompressor, + type FormattedMessage, +} from './context-compressor.js'; + +const mockAnthropic = { + messages: { + create: vi.fn().mockResolvedValue({ + content: [{ type: 'text', text: 'Mock summary of conversation.' 
}], + }), + }, +} as any; + +describe('ContextCompressor', () => { + let compressor: ContextCompressor; + + beforeEach(() => { + vi.clearAllMocks(); + compressor = new ContextCompressor(mockAnthropic); + }); + + describe('needsCompression', () => { + it('returns false when utilization is null', () => { + expect(compressor.needsCompression(null)).toBe(false); + }); + it('returns false when utilization is below 0.80', () => { + expect(compressor.needsCompression(0.79)).toBe(false); + }); + it('returns true when utilization is exactly 0.80', () => { + expect(compressor.needsCompression(0.8)).toBe(true); + }); + it('returns true when utilization is above 0.80', () => { + expect(compressor.needsCompression(0.85)).toBe(true); + }); + }); + + describe('compress', () => { + it('returns empty result for empty messages', async () => { + const result = await compressor.compress([]); + expect(result).toEqual({ + summary: '', + messagesCompressed: 0, + messagesKept: 0, + }); + }); + + it('keeps at least 1 message verbatim', async () => { + const messages: FormattedMessage[] = [ + { sender: 'user', content: 'hello' }, + ]; + const result = await compressor.compress(messages); + expect(result.messagesKept).toBeGreaterThanOrEqual(1); + }); + + it('compresses and keeps correct counts for 10 messages', async () => { + const messages: FormattedMessage[] = Array.from( + { length: 10 }, + (_, i) => ({ + sender: i % 2 === 0 ? 
import type Anthropic from '@anthropic-ai/sdk';
import type { TextBlock } from '@anthropic-ai/sdk/resources/messages.js';

/** One conversation turn, reduced to the fields the summarizer needs. */
export interface FormattedMessage {
  sender: string;
  content: string;
}

/** Outcome of a {@link ContextCompressor.compress} call. */
export interface CompressResult {
  /** Summary of the older messages; empty when nothing was summarized. */
  summary: string;
  /** Number of older messages folded into `summary`. */
  messagesCompressed: number;
  /** Number of most-recent messages left verbatim. */
  messagesKept: number;
}

/** Optional tuning knobs; every field defaults to the previously hard-coded value. */
export interface ContextCompressorOptions {
  /** Utilization at or above which compression is requested. Default `0.8`. */
  compressionThreshold?: number;
  /** Fraction (0..1) of the newest messages kept verbatim. Default `0.2`. */
  keepRatio?: number;
  /** Model id used for summarization. Default `claude-haiku-4-5-20251001`. */
  model?: string;
  /** `max_tokens` sent with the summarization request. Default `1024`. */
  maxSummaryTokens?: number;
}

const DEFAULT_COMPRESSION_THRESHOLD = 0.8;
const DEFAULT_KEEP_RATIO = 0.2;
const DEFAULT_SUMMARY_MODEL = 'claude-haiku-4-5-20251001';
const DEFAULT_SUMMARY_MAX_TOKENS = 1024;

/**
 * Summarizes the older part of a conversation so it can be replayed into a
 * fresh session, keeping only the most recent messages verbatim.
 */
export class ContextCompressor {
  private readonly compressionThreshold: number;
  private readonly keepRatio: number;
  private readonly model: string;
  private readonly maxSummaryTokens: number;

  /**
   * @param anthropic Client used for the summarization request.
   * @param options Optional overrides for threshold, keep ratio, model and token budget.
   * @throws RangeError when an option is outside its valid range.
   */
  constructor(
    private readonly anthropic: Anthropic,
    options: ContextCompressorOptions = {},
  ) {
    const {
      compressionThreshold = DEFAULT_COMPRESSION_THRESHOLD,
      keepRatio = DEFAULT_KEEP_RATIO,
      model = DEFAULT_SUMMARY_MODEL,
      maxSummaryTokens = DEFAULT_SUMMARY_MAX_TOKENS,
    } = options;

    if (!Number.isFinite(compressionThreshold)) {
      throw new RangeError('compressionThreshold must be a finite number');
    }
    // Written as a positive range check so NaN is rejected too.
    if (!(keepRatio >= 0 && keepRatio <= 1)) {
      throw new RangeError('keepRatio must be between 0 and 1');
    }
    if (!Number.isInteger(maxSummaryTokens) || maxSummaryTokens <= 0) {
      throw new RangeError('maxSummaryTokens must be a positive integer');
    }

    this.compressionThreshold = compressionThreshold;
    this.keepRatio = keepRatio;
    this.model = model;
    this.maxSummaryTokens = maxSummaryTokens;
  }

  /**
   * @returns `true` when `utilization` is known and at or above the threshold;
   *   `false` for `null` (never measured).
   */
  needsCompression(utilization: number | null): boolean {
    return utilization !== null && utilization >= this.compressionThreshold;
  }

  /**
   * Split `messages` into an older part, which is summarized, and a recent
   * tail, which the caller keeps verbatim.
   *
   * At least one message is always kept, so a single-message input is never
   * summarized and no API call is made for it.
   *
   * @throws Error when the summarization request fails or yields no text.
   */
  async compress(messages: FormattedMessage[]): Promise<CompressResult> {
    if (messages.length === 0) {
      return { summary: '', messagesCompressed: 0, messagesKept: 0 };
    }

    const keepCount = Math.min(
      messages.length,
      Math.max(1, Math.floor(messages.length * this.keepRatio)),
    );
    const older = messages.slice(0, messages.length - keepCount);
    const summary = older.length > 0 ? await this.callHaiku(older) : '';

    return {
      summary,
      messagesCompressed: older.length,
      messagesKept: keepCount,
    };
  }

  /**
   * Ask the summarization model for a compact summary of `messages`.
   *
   * @throws Error when the response carries no text block.
   */
  private async callHaiku(messages: FormattedMessage[]): Promise<string> {
    const transcript = messages
      .map((m) => `[${m.sender}]: ${m.content}`)
      .join('\n');
    const response = await this.anthropic.messages.create({
      model: this.model,
      max_tokens: this.maxSummaryTokens,
      messages: [
        {
          role: 'user',
          content: `Summarize this conversation compactly, preserving key facts, decisions, and context:\n\n${transcript}`,
        },
      ],
    });

    // The response is a list of typed blocks and the first one is not
    // guaranteed to be text, so collect every text block rather than
    // casting content[0].
    const summary = response.content
      .filter((block): block is TextBlock => block.type === 'text')
      .map((block) => block.text)
      .join('\n')
      .trim();
    if (summary === '') {
      // Fail loudly: callers treat a throw as "skip compression", which is
      // safer than replacing real history with an empty or undefined summary.
      throw new Error('Summarization response contained no text content');
    }
    return summary;
  }
}
compressedAt: string) { + return `\nEarlier conversation summary (auto-generated):\n${summary}\n`; + } + }, +})); + +import { AgentImpl } from './agent-impl.js'; +import { + buildAgentConfig, + resolveSerializableAgentSettings, +} from './config.js'; +import type { ContextCompressedEvent } from '../api/events.js'; +import { runContainerAgent } from '../container-runner.js'; +import { _initTestDatabase, AgentDb } from '../db.js'; +import { buildRuntimeConfig } from '../runtime-config.js'; +import type { Channel, RegisteredGroup } from '../types.js'; + +const runtimeConfig = buildRuntimeConfig( + { timezone: 'UTC' }, + '/tmp/agentlite-test-pkg', +); + +const MAIN_GROUP: RegisteredGroup = { + name: 'Main', + folder: 'main', + trigger: 'always', + added_at: '2024-01-01T00:00:00.000Z', + isMain: true, +}; + +let tmpDir: string; +let db: AgentDb; + +function createAgent(name: string): AgentImpl { + const config = buildAgentConfig({ + agentId: `${name}00000000`.slice(0, 8), + ...resolveSerializableAgentSettings( + name, + { workdir: path.join(tmpDir, 'agents', name) }, + tmpDir, + ), + }); + return new AgentImpl(config, runtimeConfig); +} + +function createMockChannel(): Channel { + return { + name: 'mock', + async connect(): Promise {}, + async disconnect(): Promise {}, + async sendMessage(): Promise {}, + isConnected(): boolean { + return true; + }, + ownsJid(jid: string): boolean { + return jid === 'mock:stream'; + }, + async setTyping(): Promise {}, + }; +} + +function setupAgent(): AgentImpl { + const agent = createAgent('message-processor-test'); + agent._setDbForTests(db); + agent._setRegisteredGroups({ 'mock:stream': MAIN_GROUP }); + (agent as unknown as { _started: boolean })._started = true; + const channel = createMockChannel(); + (agent as unknown as { channels: Map }).channels.set( + 'mock', + channel, + ); + + db.storeChatMetadata( + 'mock:stream', + '2026-04-13T00:00:00.000Z', + 'Message Processor Test Chat', + ); + db.storeMessage({ + id: 'msg-1', + 
chat_jid: 'mock:stream', + sender: 'user1', + sender_name: 'User 1', + content: 'do something', + timestamp: '2026-04-13T00:00:01.000Z', + is_from_me: false, + }); + + return agent; +} + +beforeEach(() => { + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'agentlite-message-')); + db = _initTestDatabase(); + vi.mocked(runContainerAgent).mockReset(); + contextCompressionMocks.needsCompression.mockReset(); + contextCompressionMocks.needsCompression.mockImplementation( + (utilization: number | null) => utilization !== null && utilization >= 0.8, + ); + contextCompressionMocks.compress.mockReset(); + contextCompressionMocks.compress.mockImplementation( + async (messages: Array<{ role: string; content: string }>) => { + const kept = Math.max(1, Math.floor(messages.length * 0.2)); + return { + summary: 'condensed history', + messagesCompressed: messages.length - kept, + messagesKept: kept, + }; + }, + ); +}); + +afterEach(() => { + try { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } catch { + /* ignore */ + } +}); + +function sdkMsg(sdkType: string, message: unknown, sdkSubtype?: string) { + return { type: 'sdk_message' as const, sdkType, sdkSubtype, message }; +} + +describe('MessageProcessor', () => { + it('stores 0.75 from a rate_limit_event', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + await onOutput?.( + sdkMsg('rate_limit_event', { + utilization: 0.75, + }), + ); + await onOutput?.({ + type: 'state', + state: 'stopped', + reason: 'exit', + exitCode: 0, + }); + return { status: 'success', result: null }; + }, + ); + + await agent.processGroupMessages('mock:stream'); + + expect(db.getContextUtilization('mock:stream')).toBe(0.75); + }); + + it('stores 0.82 from a rate_limit_event', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + await 
onOutput?.( + sdkMsg('rate_limit_event', { + rate_limit_info: { + utilization: 0.82, + }, + }), + ); + await onOutput?.({ + type: 'state', + state: 'stopped', + reason: 'exit', + exitCode: 0, + }); + return { status: 'success', result: null }; + }, + ); + + await agent.processGroupMessages('mock:stream'); + + expect(db.getContextUtilization('mock:stream')).toBe(0.82); + }); + + it('prepends a summary block, clears the session, and emits context_compressed when utilization is high', async () => { + const agent = setupAgent(); + db.setSession('main', 'existing-session'); + (agent as unknown as { sessions: Record }).sessions.main = + 'existing-session'; + db.setContextUtilization('mock:stream', 0.82); + + for (let i = 2; i <= 10; i++) { + db.storeMessage({ + id: `msg-${i}`, + chat_jid: 'mock:stream', + sender: `user${i}`, + sender_name: `User ${i}`, + content: `message ${i}`, + timestamp: `2026-04-13T00:00:${String(i).padStart(2, '0')}.000Z`, + is_from_me: false, + }); + } + + contextCompressionMocks.compress.mockResolvedValue({ + summary: 'compressed context', + messagesCompressed: 8, + messagesKept: 2, + }); + + let capturedInput: + | { + prompt: string; + sessionId?: string; + } + | undefined; + const events: ContextCompressedEvent[] = []; + agent.on('context_compressed', (evt) => events.push(evt)); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, input, _rc, _onProcess, onOutput) => { + capturedInput = input; + await onOutput?.({ + type: 'state', + state: 'stopped', + reason: 'exit', + exitCode: 0, + }); + return { status: 'success', result: null }; + }, + ); + + await agent.processGroupMessages('mock:stream'); + + expect(capturedInput).toBeDefined(); + expect(capturedInput!.sessionId).toBeUndefined(); + expect(capturedInput!.prompt.startsWith(' { + const agent = setupAgent(); + db.setSession('main', 'existing-session'); + (agent as unknown as { sessions: Record }).sessions.main = + 'existing-session'; + db.setContextUtilization('mock:stream', 
0.79); + + let capturedInput: + | { + prompt: string; + sessionId?: string; + } + | undefined; + const events: ContextCompressedEvent[] = []; + agent.on('context_compressed', (evt) => events.push(evt)); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, input, _rc, _onProcess, onOutput) => { + capturedInput = input; + await onOutput?.({ + type: 'state', + state: 'stopped', + reason: 'exit', + exitCode: 0, + }); + return { status: 'success', result: null }; + }, + ); + + await agent.processGroupMessages('mock:stream'); + + expect(contextCompressionMocks.compress).not.toHaveBeenCalled(); + expect(capturedInput).toBeDefined(); + expect(capturedInput!.sessionId).toBe('existing-session'); + expect(capturedInput!.prompt.startsWith(' | null = null; private _wakeLoop: (() => void) | null = null; + private readonly contextCompressor = new ContextCompressor( + new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY ?? 'missing' }), + ); constructor( private readonly ctx: AgentContext, @@ -123,10 +131,11 @@ export class MessageProcessor { if (!hasTrigger) return true; } - const prompt = formatMessages( + let prompt = formatMessages( missedMessages, this.ctx.runtimeConfig.timezone, ); + prompt = await this.maybeCompressPrompt(chatJid, group, prompt); const previousCursor = this.ctx.lastAgentTimestamp[chatJid] || ''; this.ctx.lastAgentTimestamp[chatJid] = @@ -211,6 +220,7 @@ export class MessageProcessor { if (event.type === 'sdk_message') { const now = new Date().toISOString(); const msg = event.message; + const utilization = this.getRateLimitUtilization(msg); // Always emit raw event — consumers get all 21 SDK types this.ctx.emit('run.sdk_message', { @@ -222,6 +232,10 @@ export class MessageProcessor { timestamp: now, }); + if (event.sdkType === 'rate_limit_event' && utilization !== null) { + this.ctx.db.setContextUtilization(chatJid, utilization); + } + // Derive curated convenience events from SDK messages if (event.sdkType === 'assistant' && 
/**
 * Swap `prompt` for a compacted prompt when the utilization stored for
 * `chatJid` is at or above the compressor's threshold.
 *
 * On success the group's session is dropped (in memory and in the DB), the
 * stored utilization is cleared, a `context_compressed` event is emitted and
 * the returned prompt is the summary block followed by the kept messages.
 * In every other case the original `prompt` is returned unchanged.
 *
 * Best-effort: a failure is logged and never propagates to the caller.
 *
 * NOTE(review): history comes from `getMessagesSince(chatJid, '', …)`, not
 * from the pending messages that produced `prompt`. If more messages are
 * pending than are kept verbatim, some new messages reach the agent only via
 * the summary. Confirm this is intended.
 *
 * @param chatJid Chat whose stored utilization and history are consulted.
 * @param group Group whose session is reset when compression is applied.
 * @param prompt Already-formatted prompt to fall back to.
 * @returns The compressed prompt, or `prompt` when compression is not applied.
 */
private async maybeCompressPrompt(
  chatJid: string,
  group: InternalRegisteredGroup,
  prompt: string,
): Promise<string> {
  const utilization = this.ctx.db.getContextUtilization(chatJid);
  // Explicit null check: needsCompression() is not a type guard, and the
  // emitted event declares `utilization` as a plain number.
  if (
    utilization === null ||
    !this.contextCompressor.needsCompression(utilization)
  ) {
    return prompt;
  }

  const history = this.ctx.db.getMessagesSince(
    chatJid,
    '',
    this.ctx.config.assistantName,
  );
  if (history.length === 0) {
    return prompt;
  }

  // Set once the replacement prompt exists; from then on it is the better
  // fallback, because the session may already have been reset.
  let compressedPrompt: string | null = null;
  try {
    const result = await this.contextCompressor.compress(
      history.map((message) => ({
        // The compressor's FormattedMessage is keyed by `sender`; passing
        // `role` rendered every transcript line as "[undefined]: …".
        sender: message.sender_name || message.sender,
        content: message.content,
      })),
    );
    if (result.messagesCompressed === 0) {
      return prompt;
    }

    // slice(-0) would return the whole array, hence the explicit guard.
    const keptMessages =
      result.messagesKept > 0
        ? history.slice(-result.messagesKept)
        : history;
    const compressedAt = new Date().toISOString();

    // Build the prompt before touching any state, so a formatting failure
    // cannot leave a cleared session paired with the uncompressed prompt.
    compressedPrompt = `${this.formatSummaryBlock(result.summary, compressedAt)}\n\n${formatMessages(
      keptMessages,
      this.ctx.runtimeConfig.timezone,
    )}`;

    delete this.ctx.sessions[group.folder];
    this.ctx.db.clearSession(group.folder);
    this.ctx.db.clearContextUtilization(chatJid);
    this.ctx.emit('context_compressed', {
      agentId: this.ctx.id,
      jid: chatJid,
      utilization,
      messagesCompressed: result.messagesCompressed,
      messagesKept: result.messagesKept,
      timestamp: compressedAt,
    });

    return compressedPrompt;
  } catch (err) {
    logger.warn(
      { chatJid, group: group.name, err },
      compressedPrompt === null
        ? 'Context compression skipped'
        : 'Context compression applied with errors',
    );
    return compressedPrompt ?? prompt;
  }
}
rateLimitInfo !== 'object') { + return null; + } + + const utilization = (rateLimitInfo as Record).utilization; + return typeof utilization === 'number' ? utilization : null; + } + + private formatSummaryBlock(summary: string, compressedAt: string): string { + return ` +Earlier conversation summary (auto-generated): +${summary} +`; + } } diff --git a/src/api/events.ts b/src/api/events.ts index 701b7e8dd87..155dec8410c 100644 --- a/src/api/events.ts +++ b/src/api/events.ts @@ -14,6 +14,7 @@ export interface AgentEvents extends Record { 'run.tool_progress': [payload: RunToolProgressEvent]; 'run.subagent': [payload: RunSubagentEvent]; 'run.status': [payload: RunStatusEvent]; + context_compressed: [payload: ContextCompressedEvent]; 'chat.metadata': [payload: ChatMetadataEvent]; 'channel.connected': [payload: { key: string }]; 'channel.disconnected': [payload: { key: string }]; @@ -180,6 +181,22 @@ export interface RunStatusEvent { timestamp: string; } +/** Conversation history was compacted into a fresh session. */ +export interface ContextCompressedEvent { + /** Stable agent identifier. */ + agentId: string; + /** Group/chat identifier. */ + jid: string; + /** Utilization value that triggered compression. */ + utilization: number; + /** Number of older messages folded into the summary. */ + messagesCompressed: number; + /** Number of recent messages kept verbatim. */ + messagesKept: number; + /** ISO timestamp. */ + timestamp: string; +} + /** Chat/group metadata discovered from a channel. */ export interface ChatMetadataEvent { /** Group/chat identifier. 
*/ diff --git a/src/db.test.ts b/src/db.test.ts index b33a1d4adec..0d930d8236c 100644 --- a/src/db.test.ts +++ b/src/db.test.ts @@ -527,3 +527,24 @@ describe('registered group isMain', () => { expect(group.isMain).toBeUndefined(); }); }); + +describe('context utilization', () => { + it('returns null when utilization has not been stored', () => { + expect(db.getContextUtilization('group@g.us')).toBeNull(); + }); + + it('stores and retrieves utilization per group', () => { + db.setContextUtilization('group@g.us', 0.5); + db.setContextUtilization('other@g.us', 0.85); + + expect(db.getContextUtilization('group@g.us')).toBe(0.5); + expect(db.getContextUtilization('other@g.us')).toBe(0.85); + }); + + it('clears stored utilization when reset to null', () => { + db.setContextUtilization('group@g.us', 0.5); + db.setContextUtilization('group@g.us', null); + + expect(db.getContextUtilization('group@g.us')).toBeNull(); + }); +}); diff --git a/src/db.ts b/src/db.ts index 5965c9cd244..6462deebe72 100644 --- a/src/db.ts +++ b/src/db.ts @@ -67,7 +67,9 @@ export function createSchema( CREATE TABLE IF NOT EXISTS router_state ( key TEXT PRIMARY KEY, - value TEXT NOT NULL + value TEXT NOT NULL, + context_utilization REAL, + context_utilization_at INTEGER ); CREATE TABLE IF NOT EXISTS sessions ( group_folder TEXT PRIMARY KEY, @@ -142,6 +144,22 @@ export function createSchema( } catch { /* columns already exist */ } + + // Add context utilization columns if they don't exist (migration for existing DBs) + try { + database.exec( + `ALTER TABLE router_state ADD COLUMN context_utilization REAL`, + ); + } catch { + /* column already exists */ + } + try { + database.exec( + `ALTER TABLE router_state ADD COLUMN context_utilization_at INTEGER`, + ); + } catch { + /* column already exists */ + } } export function initDatabase(opts: { @@ -183,6 +201,10 @@ export class AgentDb { private dataDir: string, ) {} + private contextUtilizationKey(groupJid: string): string { + return 
/**
 * Upsert the utilization reading for a chat.
 *
 * The reading lives in `router_state` under a per-chat key with an empty
 * `value`. Passing `null` keeps the row but nulls both the reading and its
 * timestamp.
 *
 * @param groupJid Chat the reading belongs to.
 * @param utilization Latest reading, or `null` to reset it.
 */
setContextUtilization(groupJid: string, utilization: number | null): void {
  const upsert = this.db.prepare(
    `
      INSERT INTO router_state (key, value, context_utilization, context_utilization_at)
      VALUES (?, '', ?, ?)
      ON CONFLICT(key) DO UPDATE SET
        value = excluded.value,
        context_utilization = excluded.context_utilization,
        context_utilization_at = excluded.context_utilization_at
    `,
  );
  const stateKey = this.contextUtilizationKey(groupJid);
  // The timestamp is epoch milliseconds, recorded only with a real reading.
  const recordedAt = utilization === null ? null : Date.now();
  upsert.run(stateKey, utilization, recordedAt);
}

/**
 * Read back the utilization stored for a chat.
 *
 * @returns The stored reading, or `null` when no row exists or the reading
 *   was reset.
 */
getContextUtilization(groupJid: string): number | null {
  const stateKey = this.contextUtilizationKey(groupJid);
  const row = this.db
    .prepare('SELECT context_utilization FROM router_state WHERE key = ?')
    .get(stateKey) as { context_utilization: number | null } | undefined;
  if (row === undefined) {
    return null;
  }
  return row.context_utilization ?? null;
}

/** Reset the stored utilization for a chat; same as storing `null`. */
clearContextUtilization(groupJid: string): void {
  this.setContextUtilization(groupJid, null);
}
index 656770743a6..1f764a159ff 100644 --- a/src/exports.test.ts +++ b/src/exports.test.ts @@ -91,6 +91,6 @@ describe('package exports', () => { { cwd: repoRoot, encoding: 'utf-8' }, ); expect(result.trim()).toBe('function'); - }); + }, 15_000); }); }); diff --git a/src/run-stream-events.test.ts b/src/run-stream-events.test.ts index 967246ffbf3..ecfb8dd04f7 100644 --- a/src/run-stream-events.test.ts +++ b/src/run-stream-events.test.ts @@ -9,6 +9,22 @@ import path from 'path'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +const contextCompressionMocks = vi.hoisted(() => ({ + needsCompression: vi.fn( + (utilization: number | null) => utilization !== null && utilization >= 0.8, + ), + compress: vi.fn( + async (messages: Array<{ sender: string; content: string }>) => { + const kept = Math.max(1, Math.floor(messages.length * 0.2)); + return { + summary: 'condensed history', + messagesCompressed: messages.length - kept, + messagesKept: kept, + }; + }, + ), +})); + vi.mock('./container-runner.js', async () => { const actual = await vi.importActual( './container-runner.js', @@ -19,6 +35,22 @@ vi.mock('./container-runner.js', async () => { }; }); +vi.mock('./agent/context-compressor.js', () => ({ + ContextCompressor: class { + needsCompression(utilization: number | null) { + return contextCompressionMocks.needsCompression(utilization); + } + + compress(messages: Array<{ sender: string; content: string }>) { + return contextCompressionMocks.compress(messages); + } + + formatSummaryBlock(summary: string, compressedAt: string) { + return `\nEarlier conversation summary (auto-generated):\n${summary}\n`; + } + }, +})); + import { AgentImpl } from './agent/agent-impl.js'; import { buildAgentConfig, @@ -28,6 +60,7 @@ import { _initTestDatabase, AgentDb } from './db.js'; import { buildRuntimeConfig } from './runtime-config.js'; import { runContainerAgent } from './container-runner.js'; import type { + ContextCompressedEvent, RunSdkMessageEvent, 
RunToolEvent, RunToolProgressEvent, @@ -109,6 +142,24 @@ function setupAgent(): AgentImpl { return agent; } +beforeEach(() => { + contextCompressionMocks.needsCompression.mockReset(); + contextCompressionMocks.needsCompression.mockImplementation( + (utilization: number | null) => utilization !== null && utilization >= 0.8, + ); + contextCompressionMocks.compress.mockReset(); + contextCompressionMocks.compress.mockImplementation( + async (messages: Array<{ sender: string; content: string }>) => { + const kept = Math.max(1, Math.floor(messages.length * 0.2)); + return { + summary: 'condensed history', + messagesCompressed: messages.length - kept, + messagesKept: kept, + }; + }, + ); +}); + // ── Helpers to build sdk_message ContainerEvents ──────────────── function sdkMsg(sdkType: string, message: unknown, sdkSubtype?: string) { @@ -257,6 +308,31 @@ describe('run.sdk_message (raw passthrough)', () => { expect(events[0].message).toEqual(rawMsg); }); + + it('stores context utilization from rate_limit_event', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + await onOutput?.( + sdkMsg('rate_limit_event', { + utilization: 0.5, + }), + ); + await onOutput?.({ + type: 'state', + state: 'stopped', + reason: 'exit', + exitCode: 0, + }); + return { status: 'success', result: null }; + }, + ); + + await agent.processGroupMessages('mock:stream'); + + expect(db.getContextUtilization('mock:stream')).toBe(0.5); + }); }); describe('run.tool (derived from sdk_message)', () => { @@ -603,6 +679,130 @@ describe('run.status (derived from sdk_message)', () => { }); }); +describe('context compression', () => { + beforeEach(() => { + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'agentlite-stream-')); + db = _initTestDatabase(); + vi.mocked(runContainerAgent).mockReset(); + }); + + afterEach(() => { + try { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } catch { + /* ignore 
*/ + } + }); + + it('prepends a summary block, clears the session, and emits context_compressed when utilization is high', async () => { + const agent = setupAgent(); + db.setSession('main', 'existing-session'); + (agent as unknown as { sessions: Record }).sessions.main = + 'existing-session'; + db.setContextUtilization('mock:stream', 0.85); + + for (let i = 2; i <= 10; i++) { + db.storeMessage({ + id: `msg-${i}`, + chat_jid: 'mock:stream', + sender: `user${i}`, + sender_name: `User ${i}`, + content: `message ${i}`, + timestamp: `2026-04-13T00:00:${String(i).padStart(2, '0')}.000Z`, + is_from_me: false, + }); + } + + contextCompressionMocks.compress.mockResolvedValue({ + summary: 'compressed context', + messagesCompressed: 8, + messagesKept: 2, + }); + + let capturedInput: + | { + prompt: string; + sessionId?: string; + } + | undefined; + const events: ContextCompressedEvent[] = []; + agent.on('context_compressed', (evt) => events.push(evt)); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, input, _rc, _onProcess, onOutput) => { + capturedInput = input; + await onOutput?.({ + type: 'state', + state: 'stopped', + reason: 'exit', + exitCode: 0, + }); + return { status: 'success', result: null }; + }, + ); + + await agent.processGroupMessages('mock:stream'); + + expect(capturedInput).toBeDefined(); + expect(capturedInput!.sessionId).toBeUndefined(); + expect(capturedInput!.prompt.startsWith(' { + const agent = setupAgent(); + db.setSession('main', 'existing-session'); + (agent as unknown as { sessions: Record }).sessions.main = + 'existing-session'; + db.setContextUtilization('mock:stream', 0.79); + + let capturedInput: + | { + prompt: string; + sessionId?: string; + } + | undefined; + const events: ContextCompressedEvent[] = []; + agent.on('context_compressed', (evt) => events.push(evt)); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, input, _rc, _onProcess, onOutput) => { + capturedInput = input; + await onOutput?.({ + 
type: 'state', + state: 'stopped', + reason: 'exit', + exitCode: 0, + }); + return { status: 'success', result: null }; + }, + ); + + await agent.processGroupMessages('mock:stream'); + + expect(contextCompressionMocks.compress).not.toHaveBeenCalled(); + expect(capturedInput).toBeDefined(); + expect(capturedInput!.sessionId).toBe('existing-session'); + expect(capturedInput!.prompt.startsWith(' { beforeEach(() => { tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'agentlite-stream-')); diff --git a/vitest.config.ts b/vitest.config.ts index a456d1cc3df..fc69a5c1aa8 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -3,5 +3,7 @@ import { defineConfig } from 'vitest/config'; export default defineConfig({ test: { include: ['src/**/*.test.ts', 'setup/**/*.test.ts'], + testTimeout: 30000, + hookTimeout: 30000, }, });