From a651039b48a2ffa852e0f7a6cc0d308ab0d763b0 Mon Sep 17 00:00:00 2001 From: Simba Zhang Date: Sat, 7 Mar 2026 07:21:08 -0800 Subject: [PATCH] feat: streaming SSE for benchmark LLM calls with direct endpoint support - Rewrite llmCall() to use stream:true with SSE parsing and idle timeout - Support direct llama-server (AEGIS_LLM_URL) and cloud provider (AEGIS_LLM_BASE_URL + API key) endpoints, bypassing gateway - Handle reasoning_content from thinking models (Qwen3.5) - Add progress logging every 100 tokens - Update banner and healthcheck to show actual LLM endpoint --- .../scripts/run-benchmark.cjs | 193 ++++++++++++++---- 1 file changed, 154 insertions(+), 39 deletions(-) diff --git a/skills/analysis/home-security-benchmark/scripts/run-benchmark.cjs b/skills/analysis/home-security-benchmark/scripts/run-benchmark.cjs index 6fd2b46c..56d84747 100644 --- a/skills/analysis/home-security-benchmark/scripts/run-benchmark.cjs +++ b/skills/analysis/home-security-benchmark/scripts/run-benchmark.cjs @@ -80,14 +80,23 @@ try { skillParams = JSON.parse(process.env.AEGIS_SKILL_PARAMS || '{}'); } catch // Aegis provides config via env vars; CLI args are fallback for standalone const GATEWAY_URL = process.env.AEGIS_GATEWAY_URL || getArg('gateway', 'http://localhost:5407'); +const LLM_URL = process.env.AEGIS_LLM_URL || getArg('llm', ''); // Direct llama-server LLM port const VLM_URL = process.env.AEGIS_VLM_URL || getArg('vlm', ''); const RESULTS_DIR = getArg('out', path.join(os.homedir(), '.aegis-ai', 'benchmarks')); const IS_SKILL_MODE = !!process.env.AEGIS_SKILL_ID; const NO_OPEN = args.includes('--no-open') || skillParams.noOpen || false; const TEST_MODE = skillParams.mode || 'full'; -const TIMEOUT_MS = 30000; +const IDLE_TIMEOUT_MS = 30000; // Streaming idle timeout — resets on each received token const FIXTURES_DIR = path.join(__dirname, '..', 'fixtures'); +// API type and model info from Aegis (or defaults for standalone) +const LLM_API_TYPE = process.env.AEGIS_LLM_API_TYPE 
|| 'openai'; +const LLM_MODEL = process.env.AEGIS_LLM_MODEL || ''; +const LLM_API_KEY = process.env.AEGIS_LLM_API_KEY || ''; +const LLM_BASE_URL = process.env.AEGIS_LLM_BASE_URL || ''; +const VLM_API_TYPE = process.env.AEGIS_VLM_API_TYPE || 'openai-compatible'; +const VLM_MODEL = process.env.AEGIS_VLM_MODEL || ''; + // ─── Skill Protocol: JSON lines on stdout, human text on stderr ────────────── /** @@ -127,44 +136,134 @@ const results = { }; async function llmCall(messages, opts = {}) { - const body = { messages, stream: false }; + const body = { messages, stream: true }; + if (opts.model || LLM_MODEL) body.model = opts.model || LLM_MODEL; if (opts.maxTokens) body.max_tokens = opts.maxTokens; if (opts.temperature !== undefined) body.temperature = opts.temperature; if (opts.tools) body.tools = opts.tools; - // Strip trailing /v1 from VLM_URL to avoid double-path (e.g. host:5405/v1/v1/...) - const vlmBase = VLM_URL ? VLM_URL.replace(/\/v1\/?$/, '') : ''; - const url = opts.vlm ? `${vlmBase}/v1/chat/completions` : `${GATEWAY_URL}/v1/chat/completions`; - const response = await fetch(url, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(body), - signal: AbortSignal.timeout(opts.timeout || TIMEOUT_MS), - }); - - if (!response.ok) { - const errBody = await response.text().catch(() => ''); - throw new Error(`HTTP ${response.status}: ${errBody.slice(0, 200)}`); + // Resolve LLM endpoint — priority: + // 1. Cloud provider base URL (e.g. https://api.openai.com/v1) when set via UI + // 2. Direct llama-server URL (port 5411) for builtin local models + // 3. Gateway (port 5407) as final fallback + const strip = (u) => u.replace(/\/v1\/?$/, ''); + let url; + if (opts.vlm) { + const vlmBase = VLM_URL ? 
strip(VLM_URL) : ''; + url = `${vlmBase}/v1/chat/completions`; + } else if (LLM_BASE_URL) { + url = `${strip(LLM_BASE_URL)}/v1/chat/completions`; + } else if (LLM_URL) { + url = `${strip(LLM_URL)}/v1/chat/completions`; + } else { + url = `${GATEWAY_URL}/v1/chat/completions`; } - const data = await response.json(); - const content = data.choices?.[0]?.message?.content || ''; - const toolCalls = data.choices?.[0]?.message?.tool_calls || null; - const usage = data.usage || {}; + // Build headers — include API key if available (for direct cloud provider access) + const headers = { 'Content-Type': 'application/json' }; + if (LLM_API_KEY && !opts.vlm) headers['Authorization'] = `Bearer ${LLM_API_KEY}`; - // Track token totals - results.tokenTotals.prompt += usage.prompt_tokens || 0; - results.tokenTotals.completion += usage.completion_tokens || 0; - results.tokenTotals.total += usage.total_tokens || 0; + // Use an AbortController with idle timeout that resets on each SSE chunk. + // This way long inferences that stream tokens succeed, but requests + // stuck with no output for IDLE_TIMEOUT_MS still abort. 
+ const controller = new AbortController(); + const idleMs = opts.timeout || IDLE_TIMEOUT_MS; + let idleTimer = setTimeout(() => controller.abort(), idleMs); + const resetIdle = () => { clearTimeout(idleTimer); idleTimer = setTimeout(() => controller.abort(), idleMs); }; - // Capture model name from first response - if (opts.vlm) { - if (!results.model.vlm && data.model) results.model.vlm = data.model; - } else { - if (!results.model.name && data.model) results.model.name = data.model; - } + try { + const response = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(body), + signal: controller.signal, + }); - return { content, toolCalls, usage, model: data.model }; + if (!response.ok) { + const errBody = await response.text().catch(() => ''); + throw new Error(`HTTP ${response.status}: ${errBody.slice(0, 200)}`); + } + + // Parse SSE stream + let content = ''; + let reasoningContent = ''; + let toolCalls = null; + let model = ''; + let usage = {}; + let finishReason = ''; + let tokenCount = 0; + + const reader = response.body; + const decoder = new TextDecoder(); + let buffer = ''; + + for await (const chunk of reader) { + resetIdle(); + buffer += decoder.decode(chunk, { stream: true }); + + // Process complete SSE lines + const lines = buffer.split('\n'); + buffer = lines.pop(); // Keep incomplete line in buffer + + for (const line of lines) { + if (!line.startsWith('data: ')) continue; + const payload = line.slice(6).trim(); + if (payload === '[DONE]') continue; + + try { + const evt = JSON.parse(payload); + if (evt.model) model = evt.model; + + const delta = evt.choices?.[0]?.delta; + if (delta?.content) content += delta.content; + if (delta?.reasoning_content) reasoningContent += delta.reasoning_content; + if (delta?.content || delta?.reasoning_content) { + tokenCount++; + // Log progress every 100 tokens so the console isn't silent + if (tokenCount % 100 === 0) { + log(` … ${tokenCount} tokens received`); + } + } + if (delta?.tool_calls) { 
+ // Accumulate streamed tool calls + if (!toolCalls) toolCalls = []; + for (const tc of delta.tool_calls) { + const idx = tc.index ?? 0; + if (!toolCalls[idx]) { + toolCalls[idx] = { id: tc.id, type: tc.type || 'function', function: { name: '', arguments: '' } }; + } + if (tc.function?.name) toolCalls[idx].function.name += tc.function.name; + if (tc.function?.arguments) toolCalls[idx].function.arguments += tc.function.arguments; + } + } + if (evt.choices?.[0]?.finish_reason) finishReason = evt.choices[0].finish_reason; + if (evt.usage) usage = evt.usage; + } catch { /* skip malformed SSE */ } + } + } + + // If the model only produced reasoning_content (thinking) with no content, + // use the reasoning output as the response content for evaluation purposes. + if (!content && reasoningContent) { + content = reasoningContent; + } + + // Track token totals + results.tokenTotals.prompt += usage.prompt_tokens || 0; + results.tokenTotals.completion += usage.completion_tokens || 0; + results.tokenTotals.total += usage.total_tokens || 0; + + // Capture model name from first response + if (opts.vlm) { + if (!results.model.vlm && model) results.model.vlm = model; + } else { + if (!results.model.name && model) results.model.name = model; + } + + return { content, toolCalls, usage, model }; + } finally { + clearTimeout(idleTimer); + } } function stripThink(text) { @@ -1675,17 +1774,32 @@ async function main() { log('╔══════════════════════════════════════════════════════════════════╗'); log('║ Home Security AI Benchmark Suite • DeepCamera / SharpAI ║'); log('╚══════════════════════════════════════════════════════════════════╝'); - log(` Gateway: ${GATEWAY_URL}`); - log(` VLM: ${VLM_URL || '(disabled — use --vlm URL to enable)'}`); + // Resolve the LLM endpoint that will actually be used + const effectiveLlmUrl = LLM_BASE_URL + ? LLM_BASE_URL.replace(/\/v1\/?$/, '') + : LLM_URL + ? 
LLM_URL.replace(/\/v1\/?$/, '') + : GATEWAY_URL; + + log(` LLM: ${LLM_API_TYPE} @ ${effectiveLlmUrl}${LLM_MODEL ? ' → ' + LLM_MODEL : ''}`); + log(` VLM: ${VLM_URL || '(disabled — use --vlm URL to enable)'}${VLM_MODEL ? ' → ' + VLM_MODEL : ''}`); log(` Results: ${RESULTS_DIR}`); - log(` Mode: ${IS_SKILL_MODE ? 'Aegis Skill' : 'Standalone'}`); + log(` Mode: ${IS_SKILL_MODE ? 'Aegis Skill' : 'Standalone'} (streaming, ${IDLE_TIMEOUT_MS / 1000}s idle timeout)`); log(` Time: ${new Date().toLocaleString()}`); - // Healthcheck + // Healthcheck — ping the actual LLM endpoint directly + const healthUrl = LLM_BASE_URL + ? `${LLM_BASE_URL.replace(/\/v1\/?$/, '')}/v1/chat/completions` + : LLM_URL + ? `${LLM_URL.replace(/\/v1\/?$/, '')}/v1/chat/completions` + : `${GATEWAY_URL}/v1/chat/completions`; + const healthHeaders = { 'Content-Type': 'application/json' }; + if (LLM_API_KEY) healthHeaders['Authorization'] = `Bearer ${LLM_API_KEY}`; + try { - const ping = await fetch(`${GATEWAY_URL}/v1/chat/completions`, { + const ping = await fetch(healthUrl, { method: 'POST', - headers: { 'Content-Type': 'application/json' }, + headers: healthHeaders, body: JSON.stringify({ messages: [{ role: 'user', content: 'ping' }], stream: false, max_tokens: 1 }), signal: AbortSignal.timeout(15000), }); @@ -1694,9 +1808,10 @@ async function main() { results.model.name = data.model || 'unknown'; log(` Model: ${results.model.name}`); } catch (err) { - log(`\n ❌ Cannot reach LLM gateway: ${err.message}`); - log(' Start the llama-cpp server and gateway, then re-run.\n'); - emit({ event: 'error', message: `Cannot reach LLM gateway: ${err.message}` }); + log(`\n ❌ Cannot reach LLM endpoint: ${err.message}`); + log(` Endpoint: ${healthUrl}`); + log(' Check that the LLM server is running.\n'); + emit({ event: 'error', message: `Cannot reach LLM endpoint: ${err.message}` }); process.exit(1); }