Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ _tl;dr:_ You're free to use this code, make any changes you need, have fun with
- [x] ContainerRename
- [x] ContainerPause
- [x] ContainerUnpause
- [ ] ContainerAttach
- [x] ContainerAttach
- [ ] ContainerAttachWebsocket
- [x] ContainerWait
- [x] ContainerDelete
Expand Down
57 changes: 41 additions & 16 deletions lib/docker-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import { SocketAgent } from './socket.js';
import { Filter } from './filter.js';
import { SSH } from './ssh.js';
import { TLS } from './tls.js';
import * as stream from 'node:stream';
import { demultiplexStream } from './multiplexed-stream.js';

export class Credentials {
username: string;
Expand Down Expand Up @@ -385,7 +387,8 @@ export class DockerClient {
*/
public async containerAttach(
id: string,
callback: (data: Buffer) => void,
stdout: stream.Writable,
stderr: stream.Writable,
options?: {
detachKeys?: string;
logs?: boolean;
Expand All @@ -395,10 +398,22 @@ export class DockerClient {
stderr?: boolean;
},
): Promise<void> {
return this.api.post(`/containers/${id}/attach`, options, undefined, {
Connection: 'Upgrade',
Upgrade: 'tcp',
});
return this.api
.sendHTTPRequest('POST', `/containers/${id}/attach`, {
params: options,
headers: {
Connection: 'Upgrade',
Upgrade: 'tcp',
},
})
.then((response) => {
const contentType = response.headers['content-type'];
if (contentType === 'application/vnd.docker.raw-stream') {
response.sock.pipe(stdout);
} else {
response.sock.pipe(demultiplexStream(stdout, stderr));
}
});
}

/**
Expand Down Expand Up @@ -530,17 +545,27 @@ export class DockerClient {
* @param timestamps Add timestamps to every log line
* @param tail Only return this number of log lines from the end of the logs. Specify as an integer or 'all' to output all log lines.
*/
public async containerLogs(options?: {
id: string;
follow?: boolean;
stdout?: boolean;
stderr?: boolean;
since?: number;
until?: number;
timestamps?: boolean;
tail?: string;
}): Promise<void> {
// TODO
public async containerLogs(
id: string,
stdout: stream.Writable,
stderr: stream.Writable,
options?: {
follow?: boolean;
stdout?: boolean;
stderr?: boolean;
since?: number;
until?: number;
timestamps?: boolean;
tail?: string;
},
): Promise<void> {
const demux = demultiplexStream(stdout, stderr);
return this.api.get(
`/containers/${id}/logs`,
options,
'application/vnd.docker.raw-stream',
(data) => demux.write(data),
);
}

/**
Expand Down
55 changes: 31 additions & 24 deletions lib/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,17 @@ export class ConflictError extends Error {

// Function to extract error message from response body
function getErrorMessage(
status: string,
headers: { [key: string]: string },
res: http.IncomingMessage,
body: string | undefined,
): string {
const contentType = headers['content-type']?.toLowerCase();
const contentType = res.headers['content-type']?.toLowerCase();
if (contentType?.includes('application/json') && body) {
const jsonBody = JSON.parse(body);
if (jsonBody.message) {
return jsonBody.message;
}
}
return status;
return res.statusMessage;
}

// Interface to represent an HTTP response
Expand All @@ -52,6 +51,7 @@ export interface HTTPResponse {
statusCode: number;
headers: { [key: string]: string };
body?: string;
sock?: stream.Duplex;
}

/**
Expand Down Expand Up @@ -132,35 +132,36 @@ export class HTTPClient {
agent: this.agent,
};

const req = http.request(requestOptions, (res) => {
let responseBody = '';
// Helper function to create response object
const createResponse = (
res: http.IncomingMessage,
body?: string,
): HTTPResponse => {
const responseHeaders: { [key: string]: string } = {};

// Convert headers to lowercase keys
Object.entries(res.headers).forEach(([key, value]) => {
responseHeaders[key.toLowerCase()] = Array.isArray(value)
? value.join(', ')
: value || '';
});

// Helper function to create response object
const createResponse = (body?: string): HTTPResponse => ({
return {
statusCode: res.statusCode,
statusMessage: res.statusMessage,
headers: responseHeaders,
body,
});
};
};

const req = http.request(requestOptions, (res) => {
let responseBody = '';

// Helper function to handle response completion
const handleResponseEnd = (body?: string) => {
const response = createResponse(body);
const response = createResponse(res, body);

if (res.statusCode >= 400) {
const errorMessage = getErrorMessage(
res.statusMessage,
responseHeaders,
body,
);
const errorMessage = getErrorMessage(res, body);
if (res.statusCode === 404) {
reject(new NotFoundError(errorMessage));
} else if (res.statusCode === 401) {
Expand All @@ -182,29 +183,29 @@ export class HTTPClient {
);
};

// Set up common error handler
res.on('error', handleResponseError);

// Check for Docker stream content types
const contentType = responseHeaders['content-type'];
const contentType = res.headers['content-type'];
const isDockerStream =
contentType === DOCKER_RAW_STREAM ||
contentType === DOCKER_MULTIPLEXED_STREAM;

// Set up common error handler
res.on('error', handleResponseError);

if (isDockerStream && callback) {
// For upgrade protocols, forward all data directly to callback
res.on('data', (chunk: Buffer) => {
callback(chunk.toString('utf8'));
res.on('data', (data: Buffer) => {
callback(data.toString('utf8'));
});

// Resolve immediately with upgrade response
resolve(createResponse());
resolve(createResponse(res));
return;
}

// Handle chunked responses with callback
if (
responseHeaders['transfer-encoding'] === 'chunked' &&
res.headers['transfer-encoding'] === 'chunked' &&
callback
) {
res.on('data', (chunk: Buffer) => {
Expand All @@ -226,6 +227,12 @@ export class HTTPClient {
reject(new Error(`Request error: ${error.message}`));
});

req.on('upgrade', (res, socket, head) => {
const resp = createResponse(res);
resp.sock = socket;
resolve(resp);
});

// Write request body
if (body) {
if (typeof body === 'string') {
Expand Down
81 changes: 51 additions & 30 deletions lib/multiplexed-stream.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,61 @@
export function createMultiplexedStreamCallback(
stdout: NodeJS.WritableStream,
stderr: NodeJS.WritableStream,
): (chunk: string) => void {
import * as stream from 'node:stream';

export function demultiplexStream(
stdout: stream.Writable,
stderr: stream.Writable,
): stream.Writable {
let buffer = Buffer.alloc(0);

return (data: string) => {
// Append new chunk data to buffer
buffer = Buffer.concat([buffer, Buffer.from(data, 'utf8')]);
return new stream.Writable({
write(
chunk: any,
encoding: BufferEncoding,
callback: (error?: Error | null) => void,
) {
try {
// Convert chunk to Buffer if it's not already
const data = Buffer.isBuffer(chunk)
? chunk
: Buffer.from(chunk, encoding);

// Append new chunk data to buffer
buffer = Buffer.concat([buffer, data]);

// Process complete messages from buffer
while (buffer.length >= 8) {
// Read first byte for stream destination
const streamType = buffer[0];

// Process complete messages from buffer
while (buffer.length >= 8) {
// Read first byte for stream destination
const streamType = buffer[0];
// Read last 4 bytes as content size (big endian uint32)
const contentSize = buffer.readUInt32BE(4);

// Read last 4 bytes as content size (big endian uint32)
const contentSize = buffer.readUInt32BE(4);
// Check if we have enough data for the complete message
if (buffer.length >= 8 + contentSize) {
// Extract content
const content = buffer.subarray(8, 8 + contentSize);

// Check if we have enough data for the complete message
if (buffer.length >= 8 + contentSize) {
// Extract content
const content = buffer.subarray(8, 8 + contentSize);
// Send to appropriate stream
if (streamType === 1) {
stdout.write(content);
} else if (streamType === 2) {
stderr.write(content);
}
// Ignore other stream types

// Send to appropriate stream
if (streamType === 1) {
stdout.write(content);
} else if (streamType === 2) {
stderr.write(content);
// Remove processed message from buffer
buffer = buffer.subarray(8 + contentSize);
} else {
// Not enough data for complete message, wait for more
break;
}
}
// Ignore other stream types

// Remove processed message from buffer
buffer = buffer.subarray(8 + contentSize);
} else {
// Not enough data for complete message, wait for more
break;
callback();
} catch (error) {
callback(
error instanceof Error ? error : new Error(String(error)),
);
}
}
};
},
});
}
Loading