Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions src/__tests__/rate-limit-coordinator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/**
* Unit tests for Issue #3931: Rate-limit coordinator.
*
* Verifies cross-session coordination of rate-limit retries:
* - Concurrency limiting
* - Queue behavior
* - Stagger delay
* - Dequeue on session kill
*/
import { describe, expect, it, vi } from 'vitest';
import { RateLimitCoordinator } from '../rate-limit-coordinator.js';

describe('Issue #3931: RateLimitCoordinator', () => {
it('allows acquisition when under concurrency limit', async () => {
const coord = new RateLimitCoordinator({ maxConcurrent: 2, staggerMs: 0 });
await coord.acquire('session-1');
expect(coord.active).toBe(1);
coord.release('session-1');
expect(coord.active).toBe(0);
});

it('queues when at concurrency limit', async () => {
const coord = new RateLimitCoordinator({ maxConcurrent: 1, staggerMs: 0 });

// First session acquires immediately
await coord.acquire('session-1');
expect(coord.active).toBe(1);

// Second session should be queued
const acquirePromise = coord.acquire('session-2');
expect(coord.queueDepth).toBe(1);

// Release first — second should proceed
coord.release('session-1');
await acquirePromise;
expect(coord.active).toBe(1);
expect(coord.queueDepth).toBe(0);

coord.release('session-2');
});

it('processes queue in FIFO order', async () => {
const coord = new RateLimitCoordinator({ maxConcurrent: 1, staggerMs: 0 });
const order: string[] = [];

await coord.acquire('s1');

const p2 = coord.acquire('s2').then(() => order.push('s2'));
const p3 = coord.acquire('s3').then(() => order.push('s3'));

coord.release('s1');
await p2;
coord.release('s2');
await p3;

expect(order).toEqual(['s2', 's3']);
coord.release('s3');
});

it('removes session from queue via dequeue', async () => {
const coord = new RateLimitCoordinator({ maxConcurrent: 1, staggerMs: 0 });

await coord.acquire('s1');
const p2 = coord.acquire('s2');
const p3 = coord.acquire('s3');

expect(coord.queueDepth).toBe(2);

// Kill session-2 while queued
coord.dequeue('s2');
expect(coord.queueDepth).toBe(1);

// Release s1 → s3 should proceed (s2 was dequeued)
coord.release('s1');
await p3;
expect(coord.active).toBe(1);

coord.release('s3');
});

it('respects stagger delay between releases and next acquisition', async () => {
vi.useFakeTimers();
const coord = new RateLimitCoordinator({ maxConcurrent: 1, staggerMs: 100 });

await coord.acquire('s1');

const p2 = coord.acquire('s2');

// Release s1 — s2 should start after stagger
coord.release('s1');

// At this point, s2's resolve is scheduled in a setTimeout
// Advance timers to trigger it
vi.advanceTimersByTime(100);
await p2;

expect(coord.active).toBe(1);
coord.release('s2');

vi.useRealTimers();
});

it('tracks active count correctly', async () => {
const coord = new RateLimitCoordinator({ maxConcurrent: 3, staggerMs: 0 });

await coord.acquire('s1');
await coord.acquire('s2');
await coord.acquire('s3');
expect(coord.active).toBe(3);

coord.release('s1');
expect(coord.active).toBe(2);

coord.release('s2');
coord.release('s3');
expect(coord.active).toBe(0);
});

it('defaults to maxConcurrent=1 and staggerMs=2000', () => {
const coord = new RateLimitCoordinator();
// Verify by observing queuing behavior (maxConcurrent=1)
expect(coord.active).toBe(0);
expect(coord.queueDepth).toBe(0);
});

it('handles release when no sessions active', () => {
const coord = new RateLimitCoordinator({ maxConcurrent: 1, staggerMs: 0 });
// Should not throw
coord.release('nonexistent');
expect(coord.active).toBe(0);
});
});
77 changes: 45 additions & 32 deletions src/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { type MetricsCollector } from './metrics.js';
import { startToolSpan, setToolResult, spanOk } from './tracing.js';
import type { Span } from '@opentelemetry/api';
import { computeDelayMs, retryWithJitter } from './retry.js';
import { RateLimitCoordinator } from './rate-limit-coordinator.js';

