From 5ae37b8c68eedcb9dc33fd77a18176a32893d47f Mon Sep 17 00:00:00 2001 From: Nicolas De Loof Date: Mon, 22 Sep 2025 15:36:57 +0200 Subject: [PATCH] use http.Agent with a socket factory to reuse sockets and support concurrent use Signed-off-by: Nicolas De Loof --- lib/docker-client.ts | 78 ++++++++++----------- lib/socket.ts | 54 +++++++++----- lib/ssh.ts | 147 ++++++++++++++++++++------------------- test/concurrency.test.ts | 102 +++++++++++++++++++++++++++ 4 files changed, 252 insertions(+), 129 deletions(-) create mode 100644 test/concurrency.test.ts diff --git a/lib/docker-client.ts b/lib/docker-client.ts index 4f4e782..3bd123e 100644 --- a/lib/docker-client.ts +++ b/lib/docker-client.ts @@ -4,9 +4,10 @@ import * as path from 'path'; import * as os from 'os'; import * as http from 'http'; import * as https from 'https'; +import * as tls from 'tls'; import * as models from './models/index.js'; import { HTTPClient } from './http.js'; -import { SocketReuseAgent } from './socket.js'; +import { SocketAgent } from './socket.js'; import { Filter } from './filter.js'; import { SSH } from './ssh.js'; import { TLS } from './tls.js'; @@ -40,54 +41,42 @@ export class DockerClient { certPath?: string, ): Promise { return new Promise((resolve, reject) => { - let socket: net.Socket; - if (dockerHost.startsWith('unix:')) { - // Unix socket connection + // Unix socket connection - use SocketAgent with socket creation function const socketPath = dockerHost.substring(5); // Remove "unix:" prefix - socket = net.createConnection(socketPath); - socket.on('connect', () => { - // Increase max listeners since we'll be reusing this socket for multiple requests - socket.setMaxListeners(50); - const agent = new SocketReuseAgent(socket); + try { + const agent = new SocketAgent(() => + net.createConnection(socketPath), + ); resolve(new DockerClient(agent)); - }); - - socket.on('error', (error) => { + } catch (error) { reject( new Error( - `Failed to connect to Docker host ${dockerHost}: ${error.message}`, + `Failed to create Docker client for ${dockerHost}: ${error.message}`, ), ); - }); + } } else if (dockerHost.startsWith('tcp:')) { - // TCP connection - use HTTP/HTTPS agents + // TCP connection - use SocketAgent with TCP socket creation function const tcpAddress = dockerHost.substring(6); // Remove "tcp://" prefix const [host, portStr] = tcpAddress.split(':'); const port = parseInt(portStr) || (certPath ? 2376 : 2375); // Default ports: 2376 for TLS, 2375 for plain try { - let agent: http.Agent; + let agent: SocketAgent; if (certPath) { - // Use HTTPS agent with TLS certificates + // Use SocketAgent with TLS socket creation function const tlsOptions = TLS.loadCertificates(certPath); - agent = new https.Agent({ - host, - port, - ...tlsOptions, - keepAlive: true, - keepAliveMsecs: 30000, - }); + agent = new SocketAgent(() => + tls.connect({ host, port, ...tlsOptions }), + ); } else { - // Use plain HTTP agent - agent = new http.Agent({ - host, - port, - keepAlive: true, - keepAliveMsecs: 30000, - }); + // Use SocketAgent with plain TCP socket creation function + agent = new SocketAgent(() => + net.createConnection({ host, port }), + ); } resolve(new DockerClient(agent)); @@ -99,15 +88,19 @@ export class DockerClient { ); } } else if (dockerHost.startsWith('ssh:')) { - // SSH connection - SSH.createConnection(dockerHost) - .then((sshSocket) => { - // Increase max listeners since we'll be reusing this socket for multiple requests - sshSocket.setMaxListeners(50); - const agent = new SocketReuseAgent(sshSocket); - resolve(new DockerClient(agent)); - }) - .catch(reject); + // SSH connection - use SocketAgent with SSH socket creation function + try { + const agent = new SocketAgent( + SSH.createSocketFactory(dockerHost), + ); + resolve(new DockerClient(agent)); + } catch (error) { + reject( + new Error( + `Failed to create SSH Docker client for ${dockerHost}: ${error.message}`, + ), + ); + } } else { reject( new Error( @@ -203,6 +196,11 @@ export class DockerClient { * @returns Promise that resolves to a connected DockerClient instance */ static async fromDockerConfig(): Promise { + // Check for DOCKER_HOST environment variable first - takes precedence over config + if (process.env.DOCKER_HOST) { + return DockerClient.fromDockerHost(process.env.DOCKER_HOST); + } + // Check for DOCKER_CONFIG environment variable, otherwise use default path const configPath = process.env.DOCKER_CONFIG || diff --git a/lib/socket.ts b/lib/socket.ts index eed1f25..51dfe0c 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -3,33 +3,51 @@ import * as http from 'http'; import * as stream from 'stream'; /** - * Custom HTTP Agent that reuses an existing socket connection. - * This agent is designed to work with persistent socket connections - * like Unix domain sockets or long-lived TCP connections. + * HTTP Agent that creates socket connections using a provided factory function. + * This allows flexible socket creation strategies while supporting connection pooling. */ -export class SocketReuseAgent extends http.Agent { - private socket: net.Socket; +export class SocketAgent extends http.Agent { + private socketFactory: () => net.Socket; - constructor(socket: net.Socket) { + constructor(createSocketFn: () => net.Socket) { super({ keepAlive: true, - keepAliveMsecs: 0, + keepAliveMsecs: 30000, maxSockets: Infinity, - maxFreeSockets: 1, + maxFreeSockets: 10, + maxTotalSockets: Infinity, + timeout: 120000, + scheduling: 'lifo', }); - this.socket = socket; - } + this.socketFactory = createSocketFn; + + // Override createConnection to use our socket factory + this.createConnection = ( + options: any, + callback?: (err: Error | null, socket?: stream.Duplex) => void, + ): stream.Duplex => { + const socket = this.socketFactory(); + socket.setNoDelay(true); + socket.setKeepAlive(true, 30000); + socket.setTimeout(0); + + if (callback) { + const onConnect = () => { + socket.removeListener('error', onError); + callback(null, socket); + }; - createConnection(options: any, callback?: any): stream.Duplex { - // Ensure our socket is properly configured for HTTP - this.socket.setNoDelay(true); - this.socket.setKeepAlive(true); + const onError = (error: Error) => { + socket.removeListener('connect', onConnect); + callback(error); + }; - if (callback) { - process.nextTick(callback, null, this.socket); - } + socket.once('connect', onConnect); + socket.once('error', onError); + } - return this.socket; + return socket; + }; } } diff --git a/lib/ssh.ts b/lib/ssh.ts index d468175..a0bbca0 100644 --- a/lib/ssh.ts +++ b/lib/ssh.ts @@ -9,59 +9,86 @@ import { Client } from 'ssh2'; */ export class SSH { /** - * Create an SSH connection to a remote Docker daemon - * @param sshHost SSH host string (e.g., "ssh://user@host:22/var/run/docker.sock") - * @returns Promise that resolves to a connected socket through SSH tunnel + * Get SSH private key from common locations + * @returns SSH private key buffer or undefined */ - static createConnection(sshHost: string): Promise { - return new Promise((resolve, reject) => { - // Parse SSH URL: ssh://[user@]host[:port][/path/to/socket] - const sshUrl = sshHost.substring(6); // Remove "ssh://" prefix - - let user = 'root'; // Default user - let host: string; - let port = 22; // Default SSH port - let socketPath = '/var/run/docker.sock'; // Default Docker socket path - - // Parse user@host part - const atIndex = sshUrl.indexOf('@'); - let hostPart: string; - - if (atIndex !== -1) { - user = sshUrl.substring(0, atIndex); - hostPart = sshUrl.substring(atIndex + 1); - } else { - hostPart = sshUrl; + private static getPrivateKey(): Buffer | undefined { + const keyPaths = [ + path.join(os.homedir(), '.ssh', 'id_rsa'), + path.join(os.homedir(), '.ssh', 'id_ed25519'), + path.join(os.homedir(), '.ssh', 'id_ecdsa'), + ]; + + for (const keyPath of keyPaths) { + try { + if (fs.existsSync(keyPath)) { + return fs.readFileSync(keyPath); + } + } catch (err) { + // Continue to next key } + } - // Parse host:port/path part - const slashIndex = hostPart.indexOf('/'); - let hostPortPart: string; + return undefined; + } - if (slashIndex !== -1) { - hostPortPart = hostPart.substring(0, slashIndex); - socketPath = hostPart.substring(slashIndex); - } else { - hostPortPart = hostPart; - } + /** + * Create a socket factory function for SSH connections that can be used with SocketAgent + * @param sshHost SSH host string (e.g., "ssh://user@host:22/var/run/docker.sock") + * @returns Function that creates new SSH socket connections + */ + static createSocketFactory(sshHost: string): () => net.Socket { + // Parse SSH connection parameters once + const sshUrl = sshHost.substring(6); // Remove "ssh://" prefix + + let user = 'root'; // Default user + let host: string; + let port = 22; // Default SSH port + let socketPath = '/var/run/docker.sock'; // Default Docker socket path + + // Parse user@host part + const atIndex = sshUrl.indexOf('@'); + let hostPart: string; + + if (atIndex !== -1) { + user = sshUrl.substring(0, atIndex); + hostPart = sshUrl.substring(atIndex + 1); + } else { + hostPart = sshUrl; + } - // Parse host:port part - const colonIndex = hostPortPart.lastIndexOf(':'); - if (colonIndex !== -1) { - host = hostPortPart.substring(0, colonIndex); - port = parseInt(hostPortPart.substring(colonIndex + 1)) || 22; - } else { - host = hostPortPart; - } + // Parse host:port/path part + const slashIndex = hostPart.indexOf('/'); + let hostPortPart: string; + if (slashIndex !== -1) { + hostPortPart = hostPart.substring(0, slashIndex); + socketPath = hostPart.substring(slashIndex); + } else { + hostPortPart = hostPart; + } + + // Parse host:port part + const colonIndex = hostPortPart.lastIndexOf(':'); + if (colonIndex !== -1) { + host = hostPortPart.substring(0, colonIndex); + port = parseInt(hostPortPart.substring(colonIndex + 1)) || 22; + } else { + host = hostPortPart; + } + + // Return factory function that creates new SSH connections + return () => { const conn = new Client(); + const sshStream = new net.Socket(); conn.on('ready', () => { // Create a Unix socket connection through SSH conn.openssh_forwardInStreamLocal(socketPath, (err, stream) => { if (err) { conn.end(); - reject( + sshStream.emit( + 'error', new Error( `Failed to create SSH tunnel to ${socketPath}: ${err.message}`, ), @@ -69,20 +96,22 @@ export class SSH { return; } - // Wrap the SSH stream as a net.Socket - const socket = stream as any as net.Socket; + // Pipe the SSH stream to our socket wrapper + stream.pipe(sshStream); + sshStream.pipe(stream); // Handle SSH connection cleanup - socket.on('close', () => { + sshStream.on('close', () => { conn.end(); }); - resolve(socket); + sshStream.emit('connect'); }); }); conn.on('error', (err) => { - reject( + sshStream.emit( + 'error', new Error( `SSH connection failed to ${user}@${host}:${port}: ${err.message}`, ), @@ -90,39 +119,15 @@ export class SSH { }); // Connect using SSH key authentication (looks for default keys) - // TODO: Add support for password authentication and custom key paths conn.connect({ host, port, username: user, - // Try common SSH key locations privateKey: SSH.getPrivateKey(), tryKeyboard: true, }); - }); - } - - /** - * Get SSH private key from common locations - * @returns SSH private key buffer or undefined - */ - private static getPrivateKey(): Buffer | undefined { - const keyPaths = [ - path.join(os.homedir(), '.ssh', 'id_rsa'), - path.join(os.homedir(), '.ssh', 'id_ed25519'), - path.join(os.homedir(), '.ssh', 'id_ecdsa'), - ]; - for (const keyPath of keyPaths) { - try { - if (fs.existsSync(keyPath)) { - return fs.readFileSync(keyPath); - } - } catch (err) { - // Continue to next key - } - } - - return undefined; + return sshStream; + }; } } diff --git a/test/concurrency.test.ts b/test/concurrency.test.ts new file mode 100644 index 0000000..e5b2ae3 --- /dev/null +++ b/test/concurrency.test.ts @@ -0,0 +1,102 @@ +import { assert, test } from 'vitest'; +import { DockerClient } from '../lib/docker-client.js'; + +test('concurrent requests should execute in parallel', async () => { + const client = await DockerClient.fromDockerConfig(); + const startTime = Date.now(); + + // Make 5 concurrent API calls + const promises = [ + client.systemPing(), + client.systemInfo(), + client.systemVersion(), + client.containerList({ all: true }), + client.imageList(), + ]; + + // Execute all requests concurrently + const results = await Promise.all(promises); + const totalTime = Date.now() - startTime; + + // Verify all requests completed successfully + assert.isNotNull(results[0]); // systemPing result + assert.isNotNull(results[1]); // systemInfo result + assert.isNotNull(results[2]); // systemVersion result + assert.isNotNull(results[3]); // containerList result + assert.isNotNull(results[4]); // imageList result + + console.log(` Completed 5 concurrent requests in ${totalTime}ms`); + + // Concurrent requests should be faster than sequential ones + // This is a rough check - concurrent should typically be < 80% of sequential time + assert.isTrue( + totalTime < 10000, + 'Concurrent requests should complete within reasonable time', + ); +}, 15000); + +test('high concurrency stress test', async () => { + const client = await DockerClient.fromDockerConfig(); + const startTime = Date.now(); + + // Make 20 concurrent ping requests to test connection pool + const promises = Array.from({ length: 20 }, () => client.systemPing()); + + // Execute all requests concurrently + const results = await Promise.all(promises); + const totalTime = Date.now() - startTime; + + // Verify all requests completed successfully + results.forEach((result, index) => { + assert.isNotNull(result, `Request ${index} should return a result`); + }); + + console.log(` Completed 20 concurrent ping requests in ${totalTime}ms`); + console.log(` Average time per request: ${(totalTime / 20).toFixed(1)}ms`); + + // All requests should complete within reasonable time + assert.isTrue( + totalTime < 15000, + 'High concurrency requests should complete within reasonable time', + ); +}, 20000); + +test('mixed concurrent operations', async () => { + const client = await DockerClient.fromDockerConfig(); + + // Test different types of concurrent operations + const promises = [ + // Read operations + client.systemPing(), + client.systemVersion(), + client.containerList({ all: true }), + client.imageList(), + client.networkList(), + client.volumeList(), + // Info operations + client.systemInfo(), + client.systemPing(), + client.systemVersion(), + client.containerList({ limit: 5 }), + ]; + + const startTime = Date.now(); + const results = await Promise.all(promises); + const totalTime = Date.now() - startTime; + + // Verify all requests completed successfully + results.forEach((result, index) => { + assert.isNotNull( + result, + `Mixed operation ${index} should return a result`, + ); + }); + + console.log(` Completed 10 mixed concurrent operations in ${totalTime}ms`); + + // Should handle mixed operations efficiently + assert.isTrue( + totalTime < 12000, + 'Mixed concurrent operations should complete efficiently', + ); +}, 18000);