Skip to content

Commit 693a3d3

Browse files
author
Theodore Li
committed
Add user based hosted key throttling
1 parent 2325535 commit 693a3d3

File tree

12 files changed

+594
-42
lines changed

12 files changed

+594
-42
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
export {
2+
getHostedKeyThrottler,
3+
HostedKeyThrottler,
4+
resetHostedKeyThrottler,
5+
} from './throttler'
6+
export {
7+
DEFAULT_BURST_MULTIPLIER,
8+
THROTTLE_WINDOW_MS,
9+
toTokenBucketConfig,
10+
type AcquireKeyResult,
11+
type CustomThrottle,
12+
type PerRequestThrottle,
13+
type ThrottleConfig,
14+
type ThrottleDimension,
15+
type ThrottleMode,
16+
} from './types'
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import { loggerMock } from '@sim/testing'
2+
import { afterEach, beforeEach, describe, expect, it, type Mock, vi } from 'vitest'
3+
import { HostedKeyThrottler } from './throttler'
4+
import type { PerRequestThrottle } from './types'
5+
import type { ConsumeResult, RateLimitStorageAdapter } from '@/lib/core/rate-limiter/storage'
6+
7+
vi.mock('@sim/logger', () => loggerMock)
8+
9+
/**
 * Shape of the storage-adapter test double used in this suite.
 * Each member is a vitest mock so individual tests can program return values
 * and assert on calls.
 */
interface MockAdapter {
  /** Mocked token consumption; tests resolve it with a `ConsumeResult`. */
  consumeTokens: Mock
  /** Mocked token status lookup (not exercised by the tests in this file). */
  getTokenStatus: Mock
  /** Mocked bucket reset (not exercised by the tests in this file). */
  resetBucket: Mock
}
14+
15+
/**
 * Build a fresh adapter double whose three methods are independent `vi.fn()`
 * mocks, so state never leaks from one test to the next.
 */
const createMockAdapter = (): MockAdapter => {
  return {
    consumeTokens: vi.fn(),
    getTokenStatus: vi.fn(),
    resetBucket: vi.fn(),
  }
}
20+
21+
/**
 * Unit tests for HostedKeyThrottler.acquireKey using a mocked storage adapter
 * and hosted keys supplied through environment variables.
 */
