diff --git a/CHANGELOG.md b/CHANGELOG.md index 850c9c5..cdd0b84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changes +## v0.6.0 + +* Remove `close()`d sources from the cache. + ## v0.5.2 - 2/16/16 * Fix cache handling for items w/o `.length` (@petrsloup) diff --git a/README.md b/README.md index b86c44a..2cdf607 100644 --- a/README.md +++ b/README.md @@ -10,12 +10,9 @@ and data produced by their respective `getTile()` functions. var tilelive = require("tilelive"), cache = require("tilelive-cache")(tilelive, { size: 10, // 10MB cache (the default) - sources: 6, // cache a maximum of 6 sources (the default); you may + sources: 6 // cache a maximum of 6 sources (the default); you may // need to change this if you're using lots of // composed sources - closeDelay: 30 // wait 30s (the default) before closing sources when - // they've been evicted; this should prevent drain errors in - // tilelive-mapnik }); // ... diff --git a/index.js b/index.js index 0d6f681..030e4f3 100644 --- a/index.js +++ b/index.js @@ -70,134 +70,148 @@ var getContextCallbackProperties = function(context) { return properties; }; -var enableCaching = function(uri, source, locker) { - // TODO use ES6 Symbols to prevent collisions - if (source._cached) { - // already cached +module.exports = function(tilelive, options) { + tilelive = enableStreaming(tilelive); - return source; - } + options = options || {}; + options.size = "size" in options && options.size !== undefined ? options.size : 10; + options.sources = (options.sources | 0) || 6; + + // defined outside enableCaching so that a single, shared cache will be used + // (this requires that keys be namespaced appropriately) + var locker = lockingCache({ + max: 1024 * 1024 * options.size, // convert to MB + length: function(val) { + return (val[0] && val[0].length) ? val[0].length : 1; + }, + maxAge: 6 * 3600e3 // 6 hours + }); + + var cache = Object.create(tilelive); - var uriSha1 = crypto.createHash("sha1"); - uriSha1.update(JSON.stringify(uri)); - var uriHash = uriSha1.digest("hex"); + var lockedLoad = lockingCache({ + max: options.sources, + dispose: function(key, values) { + // the source will always be the first value since it's the first + // argument to unlock() + values[0].close(); + } + }); + + var enableCaching = function(source, sourceKey) { + // TODO use ES6 Symbols to prevent collisions + if (source._cached) { + // already cached - var makeKey = function(name, properties) { - properties = properties || {}; + return source; + } - var key = util.format("%s:%s@%j", name, uriHash, properties); + var makeKey = function(name, properties) { + properties = properties || {}; - // glue on any additional arguments using their JSON representation - key += Array.prototype.slice.call(arguments, 2).map(JSON.stringify).join(","); + var key = util.format("%s:%s@%j", name, sourceKey, properties); - var sha1 = crypto.createHash("sha1"); - sha1.update(key); - return sha1.digest("hex"); - }; + // glue on any additional arguments using their JSON representation + key += Array.prototype.slice.call(arguments, 2).map(JSON.stringify).join(","); - if (source.getTile) { - var _getTile = source.getTile.bind(source); - - source.getTile = locker(function(z, x, y, lock) { - var properties = getContextCallbackProperties(this); - - // lock neighboring tiles when metatiling (if the source is streamable) - if (source.pipe && source.metatile && source.metatile > 1) { - // TODO extract this (also used in tilelive-streaming) - // get neighboring tiles within the same metatile - var dx = x % source.metatile, - dy = y % source.metatile, - metaX = x - dx, - metaY = y - dy; - - for (var ix = metaX; ix < metaX + source.metatile; ix++) { - for (var iy = metaY; iy < metaY + source.metatile; iy++) { - // ignore the current tile - if (!(ix === x && iy === y)) { - var key = makeKey("getTile", properties, z, ix, iy); - - if (!locker.locks.get(key)) { - // lock it with an empty list of callbacks (nothing to notify) - locker.locks.set(key, []); + var sha1 = crypto.createHash("sha1"); + sha1.update(key); + return sha1.digest("hex"); + }; + + if (source.getTile) { + var _getTile = source.getTile.bind(source); + + source.getTile = locker(function(z, x, y, lock) { + var properties = getContextCallbackProperties(this); + + // lock neighboring tiles when metatiling (if the source is streamable) + if (source.pipe && source.metatile && source.metatile > 1) { + // TODO extract this (also used in tilelive-streaming) + // get neighboring tiles within the same metatile + var dx = x % source.metatile, + dy = y % source.metatile, + metaX = x - dx, + metaY = y - dy; + + for (var ix = metaX; ix < metaX + source.metatile; ix++) { + for (var iy = metaY; iy < metaY + source.metatile; iy++) { + // ignore the current tile + if (!(ix === x && iy === y)) { + var key = makeKey("getTile", properties, z, ix, iy); + + if (!locker.locks.get(key)) { + // lock it with an empty list of callbacks (nothing to notify) + locker.locks.set(key, []); + } } } } } - } - return lock(makeKey("getTile", properties, z, x, y), function(unlock) { - return _getTile(z, x, y, unlock); - }); - }).bind(source); - } + return lock(makeKey("getTile", properties, z, x, y), function(unlock) { + return _getTile(z, x, y, unlock); + }); + }).bind(source); + } - if (source.getGrid) { - var _getGrid = source.getGrid.bind(source); + if (source.getGrid) { + var _getGrid = source.getGrid.bind(source); - source.getGrid = locker(function(z, x, y, lock) { - return lock(makeKey("getGrid", getContextCallbackProperties(this), z, x, y), function(unlock) { - return _getGrid(z, x, y, unlock); - }); - }).bind(source); - } + source.getGrid = locker(function(z, x, y, lock) { + return lock(makeKey("getGrid", getContextCallbackProperties(this), z, x, y), function(unlock) { + return _getGrid(z, x, y, unlock); + }); + }).bind(source); + } - if (source.getInfo) { - var _getInfo = source.getInfo.bind(source); + if (source.getInfo) { + var _getInfo = source.getInfo.bind(source); - source.getInfo = locker(function(lock) { - return lock(makeKey("getInfo", getContextCallbackProperties(this)), function(unlock) { - return _getInfo(unlock); - }); - }).bind(source); - } + source.getInfo = locker(function(lock) { + return lock(makeKey("getInfo", getContextCallbackProperties(this)), function(unlock) { + return _getInfo(unlock); + }); + }).bind(source); + } - var target = new stream.Writable({ - objectMode: true, - highWaterMark: 16 // arbitrary backlog sizing - }); + var _close = function(callback) { + return callback(); + }; - target._write = function(tile, _, callback) { - tile.pipe(new CacheCollector(locker, makeKey).on("finish", callback)); - }; + if (source.close) { + _close = source.close.bind(source); + } - source.pipe(target); + source.close = function(callback) { + callback = callback || function() {}; - source._cached = true; + // remove this from the source cache + if (lockedLoad.cache.has(sourceKey)) { + // queue deletion (otherwise the cache's dispose calls this) + setImmediate(function() { + lockedLoad.cache.del(sourceKey); + }); + } - return source; -}; + return _close(callback); + } -module.exports = function(tilelive, options) { - tilelive = enableStreaming(tilelive); + var target = new stream.Writable({ + objectMode: true, + highWaterMark: 16 // arbitrary backlog sizing + }); - options = options || {}; - options.size = "size" in options && options.size !== undefined ? options.size : 10; - options.sources = (options.sources | 0) || 6; + target._write = function(tile, _, callback) { + tile.pipe(new CacheCollector(locker, makeKey).on("finish", callback)); + }; - // defined outside enableCaching so that a single, shared cache will be used - // (this requires that keys be namespaced appropriately) - var locker = lockingCache({ - max: 1024 * 1024 * options.size, // convert to MB - length: function(val) { - return (val[0] && val[0].length) ? val[0].length : 1; - }, - maxAge: 6 * 3600e3 // 6 hours - }); + source.pipe(target); - var cache = Object.create(tilelive); + source._cached = true; - var lockedLoad = lockingCache({ - max: options.sources, - dispose: function(key, values) { - // don't close the source immediately in case there are pending - // references to it that haven't requested tiles yet - setTimeout(function() { - // the source will always be the first value since it's the first - // argument to unlock() - values[0].close(function() {}); - }, (options.closeDelay || 30) * 1000); - } - }); + return source; + }; cache.load = lockedLoad(function(uri, lock) { if (typeof uri === "string") { @@ -215,16 +229,14 @@ module.exports = function(tilelive, options) { console.warn(err.stack); } - var sha1 = crypto.createHash("sha1"); - sha1.update(JSON.stringify(uri)); - var key = sha1.digest("hex"); + var key = crypto.createHash("sha1").update(JSON.stringify(uri)).digest("hex"); return lock(key, function(unlock) { return tilelive.load(uri, function(err, source) { if (!err && options.size > 0 && useCache) { - source = enableCaching(uri, source, locker); + source = enableCaching(source, key); } return unlock(err, source);