From 52d8397ae42a98b37e3bba1992626a6438e62761 Mon Sep 17 00:00:00 2001 From: Timo Klerx <1579185+TKlerx@users.noreply.github.com> Date: Tue, 5 May 2026 14:13:50 +0200 Subject: [PATCH] fix(process): mark rate-limited agent batches as partial and retryable --- packages/core/src/__tests__/schemas.test.ts | 19 +++++++ packages/core/src/run.ts | 2 +- packages/core/src/schemas.ts | 3 +- packages/core/src/types.ts | 3 +- packages/deepsec/src/commands/process.ts | 7 ++- .../src/__tests__/process-revalidate.test.ts | 45 +++++++++++++++ .../processor/src/agents/claude-agent-sdk.ts | 5 ++ packages/processor/src/agents/codex-sdk.ts | 7 +++ packages/processor/src/agents/types.ts | 5 ++ packages/processor/src/index.ts | 56 +++++++++++++++++-- 10 files changed, 142 insertions(+), 10 deletions(-) diff --git a/packages/core/src/__tests__/schemas.test.ts b/packages/core/src/__tests__/schemas.test.ts index 1a68867..8ec4c3d 100644 --- a/packages/core/src/__tests__/schemas.test.ts +++ b/packages/core/src/__tests__/schemas.test.ts @@ -108,4 +108,23 @@ describe("runMetaSchema", () => { }; expect(() => runMetaSchema.parse(valid)).not.toThrow(); }); + + it("accepts partial process run meta with failed file count", () => { + const valid = { + runId: "20260401-a1b2", + projectId: "test", + rootPath: "/path", + createdAt: "2026-04-01T14:30:52.000Z", + completedAt: "2026-04-01T15:00:00.000Z", + type: "process", + phase: "partial", + processorConfig: { + agentType: "claude-agent-sdk", + model: "claude-opus-4-6", + modelConfig: { maxTurns: 50 }, + }, + stats: { filesProcessed: 3, filesFailed: 2, findingsCount: 1 }, + }; + expect(() => runMetaSchema.parse(valid)).not.toThrow(); + }); }); diff --git a/packages/core/src/run.ts b/packages/core/src/run.ts index 55ae84b..51c06e7 100644 --- a/packages/core/src/run.ts +++ b/packages/core/src/run.ts @@ -122,7 +122,7 @@ export function readRunMeta(projectId: string, runId: string): RunMeta { export function completeRun( projectId: string, runId: string, - phase: "done" | "error", + phase: "done" | "partial" | "error", stats?: Partial, ): void { const meta = readRunMeta(projectId, runId); diff --git a/packages/core/src/schemas.ts b/packages/core/src/schemas.ts index 8259b8c..e465e82 100644 --- a/packages/core/src/schemas.ts +++ b/packages/core/src/schemas.ts @@ -161,7 +161,7 @@ export const runMetaSchema = z.object({ createdAt: z.string(), completedAt: z.string().optional(), type: z.enum(["scan", "process", "revalidate"]), - phase: z.enum(["running", "done", "error"]), + phase: z.enum(["running", "done", "partial", "error"]), scannerConfig: z.object({ matcherSlugs: z.array(z.string()) }).optional(), processorConfig: z .object({ @@ -174,6 +174,7 @@ export const runMetaSchema = z.object({ filesScanned: z.number().optional(), candidatesFound: z.number().optional(), filesProcessed: z.number().optional(), + filesFailed: z.number().optional(), findingsCount: z.number().optional(), totalCostUsd: z.number().optional(), totalInputTokens: z.number().optional(), diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index b328e11..c7cc683 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -7,7 +7,7 @@ export interface RunMeta { createdAt: string; completedAt?: string; type: "scan" | "process" | "revalidate"; - phase: "running" | "done" | "error"; + phase: "running" | "done" | "partial" | "error"; scannerConfig?: { matcherSlugs: string[]; }; @@ -20,6 +20,7 @@ export interface RunMeta { filesScanned?: number; candidatesFound?: number; filesProcessed?: number; + filesFailed?: number; findingsCount?: number; totalCostUsd?: number; totalInputTokens?: number; diff --git a/packages/deepsec/src/commands/process.ts b/packages/deepsec/src/commands/process.ts index 27cf556..dfe44d9 100644 --- a/packages/deepsec/src/commands/process.ts +++ b/packages/deepsec/src/commands/process.ts @@ -148,8 +148,13 @@ export async function processCommand(opts: { onProgress: logProgress, }); - console.log(`${GREEN}Processing complete.${RESET} Run: ${BOLD}${result.runId}${RESET}`); + const phaseColor = result.phase === "partial" ? YELLOW : GREEN; + const phaseLabel = result.phase === "partial" ? "Processing partial." : "Processing complete."; + console.log(`${phaseColor}${phaseLabel}${RESET} Run: ${BOLD}${result.runId}${RESET}`); console.log(` Analyses: ${result.analysisCount}`); + if (result.failedCount > 0) { + console.log(` Failed/Retryable: ${result.failedCount}`); + } console.log(` Findings: ${result.findingCount}`); console.log(); console.log(`Next:`); diff --git a/packages/processor/src/__tests__/process-revalidate.test.ts b/packages/processor/src/__tests__/process-revalidate.test.ts index b011956..dec4a19 100644 --- a/packages/processor/src/__tests__/process-revalidate.test.ts +++ b/packages/processor/src/__tests__/process-revalidate.test.ts @@ -108,6 +108,8 @@ describe("processor with stub agent", () => { expect(result.findingCount).toBe(1); expect(result.analysisCount).toBe(1); + expect(result.failedCount).toBe(0); + expect(result.phase).toBe("done"); expect(stub.calls.investigateCalls).toHaveLength(1); expect(stub.calls.investigateCalls[0].batch).toHaveLength(1); @@ -252,6 +254,49 @@ describe("processor with stub agent", () => { expect(rec.analysisHistory[0].refusal?.reason).toBe("stub refusal"); }); + it("process() marks batch retryable and run partial on agent error with zero output", async () => { + const fx = setupProject({ files: ["a.ts", "b.ts"] }); + fx.writeRecord(pendingRecord(fx.projectId, "a.ts")); + fx.writeRecord(pendingRecord(fx.projectId, "b.ts")); + + const stub = new StubAgent({ + async *investigateImpl(params) { + return { + results: params.batch.map((r) => ({ filePath: r.filePath, findings: [] })), + meta: { + durationMs: 1, + hadErrors: true, + usage: { + inputTokens: 0, + outputTokens: 0, + cacheReadInputTokens: 0, + cacheCreationInputTokens: 0, + }, + }, + }; + }, + }); + setLoadedConfig( + defineConfig({ + projects: [{ id: fx.projectId, root: fx.targetRoot }], + plugins: [{ name: "stub-plugin", agents: [stub] }], + }), + ); + + const result = await processProject({ + projectId: fx.projectId, + agentType: "stub", + concurrency: 1, + }); + + expect(result.analysisCount).toBe(0); + expect(result.findingCount).toBe(0); + expect(result.failedCount).toBe(2); + expect(result.phase).toBe("partial"); + expect(fx.readRecord("a.ts").status).toBe("error"); + expect(fx.readRecord("b.ts").status).toBe("error"); + }); + it("revalidate() attaches verdicts to existing findings", async () => { const fx = setupProject({ files: ["app.ts"] }); const rec = pendingRecord(fx.projectId, "app.ts"); diff --git a/packages/processor/src/agents/claude-agent-sdk.ts b/packages/processor/src/agents/claude-agent-sdk.ts index e56eb5a..27cd853 100644 --- a/packages/processor/src/agents/claude-agent-sdk.ts +++ b/packages/processor/src/agents/claude-agent-sdk.ts @@ -76,6 +76,7 @@ export class ClaudeAgentSdkPlugin implements AgentPlugin { let toolUseCount = 0; let sdkMeta: Partial = {}; let lastError = ""; + let hadErrors = false; for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) { if (attempt > 1) { @@ -89,6 +90,7 @@ export class ClaudeAgentSdkPlugin implements AgentPlugin { toolUseCount = 0; sdkMeta = {}; lastError = ""; + hadErrors = false; } try { @@ -173,6 +175,7 @@ export class ClaudeAgentSdkPlugin implements AgentPlugin { } } else { lastError = String(msg.error ?? "unknown"); + hadErrors = true; yield { type: "error" as const, message: `Agent error: ${lastError.slice(0, 300)}`, @@ -189,6 +192,7 @@ export class ClaudeAgentSdkPlugin implements AgentPlugin { } } catch (sdkErr) { lastError = sdkErr instanceof Error ? sdkErr.message : String(sdkErr); + hadErrors = true; yield { type: "error" as const, message: `Agent SDK error: ${lastError.slice(0, 300)}`, @@ -225,6 +229,7 @@ export class ClaudeAgentSdkPlugin implements AgentPlugin { durationMs, ...sdkMeta, refusal, + hadErrors, }, }; } diff --git a/packages/processor/src/agents/codex-sdk.ts b/packages/processor/src/agents/codex-sdk.ts index f8bc732..1f07ee8 100644 --- a/packages/processor/src/agents/codex-sdk.ts +++ b/packages/processor/src/agents/codex-sdk.ts @@ -554,6 +554,7 @@ export class CodexAgentSdkPlugin implements AgentPlugin { let toolUseCount = 0; let sdkMeta: Partial = {}; let lastError = ""; + let hadErrors = false; for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) { if (attempt > 1) { @@ -567,6 +568,7 @@ export class CodexAgentSdkPlugin implements AgentPlugin { toolUseCount = 0; sdkMeta = {}; lastError = ""; + hadErrors = false; } try { @@ -626,6 +628,7 @@ export class CodexAgentSdkPlugin implements AgentPlugin { case "turn.failed": lastError = event.error?.message ?? "turn.failed"; + hadErrors = true; yield { type: "error" as const, message: `Codex turn failed: ${lastError.slice(0, 300)}`, @@ -634,6 +637,7 @@ export class CodexAgentSdkPlugin implements AgentPlugin { case "error": lastError = event.message; + hadErrors = true; yield { type: "error" as const, message: `Codex stream error: ${lastError.slice(0, 300)}`, @@ -643,6 +647,7 @@ export class CodexAgentSdkPlugin implements AgentPlugin { } } catch (sdkErr) { lastError = sdkErr instanceof Error ? sdkErr.message : String(sdkErr); + hadErrors = true; yield { type: "error" as const, message: `Codex SDK error: ${lastError.slice(0, 300)}`, @@ -690,6 +695,7 @@ export class CodexAgentSdkPlugin implements AgentPlugin { const wasSilent = (sdkMeta.usage?.outputTokens ?? 0) === 0; const stderrTail = wasSilent ? readStderrTail(invocation.stderrLog) : undefined; if (wasSilent && stderrTail) { + hadErrors = true; yield { type: "error" as const, message: `Codex silent-exit stderr: ${stderrTail.slice(0, 1500)}`, @@ -731,6 +737,7 @@ export class CodexAgentSdkPlugin implements AgentPlugin { durationMs, ...sdkMeta, refusal, + hadErrors, }, }; } diff --git a/packages/processor/src/agents/types.ts b/packages/processor/src/agents/types.ts index 0e793a5..09a5cef 100644 --- a/packages/processor/src/agents/types.ts +++ b/packages/processor/src/agents/types.ts @@ -39,6 +39,11 @@ export interface BatchMeta { * Empty/undefined for non-codex backends and successful codex runs. */ codexStderr?: string; + /** + * True when the agent backend emitted one or more runtime errors while + * handling the batch (quota/rate-limit/provider failures etc). + */ + hadErrors?: boolean; } export interface InvestigateOutput { diff --git a/packages/processor/src/index.ts b/packages/processor/src/index.ts index 6f83f65..0919bc6 100644 --- a/packages/processor/src/index.ts +++ b/packages/processor/src/index.ts @@ -200,7 +200,13 @@ export async function process(params: { /** Skip files whose candidate slugs are ALL in this set (files with any other slug still get processed) */ skipSlugs?: string[]; onProgress?: (progress: ProcessProgress) => void; -}): Promise<{ runId: string; analysisCount: number; findingCount: number }> { +}): Promise<{ + runId: string; + analysisCount: number; + findingCount: number; + failedCount: number; + phase: "done" | "partial"; +}> { const { projectId, agentType = "claude-agent-sdk", @@ -282,7 +288,7 @@ export async function process(params: { type: "all_complete", message: `Run ${runId} already completed`, }); - return { runId, analysisCount: 0, findingCount: 0 }; + return { runId, analysisCount: 0, findingCount: 0, failedCount: 0, phase: "done" }; } } else { // Create new run @@ -385,7 +391,7 @@ export async function process(params: { message: "No files to process", }); completeRun(projectId, runId, "done", { filesProcessed: 0 }); - return { runId, analysisCount: 0, findingCount: 0 }; + return { runId, analysisCount: 0, findingCount: 0, failedCount: 0, phase: "done" }; } // Apply path filter @@ -412,8 +418,10 @@ export async function process(params: { let totalInputTokens = 0; let totalOutputTokens = 0; let totalDurationMs = 0; + let totalFailedAnalyses = 0; let batchesCompleted = 0; let batchesInFlight = 0; + let hadPartialFailures = false; const concurrency = params.concurrency ?? defaultConcurrency(); async function processBatch(batch: FileRecord[], i: number) { @@ -448,6 +456,8 @@ export async function process(params: { const output = result.value as InvestigateOutput; const { results, meta: batchMeta } = output; + const zeroOutput = (batchMeta.usage?.outputTokens ?? 0) === 0; + const degradedBatch = Boolean(batchMeta.hadErrors && zeroOutput); // Accumulate run-level stats totalCostUsd += batchMeta.costUsd ?? 0; @@ -455,6 +465,28 @@ export async function process(params: { totalOutputTokens += batchMeta.usage?.outputTokens ?? 0; totalDurationMs += batchMeta.durationMs; + // If the agent reported runtime failures and produced zero output + // tokens, treat this batch as retryable failure instead of a clean + // "analyzed with 0 findings" pass. + if (degradedBatch) { + hadPartialFailures = true; + for (const record of batch) { + record.status = "error"; + record.lockedByRunId = undefined; + writeFileRecord(record); + totalFailedAnalyses++; + } + batchesInFlight--; + batchesCompleted++; + emitProgress({ + type: "batch_complete", + message: `Batch ${i + 1}/${batches.length} partial: agent error with empty output; marked ${batch.length} file(s) retryable (${batchesInFlight} in flight, ${batchesCompleted}/${batches.length} done)`, + batchIndex: i, + totalBatches: batches.length, + }); + return; + } + // Update file records with results + metadata. // // Re-investigation always *merges* — existing findings are preserved @@ -512,6 +544,8 @@ export async function process(params: { record.status = "error"; record.lockedByRunId = undefined; writeFileRecord(record); + totalFailedAnalyses++; + hadPartialFailures = true; } } @@ -530,7 +564,9 @@ export async function process(params: { record.status = "error"; record.lockedByRunId = undefined; writeFileRecord(record); + totalFailedAnalyses++; } + hadPartialFailures = true; emitProgress({ type: "batch_complete", message: `Batch ${i + 1}/${batches.length} failed: ${err instanceof Error ? err.message : String(err)} (${batchesInFlight} in flight, ${batchesCompleted}/${batches.length} done)`, @@ -558,8 +594,10 @@ export async function process(params: { await Promise.all(workers); } - completeRun(projectId, runId, "done", { + const finalPhase = hadPartialFailures ? "partial" : "done"; + completeRun(projectId, runId, finalPhase, { filesProcessed: totalAnalyses, + filesFailed: totalFailedAnalyses, findingsCount: totalFindings, totalCostUsd, totalInputTokens, @@ -569,10 +607,16 @@ export async function process(params: { emitProgress({ type: "all_complete", - message: `Processing complete: ${totalAnalyses} analyses, ${totalFindings} findings`, + message: `Processing complete: ${totalAnalyses} analyses, ${totalFindings} findings${totalFailedAnalyses > 0 ? `, ${totalFailedAnalyses} failed/retryable` : ""}`, }); - return { runId, analysisCount: totalAnalyses, findingCount: totalFindings }; + return { + runId, + analysisCount: totalAnalyses, + findingCount: totalFindings, + failedCount: totalFailedAnalyses, + phase: finalPhase, + }; } // --- Revalidation ---