diff --git a/README.md b/README.md index 055401b..f1df80a 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ _tl;dr:_ You're free to use this code, make any changes you need, have fun with - [x] ContainerRename - [x] ContainerPause - [x] ContainerUnpause -- [ ] ContainerAttach +- [x] ContainerAttach - [ ] ContainerAttachWebsocket - [x] ContainerWait - [x] ContainerDelete diff --git a/lib/docker-client.ts b/lib/docker-client.ts index 3bd123e..8883ee9 100644 --- a/lib/docker-client.ts +++ b/lib/docker-client.ts @@ -11,6 +11,8 @@ import { SocketAgent } from './socket.js'; import { Filter } from './filter.js'; import { SSH } from './ssh.js'; import { TLS } from './tls.js'; +import * as stream from 'node:stream'; +import { demultiplexStream } from './multiplexed-stream.js'; export class Credentials { username: string; @@ -385,7 +387,8 @@ export class DockerClient { */ public async containerAttach( id: string, - callback: (data: Buffer) => void, + stdout: stream.Writable, + stderr: stream.Writable, options?: { detachKeys?: string; logs?: boolean; @@ -395,10 +398,22 @@ export class DockerClient { stderr?: boolean; }, ): Promise { - return this.api.post(`/containers/${id}/attach`, options, undefined, { - Connection: 'Upgrade', - Upgrade: 'tcp', - }); + return this.api + .sendHTTPRequest('POST', `/containers/${id}/attach`, { + params: options, + headers: { + Connection: 'Upgrade', + Upgrade: 'tcp', + }, + }) + .then((response) => { + const contentType = response.headers['content-type']; + if (contentType === 'application/vnd.docker.raw-stream') { + response.sock.pipe(stdout); + } else { + response.sock.pipe(demultiplexStream(stdout, stderr)); + } + }); } /** @@ -530,17 +545,27 @@ export class DockerClient { * @param timestamps Add timestamps to every log line * @param tail Only return this number of log lines from the end of the logs. Specify as an integer or 'all' to output all log lines. */ - public async containerLogs(options?: { - id: string; - follow?: boolean; - stdout?: boolean; - stderr?: boolean; - since?: number; - until?: number; - timestamps?: boolean; - tail?: string; - }): Promise { - // TODO + public async containerLogs( + id: string, + stdout: stream.Writable, + stderr: stream.Writable, + options?: { + follow?: boolean; + stdout?: boolean; + stderr?: boolean; + since?: number; + until?: number; + timestamps?: boolean; + tail?: string; + }, + ): Promise { + const demux = demultiplexStream(stdout, stderr); + return this.api.get( + `/containers/${id}/logs`, + options, + 'application/vnd.docker.raw-stream', + (data) => demux.write(data), + ); } /** diff --git a/lib/http.ts b/lib/http.ts index fcdc1a8..172151e 100644 --- a/lib/http.ts +++ b/lib/http.ts @@ -32,18 +32,17 @@ export class ConflictError extends Error { // Function to extract error message from response body function getErrorMessage( - status: string, - headers: { [key: string]: string }, + res: http.IncomingMessage, body: string | undefined, ): string { - const contentType = headers['content-type']?.toLowerCase(); + const contentType = res.headers['content-type']?.toLowerCase(); if (contentType?.includes('application/json') && body) { const jsonBody = JSON.parse(body); if (jsonBody.message) { return jsonBody.message; } } - return status; + return res.statusMessage; } // Interface to represent an HTTP response @@ -52,6 +51,7 @@ export interface HTTPResponse { statusCode: number; headers: { [key: string]: string }; body?: string; + sock?: stream.Duplex; } /** @@ -132,10 +132,12 @@ export class HTTPClient { agent: this.agent, }; - const req = http.request(requestOptions, (res) => { - let responseBody = ''; + // Helper function to create response object + const createResponse = ( + res: http.IncomingMessage, + body?: string, + ): HTTPResponse => { const responseHeaders: { [key: string]: string } = {}; - // Convert headers to lowercase keys Object.entries(res.headers).forEach(([key, value]) => { responseHeaders[key.toLowerCase()] = Array.isArray(value) @@ -143,24 +145,23 @@ export class HTTPClient { : value || ''; }); - // Helper function to create response object - const createResponse = (body?: string): HTTPResponse => ({ + return { statusCode: res.statusCode, statusMessage: res.statusMessage, headers: responseHeaders, body, - }); + }; + }; + + const req = http.request(requestOptions, (res) => { + let responseBody = ''; // Helper function to handle response completion const handleResponseEnd = (body?: string) => { - const response = createResponse(body); + const response = createResponse(res, body); if (res.statusCode >= 400) { - const errorMessage = getErrorMessage( - res.statusMessage, - responseHeaders, - body, - ); + const errorMessage = getErrorMessage(res, body); if (res.statusCode === 404) { reject(new NotFoundError(errorMessage)); } else if (res.statusCode === 401) { @@ -182,29 +183,29 @@ export class HTTPClient { ); }; + // Set up common error handler + res.on('error', handleResponseError); + // Check for Docker stream content types - const contentType = responseHeaders['content-type']; + const contentType = res.headers['content-type']; const isDockerStream = contentType === DOCKER_RAW_STREAM || contentType === DOCKER_MULTIPLEXED_STREAM; - // Set up common error handler - res.on('error', handleResponseError); - if (isDockerStream && callback) { // For upgrade protocols, forward all data directly to callback - res.on('data', (chunk: Buffer) => { - callback(chunk.toString('utf8')); + res.on('data', (data: Buffer) => { + callback(data.toString('utf8')); }); // Resolve immediately with upgrade response - resolve(createResponse()); + resolve(createResponse(res)); return; } // Handle chunked responses with callback if ( - responseHeaders['transfer-encoding'] === 'chunked' && + res.headers['transfer-encoding'] === 'chunked' && callback ) { res.on('data', (chunk: Buffer) => { @@ -226,6 +227,12 @@ export class HTTPClient { reject(new Error(`Request error: ${error.message}`)); }); + req.on('upgrade', (res, socket, head) => { + const resp = createResponse(res); + resp.sock = socket; + resolve(resp); + }); + // Write request body if (body) { if (typeof body === 'string') { diff --git a/lib/multiplexed-stream.ts b/lib/multiplexed-stream.ts index f5001f2..75252f5 100644 --- a/lib/multiplexed-stream.ts +++ b/lib/multiplexed-stream.ts @@ -1,40 +1,61 @@ -export function createMultiplexedStreamCallback( - stdout: NodeJS.WritableStream, - stderr: NodeJS.WritableStream, -): (chunk: string) => void { +import * as stream from 'node:stream'; + +export function demultiplexStream( + stdout: stream.Writable, + stderr: stream.Writable, +): stream.Writable { let buffer = Buffer.alloc(0); - return (data: string) => { - // Append new chunk data to buffer - buffer = Buffer.concat([buffer, Buffer.from(data, 'utf8')]); + return new stream.Writable({ + write( + chunk: any, + encoding: BufferEncoding, + callback: (error?: Error | null) => void, + ) { + try { + // Convert chunk to Buffer if it's not already + const data = Buffer.isBuffer(chunk) + ? chunk + : Buffer.from(chunk, encoding); + + // Append new chunk data to buffer + buffer = Buffer.concat([buffer, data]); + + // Process complete messages from buffer + while (buffer.length >= 8) { + // Read first byte for stream destination + const streamType = buffer[0]; - // Process complete messages from buffer - while (buffer.length >= 8) { - // Read first byte for stream destination - const streamType = buffer[0]; + // Read last 4 bytes as content size (big endian uint32) + const contentSize = buffer.readUInt32BE(4); - // Read last 4 bytes as content size (big endian uint32) - const contentSize = buffer.readUInt32BE(4); + // Check if we have enough data for the complete message + if (buffer.length >= 8 + contentSize) { + // Extract content + const content = buffer.subarray(8, 8 + contentSize); - // Check if we have enough data for the complete message - if (buffer.length >= 8 + contentSize) { - // Extract content - const content = buffer.subarray(8, 8 + contentSize); + // Send to appropriate stream + if (streamType === 1) { + stdout.write(content); + } else if (streamType === 2) { + stderr.write(content); + } + // Ignore other stream types - // Send to appropriate stream - if (streamType === 1) { - stdout.write(content); - } else if (streamType === 2) { - stderr.write(content); + // Remove processed message from buffer + buffer = buffer.subarray(8 + contentSize); + } else { + // Not enough data for complete message, wait for more + break; + } } - // Ignore other stream types - // Remove processed message from buffer - buffer = buffer.subarray(8 + contentSize); - } else { - // Not enough data for complete message, wait for more - break; + callback(); + } catch (error) { + callback( + error instanceof Error ? error : new Error(String(error)), + ); } - } - }; + }, + }); } diff --git a/test/container.test.ts b/test/container.test.ts new file mode 100644 index 0000000..1410738 --- /dev/null +++ b/test/container.test.ts @@ -0,0 +1,268 @@ +import { assert, test } from 'vitest'; +import { DockerClient } from '../lib/docker-client.js'; +import { Writable } from 'stream'; + +// Test Docker Container API functionality + +test('should receive container stdout on attach', async () => { + const client = await DockerClient.fromDockerConfig(); + let containerId: string | undefined; + + try { + // Pull alpine image first + console.log(' Pulling alpine image...'); + await client.imageCreate( + (event) => { + if (event.status) console.log(` ${event.status}`); + }, + { + fromImage: 'docker.io/library/alpine', + tag: 'latest', + }, + ); + + // Create container with echo command + console.log(' Creating Alpine container with echo command...'); + const createResponse = await client.containerCreate({ + Image: 'docker.io/library/alpine:latest', + Cmd: ['echo', 'hello'], + Labels: { + 'test.type': 'container-test', + }, + }); + + containerId = createResponse.Id; + assert.isNotNull(containerId); + console.log(` Container created: ${containerId.substring(0, 12)}`); + + // Set up streams to capture output + const stdoutData: string[] = []; + const stderrData: string[] = []; + + const stdout = new Writable({ + write( + chunk: any, + encoding: BufferEncoding, + callback: (error?: Error | null) => void, + ) { + const data = chunk.toString(); + stdoutData.push(data); + console.log(` STDOUT: ${JSON.stringify(data)}`); + callback(); + }, + }); + + const stderr = new Writable({ + write( + chunk: any, + encoding: BufferEncoding, + callback: (error?: Error | null) => void, + ) { + const data = chunk.toString(); + stderrData.push(data); + console.log(` STDERR: ${JSON.stringify(data)}`); + callback(); + }, + }); + + // Attach to container before starting + console.log(' Attaching to container...'); + const attachPromise = client.containerAttach( + containerId, + stdout, + stderr, + { + stream: true, + stdout: true, + stderr: true, + }, + ); + + // Start the container + console.log(' Starting container...'); + await client.containerStart(containerId); + console.log(' Container started'); + + // Wait for the attach operation to complete + await attachPromise; + console.log(' Attach completed'); + + // Wait for container to finish + console.log(' Waiting for container to finish...'); + const waitResult = await client.containerWait(containerId); + console.log( + ` Container finished with exit code: ${waitResult.StatusCode}`, + ); + + // Verify the output + console.log(' Verifying output...'); + console.log(` Captured stdout data: ${JSON.stringify(stdoutData)}`); + console.log(` Captured stderr data: ${JSON.stringify(stderrData)}`); + + // Check that we received "hello" in stdout + const allStdout = stdoutData.join(''); + assert.include(allStdout, 'hello', 'Should receive "hello" in stdout'); + + // Verify container exited successfully + assert.equal( + waitResult.StatusCode, + 0, + 'Container should exit with code 0', + ); + + console.log(' ✓ Test passed: received expected output'); + } finally { + // Clean up: delete container + if (containerId) { + console.log(' Cleaning up container...'); + try { + await client.containerDelete(containerId, { force: true }); + console.log(' Container deleted successfully'); + } catch (deleteError) { + console.log( + ` Warning: Failed to delete container: ${(deleteError as any)?.message}`, + ); + } + } + } +}, 30000); // 30 second timeout + +test('should collect container output using containerLogs', async () => { + const client = await DockerClient.fromDockerConfig(); + let containerId: string | undefined; + + try { + // Pull alpine image first (should be cached from previous test) + console.log(' Pulling alpine image...'); + await client.imageCreate( + (event) => { + if (event.status) console.log(` ${event.status}`); + }, + { + fromImage: 'docker.io/library/alpine', + tag: 'latest', + }, + ); + + // Create container with a command that produces multiple lines of output + console.log(' Creating Alpine container with multi-line output...'); + const createResponse = await client.containerCreate({ + Image: 'docker.io/library/alpine:latest', + Cmd: ['sh', '-c', 'echo "line1"; echo "line2"; echo "line3"'], + Labels: { + 'test.type': 'container-logs-test', + }, + }); + + containerId = createResponse.Id; + assert.isNotNull(containerId); + console.log(` Container created: ${containerId.substring(0, 12)}`); + + // Start the container and let it finish + console.log(' Starting container...'); + await client.containerStart(containerId); + console.log(' Container started'); + + // Wait for container to finish + console.log(' Waiting for container to finish...'); + const waitResult = await client.containerWait(containerId); + console.log( + ` Container finished with exit code: ${waitResult.StatusCode}`, + ); + + // Set up streams to capture logs + const stdoutLogsData: string[] = []; + const stderrLogsData: string[] = []; + + const stdoutLogsStream = new Writable({ + write( + chunk: any, + encoding: BufferEncoding, + callback: (error?: Error | null) => void, + ) { + const data = chunk.toString(); + stdoutLogsData.push(data); + console.log(` STDOUT LOGS: ${JSON.stringify(data)}`); + callback(); + }, + }); + + const stderrLogsStream = new Writable({ + write( + chunk: any, + encoding: BufferEncoding, + callback: (error?: Error | null) => void, + ) { + const data = chunk.toString(); + stderrLogsData.push(data); + console.log(` STDERR LOGS: ${JSON.stringify(data)}`); + callback(); + }, + }); + + // Get container logs + console.log(' Fetching container logs...'); + await client.containerLogs( + containerId, + stdoutLogsStream, + stderrLogsStream, + { + stdout: true, + stderr: true, + }, + ); + console.log(' Logs retrieved'); + + // Give a moment for the streams to finish processing + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Verify the output + console.log(' Verifying logs...'); + console.log( + ` Captured stdout logs: ${JSON.stringify(stdoutLogsData)}`, + ); + console.log( + ` Captured stderr logs: ${JSON.stringify(stderrLogsData)}`, + ); + + // Check that we received all expected lines in stdout + const allStdoutLogs = stdoutLogsData.join(''); + assert.include( + allStdoutLogs, + 'line1', + 'Should receive "line1" in stdout logs', + ); + assert.include( + allStdoutLogs, + 'line2', + 'Should receive "line2" in stdout logs', + ); + assert.include( + allStdoutLogs, + 'line3', + 'Should receive "line3" in stdout logs', + ); + + // Verify container exited successfully + assert.equal( + waitResult.StatusCode, + 0, + 'Container should exit with code 0', + ); + + console.log(' ✓ Test passed: received all expected log lines'); + } finally { + // Clean up: delete container + if (containerId) { + console.log(' Cleaning up container...'); + try { + await client.containerDelete(containerId, { force: true }); + console.log(' Container deleted successfully'); + } catch (deleteError) { + console.log( + ` Warning: Failed to delete container: ${(deleteError as any)?.message}`, + ); + } + } + } +}, 30000); // 30 second timeout diff --git a/test/e2e.test.ts b/test/e2e.test.ts index a24eab2..45dc5ff 100644 --- a/test/e2e.test.ts +++ b/test/e2e.test.ts @@ -2,30 +2,6 @@ import { assert, test } from 'vitest'; import { DockerClient } from '../lib/docker-client.js'; import { Filter } from '../lib/filter.js'; -// Test Docker API connectivity -test('systemPing should return API version', async () => { - const client = await DockerClient.fromDockerConfig(); - const apiVersion = await client.systemPing(); - assert.isNotNull(apiVersion); - console.log(` Docker API version: ${apiVersion}`); -}); - -test('systemInfo should return system information', async () => { - const client = await DockerClient.fromDockerConfig(); - const info = await client.systemInfo(); - assert.isNotNull(info); - assert.isNotNull(info.ID); - console.log(` Docker system ID: ${info.ID}`); -}); - -test('systemVersion should return version information', async () => { - const client = await DockerClient.fromDockerConfig(); - const version = await client.systemVersion(); - assert.isNotNull(version); - assert.isNotNull(version.Version); - console.log(` Docker version: ${version.Version}`); -}); - test('containerList should return list of containers', async () => { const client = await DockerClient.fromDockerConfig(); const containers = await client.containerList({ all: true }); @@ -47,13 +23,6 @@ test('networkList should return list of networks', async () => { console.log(` Found ${networks.length} networks`); }); -test('volumeList should return list of volumes', async () => { - const client = await DockerClient.fromDockerConfig(); - const volumes = await client.volumeList(); - assert.isNotNull(volumes); - console.log(` Found ${volumes.Volumes?.length || 0} volumes`); -}); - test('container lifecycle should work end-to-end', async () => { const client = await DockerClient.fromDockerConfig(); let containerId: string | undefined; diff --git a/test/multiplexed-stream.test.ts b/test/multiplexed-stream.test.ts index 1b59bfa..cb4e724 100644 --- a/test/multiplexed-stream.test.ts +++ b/test/multiplexed-stream.test.ts @@ -1,5 +1,5 @@ import { assert, test } from 'vitest'; -import { createMultiplexedStreamCallback } from '../lib/multiplexed-stream.js'; +import { demultiplexStream } from '../lib/multiplexed-stream.js'; import { Writable } from 'stream'; function createMockStream(): { stream: Writable; data: Buffer[] } { @@ -29,10 +29,10 @@ test('should write stdout message to stdout stream', () => { const { stream: stdout, data: stdoutData } = createMockStream(); const { stream: stderr, data: stderrData } = createMockStream(); - const callback = createMultiplexedStreamCallback(stdout, stderr); + const demuxStream = demultiplexStream(stdout, stderr); const message = createMultiplexedMessage(1, 'Hello stdout'); - callback(message.toString('utf8')); + demuxStream.write(message); assert.deepEqual(stdoutData.length, 1); assert.deepEqual(stdoutData[0]?.toString(), 'Hello stdout'); @@ -43,10 +43,10 @@ test('should write stderr message to stderr stream', () => { const { stream: stdout, data: stdoutData } = createMockStream(); const { stream: stderr, data: stderrData } = createMockStream(); - const callback = createMultiplexedStreamCallback(stdout, stderr); + const demuxStream = demultiplexStream(stdout, stderr); const message = createMultiplexedMessage(2, 'Hello stderr'); - callback(message.toString('utf8')); + demuxStream.write(message); assert.deepEqual(stderrData.length, 1); assert.deepEqual(stderrData[0]?.toString(), 'Hello stderr'); @@ -57,10 +57,10 @@ test('should ignore unknown stream types', () => { const { stream: stdout, data: stdoutData } = createMockStream(); const { stream: stderr, data: stderrData } = createMockStream(); - const callback = createMultiplexedStreamCallback(stdout, stderr); + const demuxStream = demultiplexStream(stdout, stderr); const message = createMultiplexedMessage(3, 'Unknown stream'); - callback(message.toString('utf8')); + demuxStream.write(message); assert.deepEqual(stdoutData.length, 0); assert.deepEqual(stderrData.length, 0); @@ -70,12 +70,12 @@ test('should handle multiple messages in single chunk', () => { const { stream: stdout, data: stdoutData } = createMockStream(); const { stream: stderr, data: stderrData } = createMockStream(); - const callback = createMultiplexedStreamCallback(stdout, stderr); + const demuxStream = demultiplexStream(stdout, stderr); const message1 = createMultiplexedMessage(1, 'First stdout'); const message2 = createMultiplexedMessage(2, 'First stderr'); const combined = Buffer.concat([message1, message2]); - callback(combined.toString('utf8')); + demuxStream.write(combined); assert.deepEqual(stdoutData.length, 1); assert.deepEqual(stdoutData[0]?.toString(), 'First stdout'); @@ -87,17 +87,17 @@ test('should handle incomplete messages across multiple chunks', () => { const { stream: stdout, data: stdoutData } = createMockStream(); const { stream: stderr, data: _ } = createMockStream(); - const callback = createMultiplexedStreamCallback(stdout, stderr); + const demuxStream = demultiplexStream(stdout, stderr); const message = createMultiplexedMessage(1, 'Split message'); // Send first half const firstHalf = message.subarray(0, 10); - callback(firstHalf.toString('utf8')); + demuxStream.write(firstHalf); assert.deepEqual(stdoutData.length, 0); // Should not write yet // Send second half const secondHalf = message.subarray(10); - callback(secondHalf.toString('utf8')); + demuxStream.write(secondHalf); assert.deepEqual(stdoutData.length, 1); assert.deepEqual(stdoutData[0]?.toString(), 'Split message'); }); @@ -106,10 +106,10 @@ test('should handle empty content', () => { const { stream: stdout, data: stdoutData } = createMockStream(); const { stream: stderr, data: _ } = createMockStream(); - const callback = createMultiplexedStreamCallback(stdout, stderr); + const demuxStream = demultiplexStream(stdout, stderr); const message = createMultiplexedMessage(1, ''); - callback(message.toString('utf8')); + demuxStream.write(message); assert.deepEqual(stdoutData.length, 1); assert.deepEqual(stdoutData[0]?.toString(), ''); @@ -119,10 +119,10 @@ test('should handle very short incomplete chunks', () => { const { stream: stdout, data: stdoutData } = createMockStream(); const { stream: stderr, data: stderrData } = createMockStream(); - const callback = createMultiplexedStreamCallback(stdout, stderr); + const demuxStream = demultiplexStream(stdout, stderr); // Send only 4 bytes (less than minimum header size of 8) - callback('test'); + demuxStream.write('test'); assert.deepEqual(stdoutData.length, 0); assert.deepEqual(stderrData.length, 0); }); @@ -131,11 +131,11 @@ test('should handle large content', () => { const { stream: stdout, data: stdoutData } = createMockStream(); const { stream: stderr, data: _ } = createMockStream(); - const callback = createMultiplexedStreamCallback(stdout, stderr); + const demuxStream = demultiplexStream(stdout, stderr); const largeContent = 'x'.repeat(10000); const message = createMultiplexedMessage(1, largeContent); - callback(message.toString('utf8')); + demuxStream.write(message); assert.deepEqual(stdoutData.length, 1); assert.deepEqual(stdoutData[0]?.toString(), largeContent); diff --git a/test/system.test.ts b/test/system.test.ts new file mode 100644 index 0000000..a9d7e82 --- /dev/null +++ b/test/system.test.ts @@ -0,0 +1,27 @@ +import { assert, test } from 'vitest'; +import { DockerClient } from '../lib/docker-client.js'; + +// Test Docker System API connectivity and information + +test('systemPing should return API version', async () => { + const client = await DockerClient.fromDockerConfig(); + const apiVersion = await client.systemPing(); + assert.isNotNull(apiVersion); + console.log(` Docker API version: ${apiVersion}`); +}); + +test('systemInfo should return system information', async () => { + const client = await DockerClient.fromDockerConfig(); + const info = await client.systemInfo(); + assert.isNotNull(info); + assert.isNotNull(info.ID); + console.log(` Docker system ID: ${info.ID}`); +}); + +test('systemVersion should return version information', async () => { + const client = await DockerClient.fromDockerConfig(); + const version = await client.systemVersion(); + assert.isNotNull(version); + assert.isNotNull(version.Version); + console.log(` Docker version: ${version.Version}`); +}); diff --git a/test/volume.test.ts b/test/volume.test.ts new file mode 100644 index 0000000..43097b2 --- /dev/null +++ b/test/volume.test.ts @@ -0,0 +1,11 @@ +import { assert, test } from 'vitest'; +import { DockerClient } from '../lib/docker-client.js'; + +// Test Docker Volume API functionality + +test('volumeList should return list of volumes', async () => { + const client = await DockerClient.fromDockerConfig(); + const volumes = await client.volumeList(); + assert.isNotNull(volumes); + console.log(` Found ${volumes.Volumes?.length || 0} volumes`); +});