forked from hpchud/vccjs
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclusterwatcher.js
More file actions
191 lines (182 loc) · 6.88 KB
/
clusterwatcher.js
File metadata and controls
191 lines (182 loc) · 6.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#!/usr/bin/env node
var os = require("os");
var fs = require("fs");
var path = require("path");
var network = require("network");
var promise = require("deferred");
var child_process = require('child_process');
var glob = promise.promisify(require('glob'));
var vccutil = require("./vccutil.js");
var logger = require("./log.js");
var kvstore = require("./kvstore.js");
/**
* The ClusterWatcher
* @constructor
* @param {Object} config - A configuration object (usually loaded from vccutil.GetConfig)
*/
var ClusterWatcher = function (config) {
logger.info("ClusterWatcher initialised with config", config);
this.config = config;
this.kv = new kvstore();
this.kv.connect(config.kvstore.host, config.kvstore.port);
// host cache
this.lasthosts = {};
// on change handlers
this.on_change = [this.runClusterHooks];
// the timer id for cluster changed
this.changed_timeout = null;
// poll frequency
this.poll_ms = 5000;
// time to detect settle
this.settle_ms = 10000;
// path to hosts.vcc file
this.host_path = path.join(vccutil.getRunDir(), "hosts.vcc");
// the glob to find the cluster hooks
this.hook_dir = "/etc/vcc/cluster-hooks.d/*.sh";
}
/**
* Write the list of hosts to /etc/hosts.vcc in the same format as /etc/hosts
* @param {Object} hosts - An object consisting of host/IP key/value pairs
* @returns {Promise}
*/
ClusterWatcher.prototype.writeHosts = function (hosts) {
var deferred = promise();
// write host file in /etc/hosts format
var file = fs.createWriteStream(this.host_path);
// on error we should log it
file.on('error', function(err) {
deferred.reject(err);
});
// once open we should write file
file.once('open', function(fd) {
for (var host in hosts) {
file.write(hosts[host]+" "+host+"\n");
}
file.end();
});
// on close we should resolve the promise
file.on('close', function() {
deferred.resolve()
})
return deferred.promise;
}
/**
* Utility function to run a cluster hook with error handling and output logging
* @param {String} script - The path to the shell script to run
* @returns {Promise}
*/
ClusterWatcher.prototype.runHook = function (script) {
var deferred = promise();
var proc = child_process.spawn("/bin/sh", [script]);
// start script and resolve once it exits
proc.on('exit', function (code, signal) {
if (code > 0) {
logger.warn("hook", script, "exited with code", code);
} else {
logger.debug("hook", script, "exited with code", code);
}
deferred.resolve(code);
});
return deferred.promise;
}
/**
* Dispatch calls to runHook() in parallel, resolve once all hooks are finished
* @returns {Promise}
*/
ClusterWatcher.prototype.runClusterHooks = function () {
var me = this;
var deferred = promise();
console.log(this.hook_dir);
// use glob to find all sh scripts in the hook_dir
glob(this.hook_dir).then(function (files) {
promise.map(files, function (file) {
return me.runHook(file);
})(function (result) {
// reduce the codes
var sum = result.reduce(function (r, i) {
return r+i;
}, 0);
if (sum > 0) {
logger.warn("some hooks did not run successfully");
} else {
logger.debug("all hooks finished", result);
}
deferred.resolve(sum);
});
}, function (err) {
logger.error("could not enumerate cluster hooks");
deferred.reject(err);
});
return deferred.promise;
}
/**
* The main loop to watch the discovery KV store for changes
*/
ClusterWatcher.prototype.watchCluster = function (callback) {
var me = this;
this.kv.list("/cluster/"+this.config.cluster+"/hosts", true).then(function (hosts) {
// sort the host list
var currenthosts = Object.keys(hosts).sort().reduce(function (result, key) {
result[key] = hosts[key];
return result;
}, {});
logger.debug(Object.keys(currenthosts).length, "hosts in cluster");
// compare with lasthosts
if (JSON.stringify(currenthosts) === JSON.stringify(me.lasthosts)) {
logger.debug("cluster has not changed");
// cluster not changed, schedule next loop
me.lasthosts = currenthosts;
setTimeout(me.watchCluster.bind(me), me.poll_ms);
} else {
// cluster has changed
// see if it changed before we managed to run the hooks from the last change
// and if so, do not run the hooks from the last change
if (me.changed_timeout) {
logger.warn("cluster is not settled, changed before we ran handlers");
logger.debug("clearing existing timeout for cluster changed event");
clearTimeout(me.changed_timeout);
}
// dispatch the changed timeout event
logger.debug("cluster changed, dispatch timeout for cluster changed event");
(function (currenthosts) {
changed_timeout = setTimeout(function () {
// cluster changed handler
logger.debug("run cluster changed event");
logger.debug("writing hosts");
me.writeHosts(currenthosts).then(function () {
logger.debug("written "+me.host_path);
logger.debug("running cluster hooks now");
promise.map(me.on_change, function (handler) {
return handler.bind(me)();
})(function () {
// cluster hooks done, schedule next loop
me.lasthosts = currenthosts;
if (!callback) {
setTimeout(me.watchCluster.bind(me), me.poll_ms);
} else {
callback();
}
});
}, function (err) {
logger.error("could not write hosts.vcc", err);
});
}, me.settle_ms);
})(currenthosts);
}
}, function (err) {
logger.error("could not enumerate hosts in the cluster", err);
}).done();
}
if (require.main === module) {
var clusterwatcher = new ClusterWatcher(vccutil.getConfig());
// do a single iteration of the watch loop, and then tell the service manager
// we are ready before continuing with the watch loop
clusterwatcher.watchCluster(function () {
if (clusterwatcher.config.systemd) {
vccutil.systemdNotify("ClusterWatcher is ready", true);
}
setTimeout(clusterwatcher.watchCluster.bind(clusterwatcher), clusterwatcher.poll_ms);
});
} else {
module.exports = ClusterWatcher;
}