2 changes: 1 addition & 1 deletion index.js
@@ -1,4 +1,4 @@
-var s3urls = require('@mapbox/s3urls');
+var s3urls = require('./lib/s3url-parser');

> Contributor Author: This is a deprecated library, so I just implemented the necessary functions in s3url-parser.

 var Split = require('split');
 var List = require('./lib/keys');
 var Get = require('./lib/get');
23 changes: 13 additions & 10 deletions lib/copy.js
@@ -1,4 +1,4 @@
-const AWS = require('aws-sdk');
+const { S3Client, CopyObjectCommand } = require('@aws-sdk/client-s3');
 const AwsError = require('./error');
 const parallel = require('parallel-stream');

@@ -11,12 +11,12 @@ module.exports = function(fromBucket, toBucket, keyTransform, options) {
   }

   var s3config = {
-    maxRetries: 10,
-    httpOptions: { connectTimeout: 3000 }
+    maxAttempts: 10,
+    requestTimeout: 3000
   };
   if (options.logger) s3config.logger = options.logger;
-  if (options.agent) s3config.httpOptions.agent = options.agent;
-  var s3 = options.s3 || new AWS.S3(s3config);
+  if (options.agent) s3config.requestHandler = { httpsAgent: options.agent };
+  var s3 = options.s3 || new S3Client(s3config);

   function write(key, enc, callback) {
     key = key.toString().trim();
@@ -28,11 +28,14 @@ module.exports = function(fromBucket, toBucket, keyTransform, options) {
       CopySource: [fromBucket, key].join('/')
     };

-    s3.copyObject(copyParams, function(err) {
-      if (err) return callback(AwsError(err, copyParams));
-      copyStream.copied++;
-      callback();
-    });
+    s3.send(new CopyObjectCommand(copyParams))
+      .then(function() {
+        copyStream.copied++;
+        callback();
+      })
+      .catch(function(err) {
+        callback(AwsError(err, copyParams));
+      });
   }

   var starttime = Date.now();
33 changes: 26 additions & 7 deletions lib/delete.js
@@ -1,17 +1,25 @@
-const AWS = require('aws-sdk');
+const { S3Client, DeleteObjectCommand, S3ServiceException } = require('@aws-sdk/client-s3');
+
+const { NodeHttpHandler } = require('@smithy/node-http-handler');
+const { Agent } = require('http');
 const awsError = require('./error.js');
 const parallel = require('parallel-stream');

 module.exports = function(bucket, options) {
   options = options || {};

   const s3config = {
-    maxRetries: 10,
-    httpOptions: { connectTimeout: 3000 }

> connectTimeout and requestTimeout are not the same thing. Please refer here for an example: https://github.com/mapbox/aws-logs/blob/main/cdk/lib/forwarders/clients/s3.ts#L39-L43
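To illustrate the distinction the comment is drawing, here is a minimal sketch against the @smithy/node-http-handler options; the values are placeholders, not the ones this PR settles on. connectionTimeout caps how long the SDK waits to establish the TCP connection, while requestTimeout caps how long it waits for a response once the request has been sent.

```js
const { S3Client } = require('@aws-sdk/client-s3');
const { NodeHttpHandler } = require('@smithy/node-http-handler');

// Placeholder values: the two options guard different phases of a request.
const s3 = new S3Client({
  requestHandler: new NodeHttpHandler({
    connectionTimeout: 3000, // ms allowed to establish the TCP connection
    requestTimeout: 30000    // ms allowed to receive the response once connected
  })
});
```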

+    maxAttempts: 11, // Allows for 10 retries + 1 initial attempt
+    requestTimeout: 3000,
+    requestHandler: new NodeHttpHandler({
+      httpAgent: new Agent({}),
+      connectionTimeout: 20 * 1000,

> should we keep this 3000 like before?

+      requestTimeout: 900 * 1000,
+    }),
   };
   if (options.logger) s3config.logger = options.logger;
-  if (options.agent) s3config.httpOptions.agent = options.agent;
-  const s3 = options.s3 || new AWS.S3(s3config);
+  if (options.agent) s3config.requestHandler = { httpsAgent: options.agent };
+  const s3 = options.s3 || new S3Client(s3config);

   function write(key, enc, callback) {
     key = key.toString();
@@ -23,14 +31,25 @@ module.exports = function(bucket, options) {
     };

     function removed(err) {
-      if (err && err.statusCode !== 404) return callback(awsError(err, params));
+      if (err && err instanceof S3ServiceException && (
+        err.name === 'NoSuchBucket' ||
+        err.name === 'NoSuchKey' ||
+        err.name === 'AccessDenied'
+      )) return callback(awsError(err, params));
       deleteStream.deleted++;
       deleteStream.emit('deleted', key);
       callback();
     }

     if (options.dryrun) return removed();
-    s3.deleteObject(params, removed);
+
+    s3.send(new DeleteObjectCommand(params))
+      .then(function() {
+        removed();
+      })
+      .catch(function(err) {
+        removed(err);
+      });
   }

   var starttime = Date.now();
101 changes: 66 additions & 35 deletions lib/get.js
@@ -1,7 +1,9 @@
-const AWS = require('aws-sdk');
+const { S3Client, GetObjectCommand } = require('@aws-sdk/client-s3');
 const AwsError = require('./error');
 const zlib = require('zlib');
 const parallel = require('parallel-stream');
+const { NodeHttpHandler } = require('@smithy/node-http-handler');
+const { Agent } = require('http');

 module.exports = function(bucket, options) {
   options = options || {};
@@ -12,13 +14,23 @@ module.exports = function(bucket, options) {
   if (options.body && options.passErrors)
     throw new Error('options.body cannot be used with options.passErrors');

-  const s3config = {
-    maxRetries: 10,
-    httpOptions: { connectTimeout: 3000 }
-  };
-  if (options.logger) s3config.logger = options.logger;
-  if (options.agent) s3config.httpOptions.agent = options.agent;
-  const s3 = options.s3 || new AWS.S3(s3config);
+  let s3;
+  if (options.s3) {
+    s3 = options.s3;
+  } else {
+    const s3config = {
+      maxAttempts: 11, // Allows for 10 retries + 1 initial attempt
+      requestTimeout: 3000,
+      requestHandler: new NodeHttpHandler({
+        httpAgent: new Agent({}),
+        connectionTimeout: 20 * 1000,
+        requestTimeout: 900 * 1000,
+      }),
+    };
+    if (options.logger) s3config.logger = options.logger;
+    if (options.agent) s3config.requestHandler = { httpsAgent: options.agent };
+    s3 = new S3Client(s3config);
+  }

   function transform(key, enc, callback) {
     key = key.toString();
@@ -29,37 +41,56 @@ module.exports = function(bucket, options) {
       Key: key.toString()
     };

-    s3.getObject(params, function (err, data) {
-      if (!options.passErrors && err && err.statusCode === 404) return callback();
-      if (!options.passErrors && err) return callback(AwsError(err, params));
-
-      var response = err || data;
-      if (options.keys) response.RequestParameters = params;
-
-      if (options.gunzip) {
-        zlib.gunzip(response.Body, function(err, gunzipped) {
-          if (err) return getStream.emit('error', AwsError(err, params));
-          response.Body = gunzipped;
-          getStream.got++;
-          callback(null, options.body ? response.Body : response);
-        });
-      } else {
-        getStream.got++;
-        callback(null, options.body ? response.Body : response);
-      }
-    }).on('extractData', function (res) {
-      if (res.data.Body.length !== Number(res.data.ContentLength)) {
-        res.data = null;
-        res.error = {
-          code: 'TruncatedResponseError',
-          message: 'Content-Length does not match response body length'
-        };
-      }
-    }).on('retry', function (res) {
-      if (res.error) {
-        if (res.error.code === 'TruncatedResponseError') res.error.retryable = true;
-      }
-    });
+    s3.send(new GetObjectCommand(params))
+      .then(function(data) {
+        var response = data;
+        if (options.keys) response.RequestParameters = params;
+
+        // Convert stream to buffer for compatibility
+        if (data.Body && typeof data.Body.transformToByteArray === 'function') {
+          data.Body.transformToByteArray()
+            .then(function(bodyBytes) {
+              response.Body = Buffer.from(bodyBytes);
+
+              if (options.gunzip) {
+                zlib.gunzip(response.Body, function(err, gunzipped) {
+                  if (err) return getStream.emit('error', AwsError(err, params));
+                  response.Body = gunzipped;
+                  getStream.got++;
+                  callback(null, options.body ? response.Body : response);
+                });
+              } else {
+                getStream.got++;
+                callback(null, options.body ? response.Body : response);
+              }
+            })
+            .catch(function(err) {
+              callback(AwsError(err, params));
+            });
+        } else {
+          // Handle case where Body is already a buffer
+          if (options.gunzip) {
+            zlib.gunzip(response.Body, function(err, gunzipped) {
+              if (err) return getStream.emit('error', AwsError(err, params));
+              response.Body = gunzipped;
+              getStream.got++;
+              callback(null, options.body ? response.Body : response);
+            });
+          } else {
+            getStream.got++;
+            callback(null, options.body ? response.Body : response);
+          }
+        }
+      })
+      .catch(function(err) {
+        if (!options.passErrors && err.name === 'NoSuchKey') return callback();
+        if (!options.passErrors && err) return callback(AwsError(err, params));
+
+        var response = err;
+        if (options.keys) response.RequestParameters = params;
+        getStream.got++;
+        callback(null, options.body ? response.Body : response);
+      });
   }

   var starttime = Date.now();
71 changes: 35 additions & 36 deletions lib/keys.js
@@ -1,18 +1,30 @@
-const AWS = require('aws-sdk');
+const { S3Client, ListObjectsCommand } = require('@aws-sdk/client-s3');
 const stream = require('stream');
-const s3urls = require('@mapbox/s3urls');
+const s3urls = require('./s3url-parser');
 const awsError = require('./error.js');
+const { NodeHttpHandler } = require('@smithy/node-http-handler');
+const { Agent } = require('http');

 module.exports = function(s3url, options) {
   options = options || {};

-  const s3config = {
-    maxRetries: 10,
-    httpOptions: { connectTimeout: 3000 }
-  };
-  if (options.logger) s3config.logger = options.logger;
-  if (options.agent) s3config.httpOptions.agent = options.agent;
-  const s3 = options.s3 || new AWS.S3(s3config);
+  let s3;
+  if (options.s3) {
+    s3 = options.s3;
+  } else {
+    const s3config = {
+      maxAttempts: 11, // Allows for 10 retries + 1 initial attempt
+      requestTimeout: 3000,
+      requestHandler: new NodeHttpHandler({
+        httpAgent: new Agent({}),
+        connectionTimeout: 20 * 1000,
+        requestTimeout: 900 * 1000,
+      }),
+    };
+    if (options.logger) s3config.logger = options.logger;
+    if (options.agent) s3config.requestHandler = { httpsAgent: options.agent };
+    s3 = new S3Client(s3config);
+  }

   s3url = s3urls.fromUrl(s3url);

@@ -51,37 +63,24 @@ module.exports = function(s3url, options) {

     if (keyStream.next) params.Marker = keyStream.next;

-    s3.listObjects(params, function (err, data) {
-      if (err) return keyStream.emit('error', awsError(err, params));
-
-      if (!Array.isArray(data.Contents)) {
-        var error = new Error('Invalid SDK response');
-        error.body = this.httpResponse.body;
-        error.headers = this.httpResponse.headers;
-        error.statusCode = this.httpResponse.statusCode;
-        error.requestId = this.requestId;
-        return keyStream.emit('error', error);
-      }
-
-      var last = data.Contents.slice(-1)[0];
-      var more = data.IsTruncated && last;
-
-      data.Contents.forEach(function(item) {
-        keyStream.cache.push(item.Key);
-      })
-
-      keyStream.readPending = false;
-
-      if (more) keyStream.next = last.Key;
-      else keyStream.done = true;
-      keyStream._read();
-    }).on('httpDone', function(response) {
-      if (response.httpResponse.statusCode === 200 && !response.httpResponse.body.length) {
-        response.error = new Error('S3 API response contained no body');
-        response.error.retryable = true;
-        response.error.requestId = this.requestId;
-      }
-    });
+    s3.send(new ListObjectsCommand(params))
+      .then(function(data) {
+        var last = data.Contents.slice(-1)[0];
+        var more = data.IsTruncated && last;
+
+        data.Contents.forEach(function(item) {
+          keyStream.cache.push(item.Key);
+        });
+
+        keyStream.readPending = false;
+
+        if (more) keyStream.next = last.Key;
+        else keyStream.done = true;
+        keyStream._read();
+      })
+      .catch(function(err) {
+        keyStream.emit('error', awsError(err, params));
+      });
   };

   return keyStream;
39 changes: 39 additions & 0 deletions lib/s3url-parser.js
@@ -0,0 +1,39 @@
+/**
+ * Simple S3 URL parser to replace @mapbox/s3urls dependency
+ * This avoids the transitive dependency on aws-sdk v2 through @mapbox/s3signed
+ */
+
+function fromUrl(s3url) {
+  if (!s3url || typeof s3url !== 'string') {
+    throw new Error('Invalid S3 URL');
+  }
+
+  // Remove s3:// prefix
+  if (!s3url.startsWith('s3://')) {
+    throw new Error('URL must start with s3://');
+  }
+
+  const urlWithoutProtocol = s3url.slice(5); // Remove 's3://'
+  const firstSlashIndex = urlWithoutProtocol.indexOf('/');
+
+  let bucket, key;
+
+  if (firstSlashIndex === -1) {
+    // No slash found, entire string is bucket name
+    bucket = urlWithoutProtocol;
+    key = '';
+  } else {
+    // Split at first slash
+    bucket = urlWithoutProtocol.slice(0, firstSlashIndex);
+    key = urlWithoutProtocol.slice(firstSlashIndex + 1);
+  }
+
+  return {
+    Bucket: bucket,
+    Key: key
+  };
+}
+
+module.exports = {
+  fromUrl: fromUrl
+};
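For reference, a quick usage sketch of the replacement parser; the bucket and key values below are made-up examples.

```js
const s3urls = require('./lib/s3url-parser');

// Mirrors the { Bucket, Key } shape that @mapbox/s3urls' fromUrl() returned.
s3urls.fromUrl('s3://my-bucket/path/to/key'); // => { Bucket: 'my-bucket', Key: 'path/to/key' }
s3urls.fromUrl('s3://my-bucket');             // => { Bucket: 'my-bucket', Key: '' }
```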