Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/index.restdown
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ connecting to ZK via once().
If the client driver has an unexpected error, it is sent here. Specifically,
if the client has lost its session to ZK.

### Event: 'connection_interrupted'
### Event: 'not_connected'

function onConnectionInterrupted() { }
function onNotConnected() { }

If the underlying native client loses connection to ZK, but not its session,
it is sent here. Notably, for most applications, this should be a no-op, since
Expand Down
38 changes: 30 additions & 8 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ var util = require('util');

var assert = require('assert-plus');
var vasync = require('vasync');
var once = require('once');
var ZK = require('zookeeper');

var ZKError = require('./error').ZKError;
Expand All @@ -19,8 +20,7 @@ var sprintf = util.format;
var PROXY_EVENTS = [
'connect',
'not_connected',
'close',
'session_expired'
'close'
];


Expand Down Expand Up @@ -189,13 +189,13 @@ ZKClient.prototype.close = function close() {
});

this.removeAllListeners('connect');
this.removeAllListeners('connection_interrupted');
this.removeAllListeners('session_expired');
this.removeAllListeners('not_connected');

this.zk.once('close', function () {
self.watchers.forEach(function (w) {
w.stop();
});
self.watchers = [];

self.zk.removeAllListeners('error');
self.removeAllListeners('error');
Expand Down Expand Up @@ -314,6 +314,8 @@ ZKClient.prototype.mkdirp = function mkdirp(p, callback) {
assert.string(p, 'path');
assert.func(callback, 'callback');

callback = once(callback);

var dirs = path.normalize(p).split('/').slice(1);
var log = this.log;
var self = this;
Expand All @@ -328,6 +330,7 @@ ZKClient.prototype.mkdirp = function mkdirp(p, callback) {
tasks.push(function checkIfExists(_, cb) {
log.trace('mkdirp: checking %s', dir);
self.stat(dir, function (err, _stat) {
cb = once(cb);
if (err && err.code !== ZK.ZNONODE) {
cb();
}
Expand All @@ -338,6 +341,7 @@ ZKClient.prototype.mkdirp = function mkdirp(p, callback) {
});

tasks.push(function mkdirIfNotExists(_, cb) {
cb = once(cb);
if (exists) {
cb();
} else {
Expand Down Expand Up @@ -368,12 +372,15 @@ ZKClient.prototype.put = function put(p, object, options, callback) {
assert.object(options, 'options');
assert.func(callback, 'callback');

callback = once(callback);

var exists;
var log = this.log;
var _p = path.normalize(p);
var self = this;
var tasks = [
function checkIfExists(_, cb) {
cb = once(cb);
log.trace('put: checking %s', _p);
self.stat(_p, function (err, _stat) {
if (err && err.code !== ZK.ZNONODE) {
Expand All @@ -386,6 +393,7 @@ ZKClient.prototype.put = function put(p, object, options, callback) {
},

function putIfNotExists(_, cb) {
cb = once(cb);
if (exists) {
cb();
} else {
Expand All @@ -395,7 +403,7 @@ ZKClient.prototype.put = function put(p, object, options, callback) {
},

function set(_, cb) {
self.update(_p, object, cb);
self.update(_p, object, once(cb));
}
];

Expand Down Expand Up @@ -424,6 +432,8 @@ ZKClient.prototype.readdir = function readdir(p, callback) {
var _p = path.normalize(p);
var zk = this.zk;

callback = once(callback);

log.trace({path: p}, 'readdir: entered');
zk.a_get_children(_p, false, function (rc, msg, nodes) {
if (rc !== 0) {
Expand All @@ -442,6 +452,8 @@ ZKClient.prototype.rmr = function rmr(p, callback) {
assert.string(p, 'path');
assert.func(callback, 'callback');

callback = once(callback);

var _done = false;
var inflight = 0;
var log = this.log;
Expand Down Expand Up @@ -489,7 +501,7 @@ ZKClient.prototype.rmr = function rmr(p, callback) {

nodes.forEach(function (n) {
tasks.push(function (_, cb) {
self.unlink(n, cb);
self.unlink(n, once(cb));
});
});

Expand All @@ -505,6 +517,8 @@ ZKClient.prototype.stat = function stat(p, callback) {
assert.string(p, 'path');
assert.func(callback, 'callback');

callback = once(callback);

var log = this.log;
var zk = this.zk;

Expand Down Expand Up @@ -542,6 +556,8 @@ ZKClient.prototype.unlink = function unlink(p, options, callback) {
assert.object(options, 'options');
assert.func(callback, 'callback');

callback = once(callback);

var log = this.log;
var _p = path.normalize(p);
var zk = this.zk;
Expand Down Expand Up @@ -578,12 +594,15 @@ ZKClient.prototype.update = function update(p, object, options, callback) {
assert.object(options, 'options');
assert.func(callback, 'callback');

callback = once(callback);

var data = JSON.stringify(object);
var log = this.log;
var _p = path.normalize(p);
var self = this;
var tasks = [
function getVersion(_, cb) {
cb = once(cb);
if (version !== undefined)
return (cb());
self.stat(_p, function (err, _stat) {
Expand All @@ -597,6 +616,7 @@ ZKClient.prototype.update = function update(p, object, options, callback) {
return (undefined);
},
function write(_, cb) {
cb = once(cb);
zk.a_set(_p, data, version, function (rc, msg) {
if (rc !== 0) {
cb(new ZKError(rc, msg));
Expand Down Expand Up @@ -634,6 +654,8 @@ ZKClient.prototype.watch = function watch(p, options, callback) {
assert.object(options, 'options');
assert.func(callback, 'callback');

callback = once(callback);

var log = this.log;
var _p = path.normalize(p);
var self = this;
Expand Down Expand Up @@ -738,7 +760,7 @@ ZKClient.prototype.watch = function watch(p, options, callback) {
} catch (e) {
log.error({err: e},
'error while parsing data', e);
return (self.emit('error', e));
return (emitter.emit('error', e));
}

log.trace({
Expand All @@ -759,7 +781,7 @@ ZKClient.prototype.watch = function watch(p, options, callback) {
path: _p
}, 'watch: notification fired');

if (!done) {
if (!done && event != "session" && event != "unknown") {
emitter.emit(event);
register();
}
Expand Down
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
"main": "./lib/index.js",
"dependencies": {
"assert-plus": "0.1.2",
"bunyan": "0.16.5",
"bunyan": "0.21.1",
"node-uuid": "1.4.0",
"vasync": "1.3.2",
"zookeeper": "git://github.com/yunong/node-zookeeper.git#3a0545d"
"once": "1.1.1",
"vasync": "1.3.3",
"zookeeper": "git://github.com/yunong/node-zookeeper.git#ef48f8c24b42d6c1b1c3dbab82278666bed37b70"
},
"devDependencies": {
"cover": "0.2.8",
Expand Down