Skip to content

Commit df2dbb6

Browse files
feat(mcp): integrate resilience pipeline into McpService
This commit wires up the ResiliencePipeline (Telemetry, Circuit Breaker, Schema Validator) into the McpService executeTool logic. It also includes the previously missing context variables and updates the pipeline tests.
1 parent a94e66b commit df2dbb6

File tree

5 files changed

+580
-1
lines changed

5 files changed

+580
-1
lines changed
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types'
2+
import type { McpToolResult } from '@/lib/mcp/types'
3+
import { createLogger } from '@sim/logger'
4+
5+
/**
 * Upper bound on the number of per-server breaker states kept in memory.
 * When the registry grows past this size, an existing entry is evicted.
 */
const MAX_SERVER_STATES = 1000
7+
8+
/**
 * Lifecycle states of a circuit breaker.
 *
 * - `CLOSED`: requests pass through and failures are counted.
 * - `OPEN`: requests are rejected until the reset timeout has elapsed.
 * - `HALF-OPEN`: a single probe request is let through to test recovery.
 */
export type CircuitState =
  | 'CLOSED'
  | 'OPEN'
  | 'HALF-OPEN'
9+
10+
/**
 * Tuning knobs for the circuit breaker.
 *
 * Both fields are optional when passed to the `CircuitBreakerMiddleware`
 * constructor, which falls back to 5 failures and 30000 ms respectively.
 */
export interface CircuitBreakerConfig {
  /** Number of failures before tripping to OPEN */
  failureThreshold: number
  /** How long to wait in OPEN before transitioning to HALF-OPEN (ms) */
  resetTimeoutMs: number
}
16+
17+
/** Mutable breaker bookkeeping kept for a single MCP server. */
interface ServerState {
  /** Current position in the CLOSED / OPEN / HALF-OPEN state machine. */
  state: CircuitState
  /** Failures counted while CLOSED; reset to 0 on success. */
  failures: number
  /** Epoch milliseconds after which an OPEN circuit may move to HALF-OPEN. */
  nextAttemptMs: number
  /** True while a HALF-OPEN probe request is in flight. */
  isHalfOpenProbing: boolean
}
23+
24+
const logger = createLogger('mcp:resilience:circuit-breaker')

/**
 * Per-server circuit breaker for MCP tool execution.
 *
 * Each `serverId` gets its own state machine:
 * - CLOSED: calls pass through; failures (thrown errors or results with
 *   `isError`) are counted and a success resets the counter. Reaching
 *   `failureThreshold` trips the circuit to OPEN.
 * - OPEN: calls fail fast with an `Error` until `resetTimeoutMs` has elapsed.
 * - HALF-OPEN: exactly one probe call is let through; concurrent calls fail
 *   fast. The probe's outcome closes the circuit or re-opens it.
 *
 * Only the probe request may move the circuit out of HALF-OPEN. Results of
 * requests that were started while the circuit was CLOSED but settle after it
 * tripped are ignored, so a late success cannot silently re-close an OPEN
 * circuit and a late failure cannot cancel an in-flight probe.
 */
export class CircuitBreakerMiddleware implements McpMiddleware {
  // A Map iterates in insertion order; entries are re-inserted on access so the
  // first key is always the least recently used one. The size is capped to
  // prevent memory leaks if thousands of ephemeral servers connect.
  private readonly registry = new Map<string, ServerState>()
  private readonly config: CircuitBreakerConfig

  /**
   * @param config Optional overrides; missing fields default to
   *   `failureThreshold: 5` and `resetTimeoutMs: 30000`.
   */
  constructor(config: Partial<CircuitBreakerConfig> = {}) {
    this.config = {
      failureThreshold: config.failureThreshold ?? 5,
      resetTimeoutMs: config.resetTimeoutMs ?? 30000,
    }
  }

  /**
   * Returns the breaker state for `serverId`, creating a fresh CLOSED state on
   * first sight. Every access marks the entry as most recently used.
   */
  private getState(serverId: string): ServerState {
    const existing = this.registry.get(serverId)
    if (existing) {
      // Move to the end of the iteration order so active servers (including
      // ones whose circuit is OPEN) are not the first eviction candidates.
      this.registry.delete(serverId)
      this.registry.set(serverId, existing)
      return existing
    }

    const created: ServerState = {
      state: 'CLOSED',
      failures: 0,
      nextAttemptMs: 0,
      isHalfOpenProbing: false,
    }
    this.registry.set(serverId, created)
    this.evictIfNecessary()
    return created
  }

