Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 88 additions & 2 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6750,6 +6750,89 @@ async function startStreamableHTTPServer(): Promise<void> {
// Rate limiting per session
const sessionRequestCounts: Record<string, { count: number; resetAt: number }> = {};

// Request queue per session to handle concurrent requests
/**
* Represents a queued HTTP request waiting to be processed
* Used to serialize concurrent requests on the same session to prevent race conditions
*/
type QueuedRequest = {
/** The Express request object */
req: Request;
/** The Express response object */
res: Response;
/** The parsed request body */
body: unknown;
/** Callback to resolve the promise when request completes successfully */
resolve: () => void;
/** Callback to reject the promise when request fails */
reject: (error: Error) => void;
};
const sessionRequestQueues: Record<string, QueuedRequest[]> = {};
const sessionProcessingFlags: Record<string, boolean> = {};

/**
* Process queued requests for a session sequentially
*/
const processRequestQueue = async (sessionId: string) => {
// If already processing, skip
if (sessionProcessingFlags[sessionId]) {
return;
}

sessionProcessingFlags[sessionId] = true;

try {
while (sessionRequestQueues[sessionId]?.length > 0) {
const queued = sessionRequestQueues[sessionId].shift()!;

try {
await executeTransportRequest(sessionId, queued.req, queued.res, queued.body);
queued.resolve();
} catch (error) {
queued.reject(error instanceof Error ? error : new Error(String(error)));
}
}
} finally {
sessionProcessingFlags[sessionId] = false;

// Clean up empty queue
if (sessionRequestQueues[sessionId]?.length === 0) {
delete sessionRequestQueues[sessionId];
}
}
};

/**
* Queue a request for processing
* Returns a promise that resolves when the request completes
*/
const queueRequest = async (sessionId: string, req: Request, res: Response, body: unknown): Promise<void> => {
return new Promise((resolve, reject) => {
if (!sessionRequestQueues[sessionId]) {
sessionRequestQueues[sessionId] = [];
}

sessionRequestQueues[sessionId].push({ req, res, body, resolve, reject });

// Start processing the queue (don't await - runs in background)
processRequestQueue(sessionId).catch(error => {
// Log but don't propagate - individual requests handle their own errors
logger.error(`Fatal error in queue processor for session ${sessionId}:`, error);
});
});
};

/**
* Execute the actual transport request
*/
const executeTransportRequest = async (sessionId: string, req: Request, res: Response, body: unknown) => {
const transport = streamableTransports[sessionId];
if (!transport) {
throw new Error(`Transport not found for session ${sessionId}`);
}
await transport.handleRequest(req, res, body);
};

/**
* Validate token format and length
*/
Expand Down Expand Up @@ -6939,9 +7022,9 @@ async function startStreamableHTTPServer(): Promise<void> {

if (sessionId && streamableTransports[sessionId]) {
// Reuse existing transport for ongoing session
// Queue the request to prevent concurrent access to the same transport
transport = streamableTransports[sessionId];

await transport.handleRequest(req, res, req.body);
await queueRequest(sessionId, req, res, req.body);
} else {
// Create new transport for new session
transport = new StreamableHTTPServerTransport({
Expand Down Expand Up @@ -6976,6 +7059,9 @@ async function startStreamableHTTPServer(): Promise<void> {
delete sessionRequestCounts[sid];
logger.info(`Session ${sid}: cleaned up auth mapping`);
}
// Clean up any remaining queued requests for this session
delete sessionRequestQueues[sid];
delete sessionProcessingFlags[sid];
}
};

Expand Down
92 changes: 92 additions & 0 deletions test-results-oauth.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
[
{
"name": "OAuth class instantiation",
"status": "passed",
"duration": 0
},
{
"name": "Token storage path configuration",
"status": "passed",
"duration": 0
},
{
"name": "Scope configuration with api only",
"status": "passed",
"duration": 0
},
{
"name": "Multiple scopes configuration (redundant)",
"status": "passed",
"duration": 0
},
{
"name": "hasValidToken returns false without token",
"status": "passed",
"duration": 1
},
{
"name": "hasValidToken returns true with valid token",
"status": "passed",
"duration": 0
},
{
"name": "hasValidToken returns false with expired token",
"status": "passed",
"duration": 0
},
{
"name": "clearToken removes token file",
"status": "passed",
"duration": 0
},
{
"name": "Token file has correct permissions",
"status": "passed",
"duration": 1
},
{
"name": "Port availability check",
"status": "passed",
"duration": 2
},
{
"name": "OAuth redirect URI parsing",
"status": "passed",
"duration": 0
},
{
"name": "Token expiration calculation",
"status": "passed",
"duration": 0
},
{
"name": "Shared server concept",
"status": "passed",
"duration": 2
},
{
"name": "Environment variable configuration",
"status": "passed",
"duration": 0
},
{
"name": "Token data structure validation",
"status": "passed",
"duration": 0
},
{
"name": "Invalid token storage path handling",
"status": "passed",
"duration": 0
},
{
"name": "Self-hosted GitLab URL configuration",
"status": "passed",
"duration": 0
},
{
"name": "Custom port in redirect URI",
"status": "passed",
"duration": 1
}
]
179 changes: 179 additions & 0 deletions test/concurrent-requests-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/**
* Test concurrent requests on the same session
* This tests the fix for the issue where concurrent tool calls on the same session hang indefinitely
*/

import assert from 'node:assert';
import { describe, test, before, after } from 'node:test';
import { CustomHeaderClient } from './clients/custom-header-client.js';
import { TransportMode, type ServerInstance, launchServer } from './utils/server-launcher.js';
import { MockGitLabServer, findMockServerPort } from './utils/mock-gitlab-server.js';

describe('Concurrent Requests on Same Session', () => {
let server: ServerInstance;
let mockGitLab: MockGitLabServer;
const MOCK_TOKEN = 'glpat-test-token-concurrent';
let mcpUrl: string;
let mockGitLabUrl: string;

before(async () => {
// Start mock GitLab server
const mockPort = await findMockServerPort(9300);
mockGitLab = new MockGitLabServer({
port: mockPort,
validTokens: [MOCK_TOKEN]
});
await mockGitLab.start();
mockGitLabUrl = mockGitLab.getUrl();
console.log(`Mock GitLab server started at ${mockGitLabUrl}`);

// Find available port for MCP server
const mcpPort = await findMockServerPort(3400);

// Start MCP server with Streamable HTTP transport
server = await launchServer({
mode: TransportMode.STREAMABLE_HTTP,
port: mcpPort,
env: {
LOG_LEVEL: 'info',
STREAMABLE_HTTP: 'true',
REMOTE_AUTHORIZATION: 'true',
ENABLE_DYNAMIC_API_URL: 'true',
}
});

mcpUrl = `http://127.0.0.1:${server.port}/mcp`;
console.log(`MCP server started at ${mcpUrl}`);
});

after(async () => {
server?.kill();
await mockGitLab?.stop();
});

test('should handle concurrent tool calls on the same session', async () => {
console.log('\n=== Testing Concurrent Tool Calls on Same Session ===');

// Create a single client instance
const client = new CustomHeaderClient({
'authorization': `Bearer ${MOCK_TOKEN}`,
'x-gitlab-api-url': mockGitLabUrl
});

// Connect to server
await client.connect(mcpUrl);
console.log(' βœ“ Client connected');

// Get the session ID
const sessionId = client.getSessionId();
assert.ok(sessionId, 'Session ID should exist');
console.log(` ℹ️ Session ID: ${sessionId}`);

// Test 1: Sequential calls (baseline - should work)
console.log('\n πŸ“ Test 1: Sequential calls (baseline)');
const start1 = Date.now();

await client.callTool('list_merge_requests', {
project_id: '1',
state: 'opened',
});
console.log(' βœ… Call 1 completed');

await client.callTool('get_project', {
project_id: '1',
});
console.log(' βœ… Call 2 completed');

const duration1 = Date.now() - start1;
console.log(` ⏱️ Sequential duration: ${duration1}ms`);

// Test 2: Concurrent calls (the bug scenario)
console.log('\n πŸ“ Test 2: Concurrent calls (this previously hung)');
const start2 = Date.now();

// Make two concurrent calls with the same session
const [result1, result2] = await Promise.all([
client.callTool('list_merge_requests', {
project_id: '1',
state: 'opened',
}),
client.callTool('get_project', {
project_id: '1',
}),
]);

const duration2 = Date.now() - start2;
console.log(' βœ… Call 1 completed');
console.log(' βœ… Call 2 completed');
console.log(` ⏱️ Concurrent duration: ${duration2}ms`);

// Verify results are valid
assert.ok(result1, 'Result 1 should exist');
assert.ok(result2, 'Result 2 should exist');

// Test 3: Multiple concurrent calls (stress test)
console.log('\n πŸ“ Test 3: Multiple concurrent calls (stress test)');
const start3 = Date.now();

const promises = [];
for (let i = 0; i < 5; i++) {
promises.push(
client.callTool('get_project', {
project_id: '1',
})
);
}

const results = await Promise.all(promises);
const duration3 = Date.now() - start3;

console.log(` βœ… All ${results.length} calls completed`);
console.log(` ⏱️ Duration: ${duration3}ms`);

// Verify all results are valid
for (const result of results) {
assert.ok(result, 'Result should exist');
}

// Disconnect
await client.disconnect();
console.log('\n βœ“ Client disconnected');
console.log('\n βœ… All concurrent request tests passed!');
});

test('should handle interleaved concurrent and sequential calls', async () => {
console.log('\n=== Testing Interleaved Concurrent and Sequential Calls ===');

const client = new CustomHeaderClient({
'authorization': `Bearer ${MOCK_TOKEN}`,
'x-gitlab-api-url': mockGitLabUrl
});

await client.connect(mcpUrl);

// Make concurrent calls
const [r1, r2] = await Promise.all([
client.callTool('get_project', { project_id: '1' }),
client.callTool('get_project', { project_id: '1' }),
]);
console.log(' βœ… First concurrent batch completed');

// Make a sequential call
await client.callTool('get_project', { project_id: '1' });
console.log(' βœ… Sequential call completed');

// Make more concurrent calls
const [r3, r4, r5] = await Promise.all([
client.callTool('get_project', { project_id: '1' }),
client.callTool('get_project', { project_id: '1' }),
client.callTool('get_project', { project_id: '1' }),
]);
console.log(' βœ… Second concurrent batch completed');

// Verify all results
assert.ok(r1 && r2 && r3 && r4 && r5, 'All results should exist');

await client.disconnect();
console.log(' βœ… Interleaved test passed!');
});
});