Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## v0.6.0

* Remove `close()`d sources from the cache.

## v0.5.2 - 2/16/16

* Fix cache handling for items w/o `.length` (@petrsloup)
Expand Down
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@ and data produced by their respective `getTile()` functions.
var tilelive = require("tilelive"),
cache = require("tilelive-cache")(tilelive, {
size: 10, // 10MB cache (the default)
sources: 6, // cache a maximum of 6 sources (the default); you may
sources: 6 // cache a maximum of 6 sources (the default); you may
// need to change this if you're using lots of
// composed sources
closeDelay: 30 // wait 30s (the default) before closing sources when
// they've been evicted; this should prevent drain errors in
// tilelive-mapnik
});

// ...
Expand Down
224 changes: 118 additions & 106 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,134 +70,148 @@ var getContextCallbackProperties = function(context) {
return properties;
};

var enableCaching = function(uri, source, locker) {
// TODO use ES6 Symbols to prevent collisions
if (source._cached) {
// already cached
module.exports = function(tilelive, options) {
tilelive = enableStreaming(tilelive);

return source;
}
options = options || {};
options.size = "size" in options && options.size !== undefined ? options.size : 10;
options.sources = (options.sources | 0) || 6;

// defined outside enableCaching so that a single, shared cache will be used
// (this requires that keys be namespaced appropriately)
var locker = lockingCache({
max: 1024 * 1024 * options.size, // convert to MB
length: function(val) {
return (val[0] && val[0].length) ? val[0].length : 1;
},
maxAge: 6 * 3600e3 // 6 hours
});

var cache = Object.create(tilelive);

var uriSha1 = crypto.createHash("sha1");
uriSha1.update(JSON.stringify(uri));
var uriHash = uriSha1.digest("hex");
var lockedLoad = lockingCache({
max: options.sources,
dispose: function(key, values) {
// the source will always be the first value since it's the first
// argument to unlock()
values[0].close();
}
});

var enableCaching = function(source, sourceKey) {
// TODO use ES6 Symbols to prevent collisions
if (source._cached) {
// already cached

var makeKey = function(name, properties) {
properties = properties || {};
return source;
}

var key = util.format("%s:%s@%j", name, uriHash, properties);
var makeKey = function(name, properties) {
properties = properties || {};

// glue on any additional arguments using their JSON representation
key += Array.prototype.slice.call(arguments, 2).map(JSON.stringify).join(",");
var key = util.format("%s:%s@%j", name, sourceKey, properties);

var sha1 = crypto.createHash("sha1");
sha1.update(key);
return sha1.digest("hex");
};
// glue on any additional arguments using their JSON representation
key += Array.prototype.slice.call(arguments, 2).map(JSON.stringify).join(",");

if (source.getTile) {
var _getTile = source.getTile.bind(source);

source.getTile = locker(function(z, x, y, lock) {
var properties = getContextCallbackProperties(this);

// lock neighboring tiles when metatiling (if the source is streamable)
if (source.pipe && source.metatile && source.metatile > 1) {
// TODO extract this (also used in tilelive-streaming)
// get neighboring tiles within the same metatile
var dx = x % source.metatile,
dy = y % source.metatile,
metaX = x - dx,
metaY = y - dy;

for (var ix = metaX; ix < metaX + source.metatile; ix++) {
for (var iy = metaY; iy < metaY + source.metatile; iy++) {
// ignore the current tile
if (!(ix === x && iy === y)) {
var key = makeKey("getTile", properties, z, ix, iy);

if (!locker.locks.get(key)) {
// lock it with an empty list of callbacks (nothing to notify)
locker.locks.set(key, []);
var sha1 = crypto.createHash("sha1");
sha1.update(key);
return sha1.digest("hex");
};

if (source.getTile) {
var _getTile = source.getTile.bind(source);

source.getTile = locker(function(z, x, y, lock) {
var properties = getContextCallbackProperties(this);

// lock neighboring tiles when metatiling (if the source is streamable)
if (source.pipe && source.metatile && source.metatile > 1) {
// TODO extract this (also used in tilelive-streaming)
// get neighboring tiles within the same metatile
var dx = x % source.metatile,
dy = y % source.metatile,
metaX = x - dx,
metaY = y - dy;

for (var ix = metaX; ix < metaX + source.metatile; ix++) {
for (var iy = metaY; iy < metaY + source.metatile; iy++) {
// ignore the current tile
if (!(ix === x && iy === y)) {
var key = makeKey("getTile", properties, z, ix, iy);

if (!locker.locks.get(key)) {
// lock it with an empty list of callbacks (nothing to notify)
locker.locks.set(key, []);
}
}
}
}
}
}

return lock(makeKey("getTile", properties, z, x, y), function(unlock) {
return _getTile(z, x, y, unlock);
});
}).bind(source);
}
return lock(makeKey("getTile", properties, z, x, y), function(unlock) {
return _getTile(z, x, y, unlock);
});
}).bind(source);
}

if (source.getGrid) {
var _getGrid = source.getGrid.bind(source);
if (source.getGrid) {
var _getGrid = source.getGrid.bind(source);

source.getGrid = locker(function(z, x, y, lock) {
return lock(makeKey("getGrid", getContextCallbackProperties(this), z, x, y), function(unlock) {
return _getGrid(z, x, y, unlock);
});
}).bind(source);
}
source.getGrid = locker(function(z, x, y, lock) {
return lock(makeKey("getGrid", getContextCallbackProperties(this), z, x, y), function(unlock) {
return _getGrid(z, x, y, unlock);
});
}).bind(source);
}

if (source.getInfo) {
var _getInfo = source.getInfo.bind(source);
if (source.getInfo) {
var _getInfo = source.getInfo.bind(source);

source.getInfo = locker(function(lock) {
return lock(makeKey("getInfo", getContextCallbackProperties(this)), function(unlock) {
return _getInfo(unlock);
});
}).bind(source);
}
source.getInfo = locker(function(lock) {
return lock(makeKey("getInfo", getContextCallbackProperties(this)), function(unlock) {
return _getInfo(unlock);
});
}).bind(source);
}

var target = new stream.Writable({
objectMode: true,
highWaterMark: 16 // arbitrary backlog sizing
});
var _close = function(callback) {
return callback();
};

target._write = function(tile, _, callback) {
tile.pipe(new CacheCollector(locker, makeKey).on("finish", callback));
};
if (source.close) {
_close = source.close.bind(source);
}

source.pipe(target);
source.close = function(callback) {
callback = callback || function() {};

source._cached = true;
// remove this from the source cache
if (lockedLoad.cache.has(sourceKey)) {
// queue deletion (otherwise the cache's dispose calls this)
setImmediate(function() {
lockedLoad.cache.del(sourceKey);
});
}

return source;
};
return _close(callback);
}

module.exports = function(tilelive, options) {
tilelive = enableStreaming(tilelive);
var target = new stream.Writable({
objectMode: true,
highWaterMark: 16 // arbitrary backlog sizing
});

options = options || {};
options.size = "size" in options && options.size !== undefined ? options.size : 10;
options.sources = (options.sources | 0) || 6;
target._write = function(tile, _, callback) {
tile.pipe(new CacheCollector(locker, makeKey).on("finish", callback));
};

// defined outside enableCaching so that a single, shared cache will be used
// (this requires that keys be namespaced appropriately)
var locker = lockingCache({
max: 1024 * 1024 * options.size, // convert to MB
length: function(val) {
return (val[0] && val[0].length) ? val[0].length : 1;
},
maxAge: 6 * 3600e3 // 6 hours
});
source.pipe(target);

var cache = Object.create(tilelive);
source._cached = true;

var lockedLoad = lockingCache({
max: options.sources,
dispose: function(key, values) {
// don't close the source immediately in case there are pending
// references to it that haven't requested tiles yet
setTimeout(function() {
// the source will always be the first value since it's the first
// argument to unlock()
values[0].close(function() {});
}, (options.closeDelay || 30) * 1000);
}
});
return source;
};

cache.load = lockedLoad(function(uri, lock) {
if (typeof uri === "string") {
Expand All @@ -215,16 +229,14 @@ module.exports = function(tilelive, options) {
console.warn(err.stack);
}

var sha1 = crypto.createHash("sha1");
sha1.update(JSON.stringify(uri));
var key = sha1.digest("hex");
var key = crypto.createHash("sha1").update(JSON.stringify(uri)).digest("hex");

return lock(key, function(unlock) {
return tilelive.load(uri, function(err, source) {
if (!err &&
options.size > 0 &&
useCache) {
source = enableCaching(uri, source, locker);
source = enableCaching(source, key);
}

return unlock(err, source);
Expand Down