  /** Drops the least recently used entry once the registry exceeds its cap. */
  private evictIfNecessary(): void {
    if (this.registry.size <= MAX_SERVER_STATES) {
      return
    }
    const oldestKey = this.registry.keys().next().value
    // Compare against undefined explicitly: an empty-string server id is falsy
    // but is still a real key that must be evictable.
    if (oldestKey !== undefined) {
      this.registry.delete(oldestKey)
    }
  }

  /**
   * Runs `next` under the protection of the breaker for `context.serverId`.
   *
   * @param context Execution context; `serverId` selects the breaker and
   *   `toolCall.name` is used in fast-fail messages.
   * @param next The next layer of the pipeline.
   * @returns Whatever `next` resolves to (including results with `isError`).
   * @throws Error when the circuit is OPEN, or HALF-OPEN with a probe already
   *   in flight; otherwise re-throws whatever `next` throws.
   */
  async execute(
    context: McpExecutionContext,
    next: McpMiddlewareNext
  ): Promise<McpToolResult> {
    const { serverId, toolCall } = context
    const serverState = this.getState(serverId)

    // 1. Check current state and evaluate timeouts
    if (serverState.state === 'OPEN') {
      if (Date.now() > serverState.nextAttemptMs) {
        // Time to try again, enter HALF-OPEN
        logger.info(`Circuit breaker entering HALF-OPEN for server ${serverId}`)
        serverState.state = 'HALF-OPEN'
        serverState.isHalfOpenProbing = false
      } else {
        // Fast-fail
        throw new Error(
          `Circuit breaker is OPEN for server ${serverId}. Fast-failing request to ${toolCall.name}.`
        )
      }
    }

    // Remember whether THIS request is the probe, so that only its outcome is
    // allowed to resolve the HALF-OPEN state.
    let isProbe = false
    if (serverState.state === 'HALF-OPEN') {
      if (serverState.isHalfOpenProbing) {
        // Another request is already probing. Fast-fail concurrent requests.
        throw new Error(
          `Circuit breaker is HALF-OPEN for server ${serverId}. A probe request is currently executing. Fast-failing concurrent request to ${toolCall.name}.`
        )
      }
      serverState.isHalfOpenProbing = true
      isProbe = true
    }

    // NOTE(review): if `next` never settles for a probe, the circuit stays
    // HALF-OPEN with the probe lock held; this relies on the downstream layer
    // enforcing its own timeout - confirm.
    let result: McpToolResult
    try {
      // 2. Invoke the next layer
      result = await next(context)
    } catch (error) {
      // Any exception from the downstream layer counts as a failure
      this.recordFailure(serverId, serverState, isProbe)
      throw error // Re-throw to caller
    }

    // 3. A resolved result flagged with isError also counts as a failure
    if (result.isError) {
      this.recordFailure(serverId, serverState, isProbe)
    } else {
      this.recordSuccess(serverId, serverState, isProbe)
    }

    return result
  }

  /**
   * Applies a successful outcome: resets the failure counter while CLOSED, or
   * closes the circuit when the successful request was the HALF-OPEN probe.
   */
  private recordSuccess(serverId: string, state: ServerState, isProbe: boolean): void {
    if (state.state === 'CLOSED') {
      state.failures = 0
      return
    }
    if (!isProbe) {
      // Stale success from a request started before the circuit tripped; it
      // says nothing about recovery, so leave the OPEN/HALF-OPEN state alone.
      return
    }
    logger.info(`Circuit breaker reset to CLOSED for server ${serverId}`)
    state.state = 'CLOSED'
    state.failures = 0
    state.isHalfOpenProbing = false
  }

  /**
   * Applies a failed outcome: counts it while CLOSED (tripping at the
   * threshold), or re-opens the circuit when the failed request was the probe.
   */
  private recordFailure(serverId: string, state: ServerState, isProbe: boolean): void {
    if (state.state === 'CLOSED') {
      state.failures++
      if (state.failures >= this.config.failureThreshold) {
        logger.error(
          `Circuit breaker failure threshold reached (${state.failures}/${this.config.failureThreshold}). Tripping to OPEN for server ${serverId}`
        )
        this.tripToOpen(state)
      }
      return
    }
    if (isProbe && state.state === 'HALF-OPEN') {
      // The probe failed! Trip immediately back to OPEN.
      logger.warn(
        `Circuit breaker probe failed. Tripping back to OPEN for server ${serverId}`
      )
      this.tripToOpen(state)
    }
    // Otherwise: a stale failure from a request started before the trip. The
    // circuit is already OPEN/HALF-OPEN, so there is nothing to record.
  }

