From 64eb29556c802e07aa21c4d3ca064953682ed787 Mon Sep 17 00:00:00 2001 From: Hephaestus Date: Sat, 23 May 2026 07:04:33 +0200 Subject: [PATCH] fix(monitor): cross-session rate-limit retry coordination (#3931) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Parallel CC sessions hitting rate limits independently amplify the problem by retrying concurrently. Add RateLimitCoordinator to serialize retries: - Concurrency cap (default: 1) limits simultaneous rate-limit retries - Stagger delay (default: 2s) spaces consecutive retry starts - Queued sessions wait for a slot before retrying - Sessions killed while queued are dequeued via removeSession hook New files: - src/rate-limit-coordinator.ts — semaphore-style coordinator - src/__tests__/rate-limit-coordinator.test.ts — 8 tests Modified: - src/monitor.ts — handleRateLimitSignal uses coordinator.acquire/release --- src/__tests__/rate-limit-coordinator.test.ts | 132 +++++++++++++++++++ src/monitor.ts | 77 ++++++----- src/rate-limit-coordinator.ts | 124 +++++++++++++++++ 3 files changed, 301 insertions(+), 32 deletions(-) create mode 100644 src/__tests__/rate-limit-coordinator.test.ts create mode 100644 src/rate-limit-coordinator.ts diff --git a/src/__tests__/rate-limit-coordinator.test.ts b/src/__tests__/rate-limit-coordinator.test.ts new file mode 100644 index 00000000..0814551d --- /dev/null +++ b/src/__tests__/rate-limit-coordinator.test.ts @@ -0,0 +1,132 @@ +/** + * Unit tests for Issue #3931: Rate-limit coordinator. + * + * Verifies cross-session coordination of rate-limit retries: + * - Concurrency limiting + * - Queue behavior + * - Stagger delay + * - Dequeue on session kill + */ +import { describe, expect, it, vi } from 'vitest'; +import { RateLimitCoordinator } from '../rate-limit-coordinator.js'; + +describe('Issue #3931: RateLimitCoordinator', () => { + it('allows acquisition when under concurrency limit', async () => { + const coord = new RateLimitCoordinator({ maxConcurrent: 2, staggerMs: 0 }); + await coord.acquire('session-1'); + expect(coord.active).toBe(1); + coord.release('session-1'); + expect(coord.active).toBe(0); + }); + + it('queues when at concurrency limit', async () => { + const coord = new RateLimitCoordinator({ maxConcurrent: 1, staggerMs: 0 }); + + // First session acquires immediately + await coord.acquire('session-1'); + expect(coord.active).toBe(1); + + // Second session should be queued + const acquirePromise = coord.acquire('session-2'); + expect(coord.queueDepth).toBe(1); + + // Release first — second should proceed + coord.release('session-1'); + await acquirePromise; + expect(coord.active).toBe(1); + expect(coord.queueDepth).toBe(0); + + coord.release('session-2'); + }); + + it('processes queue in FIFO order', async () => { + const coord = new RateLimitCoordinator({ maxConcurrent: 1, staggerMs: 0 }); + const order: string[] = []; + + await coord.acquire('s1'); + + const p2 = coord.acquire('s2').then(() => order.push('s2')); + const p3 = coord.acquire('s3').then(() => order.push('s3')); + + coord.release('s1'); + await p2; + coord.release('s2'); + await p3; + + expect(order).toEqual(['s2', 's3']); + coord.release('s3'); + }); + + it('removes session from queue via dequeue', async () => { + const coord = new RateLimitCoordinator({ maxConcurrent: 1, staggerMs: 0 }); + + await coord.acquire('s1'); + const p2 = coord.acquire('s2'); + const p3 = coord.acquire('s3'); + + expect(coord.queueDepth).toBe(2); + + // Kill session-2 while queued + coord.dequeue('s2'); + expect(coord.queueDepth).toBe(1); + + // Release s1 → s3 should proceed (s2 was dequeued) + coord.release('s1'); + await p3; + expect(coord.active).toBe(1); + + coord.release('s3'); + }); + + it('respects stagger delay between releases and next acquisition', async () => { + vi.useFakeTimers(); + const coord = new RateLimitCoordinator({ maxConcurrent: 1, staggerMs: 100 }); + + await coord.acquire('s1'); + + const p2 = coord.acquire('s2'); + + // Release s1 — s2 should start after stagger + coord.release('s1'); + + // At this point, s2's resolve is scheduled in a setTimeout + // Advance timers to trigger it + vi.advanceTimersByTime(100); + await p2; + + expect(coord.active).toBe(1); + coord.release('s2'); + + vi.useRealTimers(); + }); + + it('tracks active count correctly', async () => { + const coord = new RateLimitCoordinator({ maxConcurrent: 3, staggerMs: 0 }); + + await coord.acquire('s1'); + await coord.acquire('s2'); + await coord.acquire('s3'); + expect(coord.active).toBe(3); + + coord.release('s1'); + expect(coord.active).toBe(2); + + coord.release('s2'); + coord.release('s3'); + expect(coord.active).toBe(0); + }); + + it('defaults to maxConcurrent=1 and staggerMs=2000', () => { + const coord = new RateLimitCoordinator(); + // Verify by observing queuing behavior (maxConcurrent=1) + expect(coord.active).toBe(0); + expect(coord.queueDepth).toBe(0); + }); + + it('handles release when no sessions active', () => { + const coord = new RateLimitCoordinator({ maxConcurrent: 1, staggerMs: 0 }); + // Should not throw + coord.release('nonexistent'); + expect(coord.active).toBe(0); + }); +}); diff --git a/src/monitor.ts b/src/monitor.ts index 27ec9e46..100f7c0a 100644 --- a/src/monitor.ts +++ b/src/monitor.ts @@ -29,6 +29,7 @@ import { type MetricsCollector } from './metrics.js'; import { startToolSpan, setToolResult, spanOk } from './tracing.js'; import type { Span } from '@opentelemetry/api'; import { computeDelayMs, retryWithJitter } from './retry.js'; +import { RateLimitCoordinator } from './rate-limit-coordinator.js'; /** Stub: parse "Cogitated for Xm Ys" from status text. Returns duration in ms or null. */ function parseCogitatedDuration(_statusText: string): number | null { @@ -121,6 +122,8 @@ export class SessionMonitor { private deadNotified = new Set(); // don't spam dead session events private prevStatusForStall = new Map(); // track previous status for stall transition detection private rateLimitedSessions = new Set(); // sessions in rate-limit backoff + /** Issue #3931: Cross-session rate-limit retry coordinator. */ + private rateLimitCoordinator = new RateLimitCoordinator(); // Issue #1324: Track statusText per session to detect extended thinking ("Cogitated for Xm Ys") private lastStatusText = new Map(); /** Thinking stall threshold multiplier — CC extended thinking gets 5x the normal stall threshold. */ @@ -608,49 +611,57 @@ export class SessionMonitor { sessionId: session.id, attributes: { attempt: retryAttempt, maxRetries, delayMs, stopReason }, }); - // Fire-and-forget retry after delay (non-blocking to main monitor loop) + // Issue #3931: Coordinate retry with other sessions to avoid concurrent + // rate-limit retries that amplify the problem. Acquire a slot, wait for + // the backoff delay, then restart. Release the slot when done. const backend = this.acpBackend; const sid = session.id; const cwd = session.workDir; const tenantId = session.tenantId ?? SYSTEM_TENANT; const ownerKeyId = session.ownerKeyId ?? 'master'; - setTimeout(() => { - backend.restartSession({ + const coordinator = this.rateLimitCoordinator; + // Fire-and-forget: acquire slot → delay → restart → release + coordinator.acquire(sid).then(() => { + return new Promise((resolve) => setTimeout(resolve, delayMs)); + }).then(() => { + return backend.restartSession({ sessionId: sid, cwd, tenantId, ownerKeyId, reason: `rate_limit_retry_${retryAttempt}`, - }).then((result) => { - logger.info({ - component: 'monitor', - operation: 'rate_limit_retry_success', - sessionId: sid, - attributes: { attempt: retryAttempt, backoffDelayMs: result.backoffDelayMs }, - }); - this.rateLimitedSessions.delete(sid); - }).catch((err: unknown) => { - const errMsg = err instanceof Error ? err.message : String(err); - logger.error({ - component: 'monitor', - operation: 'rate_limit_retry_failed', - sessionId: sid, - errorCode: 'RATE_LIMIT_RETRY_ERROR', - attributes: { attempt: retryAttempt, error: errMsg }, - }); - // If this was the last attempt, notify the user and clean up - if (retryAttempt >= maxRetries) { - this.rateLimitRetryAttempts.delete(sid); - this.channels.statusChange( - this.makePayload('status.error', { ...session, status: 'error' } as SessionInfo, - `Rate-limit retry exhausted (${maxRetries}/${maxRetries}). Session requires manual intervention.`), - ); - this.alertManager?.recordFailure('session_failure', - `Session "${session.displayName}" rate-limit retries exhausted: ${errMsg}`); - this.metrics?.sessionFailed(sid); - } }); - }, delayMs); + }).then((result) => { + logger.info({ + component: 'monitor', + operation: 'rate_limit_retry_success', + sessionId: sid, + attributes: { attempt: retryAttempt, backoffDelayMs: result.backoffDelayMs }, + }); + this.rateLimitedSessions.delete(sid); + coordinator.release(sid); + }).catch((err: unknown) => { + const errMsg = err instanceof Error ? err.message : String(err); + logger.error({ + component: 'monitor', + operation: 'rate_limit_retry_failed', + sessionId: sid, + errorCode: 'RATE_LIMIT_RETRY_ERROR', + attributes: { attempt: retryAttempt, error: errMsg }, + }); + coordinator.release(sid); + // If this was the last attempt, notify the user and clean up + if (retryAttempt >= maxRetries) { + this.rateLimitRetryAttempts.delete(sid); + this.channels.statusChange( + this.makePayload('status.error', { ...session, status: 'error' } as SessionInfo, + `Rate-limit retry exhausted (${maxRetries}/${maxRetries}). Session requires manual intervention.`), + ); + this.alertManager?.recordFailure('session_failure', + `Session "${session.displayName}" rate-limit retries exhausted: ${errMsg}`); + this.metrics?.sessionFailed(sid); + } + }); } else if (!this.acpBackend) { // No ACP backend available — legacy notification only this.channels.statusChange( @@ -1120,6 +1131,8 @@ export class SessionMonitor { this.rateLimitedSessions.delete(sessionId); // Issue #3754: Clear retry tracking this.rateLimitRetryAttempts.delete(sessionId); + // Issue #3931: Remove from rate-limit coordinator queue. + this.rateLimitCoordinator.dequeue(sessionId); this.stallRecovering.delete(sessionId); // Issue #89 L4: Clear pending debounce timer const pending = this.statusChangeDebounce.get(sessionId); diff --git a/src/rate-limit-coordinator.ts b/src/rate-limit-coordinator.ts new file mode 100644 index 00000000..095f154e --- /dev/null +++ b/src/rate-limit-coordinator.ts @@ -0,0 +1,124 @@ +/** + * rate-limit-coordinator.ts — Cross-session rate-limit coordination. + * + * Issue #3931: When multiple sessions hit the CC rate limit independently, + * their retries fire concurrently and amplify the problem. This coordinator + * serializes rate-limit retries across sessions with a configurable + * concurrency cap and stagger delay. + * + * Usage: the monitor calls `acquire()` before starting a rate-limit retry. + * If the concurrency cap is reached, the session queues until a slot opens. + */ + +import { logger } from './logger.js'; + +export interface RateLimitCoordinatorOptions { + /** Max concurrent rate-limit retries across all sessions (default: 1). */ + maxConcurrent?: number; + /** Minimum stagger delay between consecutive retry starts in ms (default: 2000). */ + staggerMs?: number; +} + +interface QueueEntry { + sessionId: string; + resolve: () => void; +} + +const DEFAULT_MAX_CONCURRENT = 1; +const DEFAULT_STAGGER_MS = 2_000; + +export class RateLimitCoordinator { + private readonly maxConcurrent: number; + private readonly staggerMs: number; + private activeCount = 0; + private lastReleaseAt = 0; + private queue: QueueEntry[] = []; + + constructor(options: RateLimitCoordinatorOptions = {}) { + this.maxConcurrent = options.maxConcurrent ?? DEFAULT_MAX_CONCURRENT; + this.staggerMs = options.staggerMs ?? DEFAULT_STAGGER_MS; + } + + /** + * Acquire a rate-limit retry slot. Returns when a slot is available. + * If the concurrency cap is reached, queues until a slot opens. + * Includes stagger delay so retries don't all fire simultaneously. + */ + async acquire(sessionId: string): Promise { + if (this.activeCount < this.maxConcurrent) { + this.activeCount++; + logger.info({ + component: 'rate-limit-coordinator', + operation: 'acquire', + sessionId, + attributes: { activeCount: this.activeCount, maxConcurrent: this.maxConcurrent }, + }); + return; + } + + // Queue the session + logger.info({ + component: 'rate-limit-coordinator', + operation: 'queued', + sessionId, + attributes: { queueDepth: this.queue.length, activeCount: this.activeCount }, + }); + + return new Promise((resolve) => { + this.queue.push({ sessionId, resolve }); + }); + } + + /** + * Release a rate-limit retry slot. Triggers the next queued session + * after the stagger delay. + */ + release(sessionId: string): void { + if (this.activeCount > 0) { + this.activeCount--; + } + this.lastReleaseAt = Date.now(); + + logger.info({ + component: 'rate-limit-coordinator', + operation: 'release', + sessionId, + attributes: { activeCount: this.activeCount, queueDepth: this.queue.length }, + }); + + if (this.queue.length > 0) { + const next = this.queue.shift()!; + // Stagger: wait at least staggerMs from the last release + const elapsed = Date.now() - this.lastReleaseAt; + const waitMs = Math.max(0, this.staggerMs - elapsed); + setTimeout(() => { + this.activeCount++; + next.resolve(); + logger.info({ + component: 'rate-limit-coordinator', + operation: 'dequeue', + sessionId: next.sessionId, + attributes: { activeCount: this.activeCount, waitedMs: waitMs }, + }); + }, waitMs); + } + } + + /** Get current queue depth (for diagnostics/metrics). */ + get queueDepth(): number { + return this.queue.length; + } + + /** Get number of active retry slots in use. */ + get active(): number { + return this.activeCount; + } + + /** Remove a session from the queue (e.g. if session is killed while waiting). */ + dequeue(sessionId: string): void { + const idx = this.queue.findIndex(e => e.sessionId === sessionId); + if (idx >= 0) { + this.queue.splice(idx, 1); + } + } +}