describe('HostedKeyThrottler', () => {
  const testProvider = 'exa'
  const envKeys = ['EXA_API_KEY_1', 'EXA_API_KEY_2', 'EXA_API_KEY_3']
  let mockAdapter: MockAdapter
  let throttler: HostedKeyThrottler
  /** Values the hosted-key variables had before each test, keyed by variable name. */
  let savedEnv: Map<string, string | undefined>

  const perRequestThrottle: PerRequestThrottle = {
    mode: 'per_request',
    userRequestsPerMinute: 10,
  }

  beforeEach(() => {
    vi.clearAllMocks()
    mockAdapter = createMockAdapter()
    throttler = new HostedKeyThrottler(mockAdapter as RateLimitStorageAdapter)

    // Snapshot only the variables this suite touches. Reassigning `process.env`
    // itself would swap Node's special env object for a plain copy.
    savedEnv = new Map(
      envKeys.map((name): [string, string | undefined] => [name, process.env[name]])
    )
    process.env.EXA_API_KEY_1 = 'test-key-1'
    process.env.EXA_API_KEY_2 = 'test-key-2'
    process.env.EXA_API_KEY_3 = 'test-key-3'
  })

  afterEach(() => {
    for (const [name, value] of savedEnv) {
      if (value === undefined) {
        delete process.env[name]
      } else {
        process.env[name] = value
      }
    }
  })

  describe('acquireKey', () => {
    it('should return error when no keys are configured', async () => {
      delete process.env.EXA_API_KEY_1
      delete process.env.EXA_API_KEY_2
      delete process.env.EXA_API_KEY_3

      const result = await throttler.acquireKey(testProvider, envKeys, perRequestThrottle)

      expect(result.success).toBe(false)
      expect(result.error).toContain('No hosted keys configured')
    })

    it('should throttle user when they exceed their rate limit', async () => {
      const throttledResult: ConsumeResult = {
        allowed: false,
        tokensRemaining: 0,
        resetAt: new Date(Date.now() + 30000),
      }
      mockAdapter.consumeTokens.mockResolvedValue(throttledResult)

      const result = await throttler.acquireKey(testProvider, envKeys, perRequestThrottle, 'user-123')

      expect(result.success).toBe(false)
      expect(result.userThrottled).toBe(true)
      expect(result.retryAfterMs).toBeDefined()
      expect(result.error).toContain('Rate limit exceeded')
    })

    it('should allow user within their rate limit', async () => {
      const allowedResult: ConsumeResult = {
        allowed: true,
        tokensRemaining: 9,
        resetAt: new Date(Date.now() + 60000),
      }
      mockAdapter.consumeTokens.mockResolvedValue(allowedResult)

      const result = await throttler.acquireKey(testProvider, envKeys, perRequestThrottle, 'user-123')

      expect(result.success).toBe(true)
      expect(result.userThrottled).toBeUndefined()
      expect(result.key).toBe('test-key-1')
    })

    // Selection is least-loaded; with equal counts and sequential calls that
    // behaves like round-robin, which is what this test observes.
    it('should distribute requests across keys round-robin style', async () => {
      const allowedResult: ConsumeResult = {
        allowed: true,
        tokensRemaining: 9,
        resetAt: new Date(Date.now() + 60000),
      }
      mockAdapter.consumeTokens.mockResolvedValue(allowedResult)

      const r1 = await throttler.acquireKey(testProvider, envKeys, perRequestThrottle, 'user-1')
      const r2 = await throttler.acquireKey(testProvider, envKeys, perRequestThrottle, 'user-2')
      const r3 = await throttler.acquireKey(testProvider, envKeys, perRequestThrottle, 'user-3')
      const r4 = await throttler.acquireKey(testProvider, envKeys, perRequestThrottle, 'user-4')

      expect(r1.keyIndex).toBe(0)
      expect(r2.keyIndex).toBe(1)
      expect(r3.keyIndex).toBe(2)
      expect(r4.keyIndex).toBe(0) // Wraps back
    })

    it('should work without userId (no per-user throttling)', async () => {
      const result = await throttler.acquireKey(testProvider, envKeys, perRequestThrottle)

      expect(result.success).toBe(true)
      expect(result.key).toBe('test-key-1')
      expect(mockAdapter.consumeTokens).not.toHaveBeenCalled()
    })

    it('should handle partial key availability', async () => {
      delete process.env.EXA_API_KEY_2

      const result = await throttler.acquireKey(testProvider, envKeys, perRequestThrottle)

      expect(result.success).toBe(true)
      expect(result.key).toBe('test-key-1')
      expect(result.envVarName).toBe('EXA_API_KEY_1')

      const r2 = await throttler.acquireKey(testProvider, envKeys, perRequestThrottle)
      expect(r2.keyIndex).toBe(2) // Skips missing key 1
      expect(r2.envVarName).toBe('EXA_API_KEY_3')
    })
  })
})
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
import { createLogger } from '@sim/logger'
2+
import {
3+
createStorageAdapter,
4+
type RateLimitStorageAdapter,
5+
type TokenBucketConfig,
6+
} from '@/lib/core/rate-limiter/storage'
7+
import {
8+
DEFAULT_BURST_MULTIPLIER,
9+
THROTTLE_WINDOW_MS,
10+
toTokenBucketConfig,
11+
type AcquireKeyResult,
12+
type PerRequestThrottle,
13+
type ThrottleConfig,
14+
} from './types'
15+
16+
const logger = createLogger('HostedKeyThrottler')
17+
18+
/** Dimension name for per-user rate limiting */
19+
const USER_REQUESTS_DIMENSION = 'user_requests'
20+
21+
/**
 * Information about an available hosted key
 */
interface AvailableKey {
  /** The key's value as read from the environment. */
  key: string
  /** Position of the key's variable name in the caller-supplied `envKeys` list. */
  keyIndex: number
  /** Name of the environment variable the key was read from. */
  envVarName: string
}
29+
30+
/**
 * HostedKeyThrottler provides:
 * 1. Per-user rate limiting (enforced - blocks users who exceed their limit)
 * 2. Least-loaded key selection (distributes requests evenly across keys)
 *
 * Request counts used for key selection are kept in this instance's memory.
 */
export class HostedKeyThrottler {
  private readonly storage: RateLimitStorageAdapter
  /** In-memory request counters per key: "provider:keyIndex" -> count */
  private readonly keyRequestCounts = new Map<string, number>()

  /**
   * @param storage - Adapter used to consume per-user tokens. When omitted, one
   *   is obtained from `createStorageAdapter()`.
   */
  constructor(storage?: RateLimitStorageAdapter) {
    this.storage = storage ?? createStorageAdapter()
  }

  /**
   * Build storage key for per-user rate limiting
   */
  private buildUserStorageKey(provider: string, userId: string): string {
    return `hosted:${provider}:user:${userId}:${USER_REQUESTS_DIMENSION}`
  }

  /**
   * Get available keys from environment variables.
   *
   * Names whose variable is unset or empty are skipped; `keyIndex` keeps the
   * position the name had in `envKeys`, so indices can have gaps.
   */
  private getAvailableKeys(envKeys: string[]): AvailableKey[] {
    const keys: AvailableKey[] = []
    envKeys.forEach((envVarName, keyIndex) => {
      const key = process.env[envVarName]
      if (key) {
        keys.push({ key, keyIndex, envVarName })
      }
    })
    return keys
  }

  /**
   * Get user rate limit config from throttle config.
   *
   * @returns The bucket config produced by `toTokenBucketConfig`, or `null`
   *   when the throttle is not `per_request` or has no `userRequestsPerMinute`.
   */
  private getUserRateLimitConfig(throttle: ThrottleConfig): TokenBucketConfig | null {
    if (throttle.mode !== 'per_request' || !throttle.userRequestsPerMinute) {
      return null
    }
    return toTokenBucketConfig(
      throttle.userRequestsPerMinute,
      throttle.burstMultiplier ?? DEFAULT_BURST_MULTIPLIER,
      THROTTLE_WINDOW_MS
    )
  }