  /** Moves `state` to OPEN and schedules the earliest HALF-OPEN attempt. */
  private tripToOpen(state: ServerState): void {
    state.state = 'OPEN'
    state.isHalfOpenProbing = false
    state.nextAttemptMs = Date.now() + this.config.resetTimeoutMs
  }
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { ResiliencePipeline } from './pipeline'
2+
import { TelemetryMiddleware } from './telemetry'
3+
import { CircuitBreakerMiddleware } from './circuit-breaker'
4+
import { SchemaValidatorMiddleware } from './schema-validator'
5+
import type { McpExecutionContext } from './types'
6+
7+
// Demo pipeline. Middlewares are registered in the order telemetry, schema
// validation, circuit breaker. The breaker trips after 3 failures and uses a
// fast 1.5s reset timeout so the demo does not have to wait long.
const pipeline = new ResiliencePipeline()
  .use(new TelemetryMiddleware())
  .use(new SchemaValidatorMiddleware())
  .use(new CircuitBreakerMiddleware({ failureThreshold: 3, resetTimeoutMs: 1500 }))

// Execution context reused for every request in the demo.
const mockContext: McpExecutionContext = {
  toolCall: { name: 'flaky_tool', arguments: {} },
  serverId: 'demo-server',
  userId: 'demo-user',
  workspaceId: 'demo-workspace',
}
19+
20+
// Number of downstream invocations of mockExecuteTool so far.
let attemptTracker = 0

// How many downstream invocations fail before the mock server "recovers".
const FAILING_DOWNSTREAM_CALLS = 3

/**
 * A mock downstream MCP execution handler that fails the first 3 times it is
 * invoked, then succeeds. Calls that are rejected before reaching this
 * handler do not advance the counter.
 */
const mockExecuteTool = async () => {
  attemptTracker++
  console.log(`\n--- Request #${attemptTracker} ---`)

  // Simulate network latency
  await new Promise((resolve) => setTimeout(resolve, 50))

  if (attemptTracker <= FAILING_DOWNSTREAM_CALLS) {
    throw new Error('Connection Refused: Target server is down!')
  }

  return { content: [{ type: 'text', text: 'Success! Target server is back online.' }] }
}
36+
37+
/** Turns an arbitrary thrown value into a printable message. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error)
}

/**
 * Sends one request through the pipeline where a failure is the expected
 * outcome; the error message is printed under `label`.
 */
async function runExpectedFailure(label: string): Promise<void> {
  try {
    await pipeline.execute(mockContext, mockExecuteTool)
  } catch (error: unknown) {
    console.error(`${label}: ${describeError(error)}`)
  }
}

/**
 * Sends one request through the pipeline where success is the expected
 * outcome and prints the text of the first content item.
 */
async function runExpectedSuccess(): Promise<void> {
  try {
    const result = await pipeline.execute(mockContext, mockExecuteTool)
    // `?.` on the element too: an empty content array must not be reported
    // as a failed request.
    console.log(`✅ Result: ${result.content?.[0]?.text}`)
  } catch (error: unknown) {
    console.error(`❌ Result: ${describeError(error)}`)
  }
}

/**
 * Walks the circuit breaker through a full cycle:
 * CLOSED (3 failures) -> OPEN (fast-fail) -> wait -> HALF-OPEN probe -> CLOSED.
 */
async function runDemo() {
  console.log('🚀 Starting Resilience Pipeline Demo...\n')

  // Attempt 1: CLOSED -> Fails
  await runExpectedFailure('❌ Result')

  // Attempt 2: CLOSED -> Fails
  await runExpectedFailure('❌ Result')

  // Attempt 3: CLOSED -> Fails (Hits threshold, trips to OPEN)
  await runExpectedFailure('❌ Result')

  // Attempt 4: OPEN (Fast fails immediately without hitting downstream mockExecuteTool)
  await runExpectedFailure('🛑 Fast-Fail Result')

  console.log('\n⏳ Waiting 2 seconds for Circuit Breaker to cool down...')
  await new Promise((resolve) => setTimeout(resolve, 2000))

  // Attempt 5: HALF-OPEN -> Succeeds! (Transitions back to CLOSED)
  await runExpectedSuccess()

  // Attempt 6: CLOSED -> Succeeds normally
  await runExpectedSuccess()

  console.log('\n🎉 Demo Complete!')
}

runDemo().catch(console.error)

0 commit comments

Comments
 (0)