/** Stub: parse "Cogitated for Xm Ys" from status text. Returns duration in ms or null. */
function parseCogitatedDuration(_statusText: string): number | null {
Expand Down Expand Up @@ -121,6 +122,8 @@ export class SessionMonitor {
private deadNotified = new Set<string>(); // don't spam dead session events
private prevStatusForStall = new Map<string, UIState>(); // track previous status for stall transition detection
private rateLimitedSessions = new Set<string>(); // sessions in rate-limit backoff
/** Issue #3931: Cross-session rate-limit retry coordinator. */
private rateLimitCoordinator = new RateLimitCoordinator();
// Issue #1324: Track statusText per session to detect extended thinking ("Cogitated for Xm Ys")
private lastStatusText = new Map<string, string | null>();
/** Thinking stall threshold multiplier — CC extended thinking gets 5x the normal stall threshold. */
Expand Down Expand Up @@ -608,49 +611,57 @@ export class SessionMonitor {
sessionId: session.id,
attributes: { attempt: retryAttempt, maxRetries, delayMs, stopReason },
});
// Fire-and-forget retry after delay (non-blocking to main monitor loop)
// Issue #3931: Coordinate retry with other sessions to avoid concurrent
// rate-limit retries that amplify the problem. Acquire a slot, wait for
// the backoff delay, then restart. Release the slot when done.
const backend = this.acpBackend;
const sid = session.id;
const cwd = session.workDir;
const tenantId = session.tenantId ?? SYSTEM_TENANT;
const ownerKeyId = session.ownerKeyId ?? 'master';
setTimeout(() => {
backend.restartSession({
const coordinator = this.rateLimitCoordinator;
// Fire-and-forget: acquire slot → delay → restart → release
coordinator.acquire(sid).then(() => {
return new Promise<void>((resolve) => setTimeout(resolve, delayMs));
}).then(() => {
return backend.restartSession({
sessionId: sid,
cwd,
tenantId,
ownerKeyId,
reason: `rate_limit_retry_${retryAttempt}`,
}).then((result) => {
logger.info({
component: 'monitor',
operation: 'rate_limit_retry_success',
sessionId: sid,
attributes: { attempt: retryAttempt, backoffDelayMs: result.backoffDelayMs },
});
this.rateLimitedSessions.delete(sid);
}).catch((err: unknown) => {
const errMsg = err instanceof Error ? err.message : String(err);
logger.error({
component: 'monitor',
operation: 'rate_limit_retry_failed',
sessionId: sid,
errorCode: 'RATE_LIMIT_RETRY_ERROR',
attributes: { attempt: retryAttempt, error: errMsg },
});
// If this was the last attempt, notify the user and clean up
if (retryAttempt >= maxRetries) {
this.rateLimitRetryAttempts.delete(sid);
this.channels.statusChange(
this.makePayload('status.error', { ...session, status: 'error' } as SessionInfo,
`Rate-limit retry exhausted (${maxRetries}/${maxRetries}). Session requires manual intervention.`),
);
this.alertManager?.recordFailure('session_failure',
`Session "${session.displayName}" rate-limit retries exhausted: ${errMsg}`);
this.metrics?.sessionFailed(sid);
}
});
}, delayMs);
}).then((result) => {
logger.info({
component: 'monitor',
operation: 'rate_limit_retry_success',
sessionId: sid,
attributes: { attempt: retryAttempt, backoffDelayMs: result.backoffDelayMs },
});
this.rateLimitedSessions.delete(sid);
coordinator.release(sid);
}).catch((err: unknown) => {
const errMsg = err instanceof Error ? err.message : String(err);
logger.error({
component: 'monitor',
operation: 'rate_limit_retry_failed',
sessionId: sid,
errorCode: 'RATE_LIMIT_RETRY_ERROR',
attributes: { attempt: retryAttempt, error: errMsg },
});
coordinator.release(sid);
// If this was the last attempt, notify the user and clean up
if (retryAttempt >= maxRetries) {
this.rateLimitRetryAttempts.delete(sid);
this.channels.statusChange(
this.makePayload('status.error', { ...session, status: 'error' } as SessionInfo,
`Rate-limit retry exhausted (${maxRetries}/${maxRetries}). Session requires manual intervention.`),
);
this.alertManager?.recordFailure('session_failure',
`Session "${session.displayName}" rate-limit retries exhausted: ${errMsg}`);
this.metrics?.sessionFailed(sid);
}
});
} else if (!this.acpBackend) {
// No ACP backend available — legacy notification only
this.channels.statusChange(
Expand Down Expand Up @@ -1120,6 +1131,8 @@ export class SessionMonitor {
this.rateLimitedSessions.delete(sessionId);
// Issue #3754: Clear retry tracking
this.rateLimitRetryAttempts.delete(sessionId);
// Issue #3931: Remove from rate-limit coordinator queue.
this.rateLimitCoordinator.dequeue(sessionId);
this.stallRecovering.delete(sessionId);
// Issue #89 L4: Clear pending debounce timer
const pending = this.statusChangeDebounce.get(sessionId);
Expand Down
124 changes: 124 additions & 0 deletions src/rate-limit-coordinator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/**
* rate-limit-coordinator.ts — Cross-session rate-limit coordination.
*
* Issue #3931: When multiple sessions hit the CC rate limit independently,
* their retries fire concurrently and amplify the problem. This coordinator
* serializes rate-limit retries across sessions with a configurable
* concurrency cap and stagger delay.
*
* Usage: the monitor calls `acquire()` before starting a rate-limit retry.
* If the concurrency cap is reached, the session queues until a slot opens.
*/

import { logger } from './logger.js';

export interface RateLimitCoordinatorOptions {
/** Max concurrent rate-limit retries across all sessions (default: 1). */
maxConcurrent?: number;
/** Minimum stagger delay between consecutive retry starts in ms (default: 2000). */
staggerMs?: number;
}

interface QueueEntry {
sessionId: string;
resolve: () => void;
}

const DEFAULT_MAX_CONCURRENT = 1;
const DEFAULT_STAGGER_MS = 2_000;

export class RateLimitCoordinator {
private readonly maxConcurrent: number;
private readonly staggerMs: number;
private activeCount = 0;
private lastReleaseAt = 0;
private queue: QueueEntry[] = [];

constructor(options: RateLimitCoordinatorOptions = {}) {
this.maxConcurrent = options.maxConcurrent ?? DEFAULT_MAX_CONCURRENT;
this.staggerMs = options.staggerMs ?? DEFAULT_STAGGER_MS;
}

/**
* Acquire a rate-limit retry slot. Returns when a slot is available.
* If the concurrency cap is reached, queues until a slot opens.
* Includes stagger delay so retries don't all fire simultaneously.
*/
async acquire(sessionId: string): Promise<void> {
if (this.activeCount < this.maxConcurrent) {
this.activeCount++;
logger.info({
component: 'rate-limit-coordinator',
operation: 'acquire',
sessionId,
attributes: { activeCount: this.activeCount, maxConcurrent: this.maxConcurrent },
});
return;
}

// Queue the session
logger.info({
component: 'rate-limit-coordinator',
operation: 'queued',
sessionId,
attributes: { queueDepth: this.queue.length, activeCount: this.activeCount },
});

return new Promise<void>((resolve) => {
this.queue.push({ sessionId, resolve });
});
}

/**
* Release a rate-limit retry slot. Triggers the next queued session
* after the stagger delay.
*/
release(sessionId: string): void {
if (this.activeCount > 0) {
this.activeCount--;
}
this.lastReleaseAt = Date.now();

logger.info({
component: 'rate-limit-coordinator',
operation: 'release',
sessionId,
attributes: { activeCount: this.activeCount, queueDepth: this.queue.length },
});

if (this.queue.length > 0) {
const next = this.queue.shift()!;
// Stagger: wait at least staggerMs from the last release
const elapsed = Date.now() - this.lastReleaseAt;
const waitMs = Math.max(0, this.staggerMs - elapsed);
setTimeout(() => {
this.activeCount++;
next.resolve();
logger.info({
component: 'rate-limit-coordinator',
operation: 'dequeue',
sessionId: next.sessionId,
attributes: { activeCount: this.activeCount, waitedMs: waitMs },
});
}, waitMs);
}
}

/** Get current queue depth (for diagnostics/metrics). */
get queueDepth(): number {
return this.queue.length;
}

/** Get number of active retry slots in use. */
get active(): number {
return this.activeCount;
}

/** Remove a session from the queue (e.g. if session is killed while waiting). */
dequeue(sessionId: string): void {
const idx = this.queue.findIndex(e => e.sessionId === sessionId);
if (idx >= 0) {
this.queue.splice(idx, 1);
}
}
}
Loading