  /**
   * Check and consume user rate limit. Returns null if allowed, or retry info if throttled.
   *
   * Also returns null when the throttle config carries no per-user limit, and
   * when the storage call throws (the error is logged and the request allowed).
   */
  private async checkUserRateLimit(
    provider: string,
    userId: string,
    throttle: ThrottleConfig
  ): Promise<{ throttled: true; retryAfterMs: number } | null> {
    const config = this.getUserRateLimitConfig(throttle)
    if (!config) return null

    const storageKey = this.buildUserStorageKey(provider, userId)

    try {
      const result = await this.storage.consumeTokens(storageKey, 1, config)
      if (result.allowed) return null

      // Clamp at zero in case the reported reset time is already in the past.
      const retryAfterMs = Math.max(0, result.resetAt.getTime() - Date.now())
      logger.info(`User ${userId} throttled for ${provider}`, {
        provider,
        userId,
        retryAfterMs,
        tokensRemaining: result.tokensRemaining,
      })
      return { throttled: true, retryAfterMs }
    } catch (error) {
      logger.error(`Error checking user rate limit for ${provider}`, { error, userId })
      return null // Allow on error
    }
  }

  /**
   * Acquire the best available key.
   *
   * 1. Key availability: fails fast when none of `envKeys` is set, before any
   *    user token is consumed.
   * 2. Per-user throttling (enforced): Users exceeding their limit get blocked
   * 3. Least-loaded key selection: Picks the key with fewest requests
   *
   * @param provider - Provider name; used in storage keys, counters and logs.
   * @param envKeys - Names of the environment variables that may hold keys.
   * @param throttle - Throttle configuration for this provider.
   * @param userId - When given (and non-empty), the per-user limit is enforced.
   * @returns On success the key, its index in `envKeys` and its variable name;
   *   otherwise `success: false` with an `error`, plus `userThrottled` and
   *   `retryAfterMs` when the user was rate limited.
   */
  async acquireKey(
    provider: string,
    envKeys: string[],
    throttle: ThrottleConfig,
    userId?: string
  ): Promise<AcquireKeyResult> {
    const availableKeys = this.getAvailableKeys(envKeys)

    if (availableKeys.length === 0) {
      logger.warn(`No hosted keys configured for provider ${provider}`)
      return {
        success: false,
        error: `No hosted keys configured for ${provider}`,
      }
    }

    if (userId) {
      const userThrottleResult = await this.checkUserRateLimit(provider, userId, throttle)
      if (userThrottleResult) {
        // Never tell the user to wait "0 seconds".
        const waitSeconds = Math.max(1, Math.ceil(userThrottleResult.retryAfterMs / 1000))
        return {
          success: false,
          userThrottled: true,
          retryAfterMs: userThrottleResult.retryAfterMs,
          error: `Rate limit exceeded. Please wait ${waitSeconds} seconds.`,
        }
      }
    }

    // Select the key with fewest requests; the strict comparison keeps the
    // earliest key on ties. The list is non-empty here, so reduce cannot throw.
    const leastLoaded = availableKeys.reduce((best, candidate) =>
      this.getKeyCount(provider, candidate.keyIndex) < this.getKeyCount(provider, best.keyIndex)
        ? candidate
        : best
    )

    const requestCount = this.incrementKeyCount(provider, leastLoaded.keyIndex)

    logger.debug(`Selected hosted key for ${provider}`, {
      provider,
      keyIndex: leastLoaded.keyIndex,
      envVarName: leastLoaded.envVarName,
      requestCount,
    })

    return {
      success: true,
      key: leastLoaded.key,
      keyIndex: leastLoaded.keyIndex,
      envVarName: leastLoaded.envVarName,
    }
  }

  /** Number of requests handed to this provider key so far (0 if never used). */
  private getKeyCount(provider: string, keyIndex: number): number {
    return this.keyRequestCounts.get(`${provider}:${keyIndex}`) ?? 0
  }

  /** Add one to the key's request counter and return the new count. */
  private incrementKeyCount(provider: string, keyIndex: number): number {
    const next = this.getKeyCount(provider, keyIndex) + 1
    this.keyRequestCounts.set(`${provider}:${keyIndex}`, next)
    return next
  }
}
184+
185+
let cachedThrottler: HostedKeyThrottler | null = null
186+
187+
/**
 * Get the singleton HostedKeyThrottler instance.
 *
 * The instance is created with the default constructor on first use and then
 * reused until `resetHostedKeyThrottler()` clears it.
 */
export function getHostedKeyThrottler(): HostedKeyThrottler {
  return (cachedThrottler ??= new HostedKeyThrottler())
}
196+
197+
/**
 * Reset the cached throttler (for testing).
 *
 * Clears the module-level cache so the next `getHostedKeyThrottler()` call
 * constructs a new instance.
 */
export function resetHostedKeyThrottler(): void {
  cachedThrottler = null
}

0 commit comments

Comments
 (0)