Skip to content

Commit 21143fb

Browse files
author
Jackie Barbetta
committed
2 parents d9dd2cd + c5a51a6 commit 21143fb

7 files changed

Lines changed: 451 additions & 7 deletions

File tree

lib/Accumulable.js

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright 2015 IBM Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
var Utils = require('./utils.js');
18+
19+
/**
20+
* A data type that can be accumulated, ie has an commutative and associative "add" operation,
21+
* but where the result type, `R`, may be different from the element type being added, `T`.
22+
*
23+
* You must define how to add data, and how to merge two of these together. For some data types,
24+
* such as a counter, these might be the same operation. In that case, you can use the simpler
25+
* {@link Accumulator}. They won't always be the same, though -- e.g., imagine you are
26+
* accumulating a set. You will add items to the set, and you will union two sets together.
27+
*
28+
* @classdesc
29+
* @constructor
30+
*/
31+
/*
32+
* NOTE for now EclairJS will only support floats and int types
33+
*
34+
*
35+
*/
36+
function Accumulable(kernelP, refIdP) {
37+
this.kernelP = kernelP;
38+
this.refIdP = refIdP;
39+
}
40+
41+
/**
42+
* Add more data to this accumulator / accumulable
43+
* @param {object} term the data to add
44+
* @returns {Promise.<Void>} A Promise that resolves to nothing.
45+
*/
46+
Accumulable.prototype.add = function(term) {
47+
var templateStr = '{{inRefId}}.add({{term}});';
48+
return Utils.generateVoidPromise(this, templateStr, {term: Utils.prepForReplacement(term)});
49+
};
50+
51+
52+
/**
53+
* Merge two accumulable objects together
54+
*
55+
* Normally, a user will not want to use this version, but will instead call `add`.
56+
* @param {object} term the other `R` that will get merged with this
57+
* @returns {Promise.<Void>} A Promise that resolves to nothing.
58+
*/
59+
Accumulable.prototype.merge = function(term) {
60+
var templateStr = '{{inRefId}}.merge({{term}});';
61+
return Utils.generateVoidPromise(this, templateStr, {term: Utils.prepForReplacement(term)});
62+
};
63+
64+
/**
65+
* Access the accumulator's current value; only allowed on master.
66+
* @returns {Promise.<object>}
67+
*/
68+
Accumulable.prototype.value = function() {
69+
function _resolve(result, resolve, reject) {
70+
resolve(parseFloat(result));
71+
}
72+
73+
var templateStr = '{{inRefId}}.value();';
74+
75+
return Utils.generateResultPromise(this, templateStr, null, _resolve);
76+
};
77+
78+
/**
79+
* Get the current value of this accumulator from within a task.
80+
*
81+
* This is NOT the global value of the accumulator. To get the global value after a
82+
* completed operation on the dataset, call `value`.
83+
*
84+
* The typical use of this method is to directly mutate the local value, eg., to add
85+
* an element to a Set.
86+
* @returns {Promise.<object>}
87+
*/
88+
Accumulable.prototype.localValue = function() {
89+
function _resolve(result, resolve, reject) {
90+
resolve(parseFloat(result));
91+
}
92+
93+
var templateStr = '{{inRefId}}.localValue();';
94+
95+
return Utils.generateResultPromise(this, templateStr, null, _resolve);
96+
};
97+
98+
/**
99+
* Set the accumulator's value; only allowed on master
100+
* @param {object}
101+
* @returns {Promise.<Void>} A Promise that resolves to nothing.
102+
*/
103+
Accumulable.prototype.setValue = function(newValue) {
104+
var templateStr = '{{inRefId}}.setValue({{newValue}});';
105+
106+
return Utils.generateVoidPromise(this, templateStr , {newValue: Utils.prepForReplacement(newValue)});
107+
};
108+
109+
/**
110+
* @returns {Promise.<string>}
111+
*/
112+
Accumulable.prototype.toString = function() {
113+
var templateStr = '{{inRefId}}.toString();';
114+
return Utils.generateResultPromise(this, templateStr);
115+
};
116+
117+
module.exports = Accumulable;

lib/AccumulableParam.js

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2015 IBM Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
var Utils = require('./utils.js');
18+
19+
var kernelP;
20+
21+
/**
22+
* Helper object defining how to accumulate values of a particular type. An implicit
23+
* AccumulableParam needs to be available when you create {@link Accumulable}s of a specific type.
24+
*
25+
* @classdesc
26+
* @constructor
27+
*/
28+
function AccumulableParam() {
29+
}
30+
31+
/**
32+
* Add additional data to the accumulator value. Is allowed to modify and return `r`
33+
* for efficiency (to avoid allocating objects).
34+
*
35+
* @param {object} r the current value of the accumulator
36+
* @param {object} t the data to be added to the accumulator
37+
* @returns {object} the new value of the accumulator
38+
* @returns {Promise.<object>} Promise that resolves to the new value of the accumulator
39+
*/
40+
AccumulableParam.prototype.addAccumulator = function(r, t) {
41+
var templateStr = '{{inRefId}}.addAccumulator({{r}}, {{t}});';
42+
43+
return Utils.generateResultPromise(this, templateStr, {r: r, t: t});
44+
};
45+
46+
/**
47+
* Merge two accumulated values together. Is allowed to modify and return the first value
48+
* for efficiency (to avoid allocating objects).
49+
*
50+
* @param {object} r1 one set of accumulated data
51+
* @param {object} r2 another set of accumulated data
52+
* @returns {Promise.<object>} Promise that resolves to a value that has both data sets merged together
53+
*/
54+
AccumulableParam.prototype.addInPlace = function(r1,r2) {
55+
var templateStr = '{{inRefId}}.addInPlace({{r1}},{{r2}});';
56+
57+
return Utils.generateResultPromise(this, templateStr, {r1: r1, r2: r2});
58+
};
59+
60+
/**
61+
* Return the "zero" (identity) value for an accumulator type, given its initial value. For
62+
* example, if R was a vector of N dimensions, this would return a vector of N zeroes.
63+
* @param {object}
64+
* @returns {Promise.<object>}
65+
*/
66+
AccumulableParam.prototype.zero = function(initialValue) {
67+
var templateStr = '{{inRefId}}.zero({{initialValue}});';
68+
69+
return Utils.generateResultPromise(this, templateStr, {initialValue: initialValue});
70+
};
71+
72+
function FloatAccumulatorParam(kernelP, refIdP) {
73+
this.kernelP = kernelP;
74+
this.refIdP = refIdP;
75+
}
76+
77+
FloatAccumulatorParam.prototype = AccumulableParam.prototype;
78+
79+
80+
function IntAccumulatorParam(kernelP, refIdP) {
81+
this.kernelP = kernelP;
82+
this.refIdP = refIdP;
83+
}
84+
85+
IntAccumulatorParam.prototype = AccumulableParam.prototype;
86+
87+
module.exports = function(kP) {
88+
kernelP = kP;
89+
return {
90+
FloatAccumulatorParam: function() {
91+
var templateStr = 'var {{refId}} = new FloatAccumulatorParam();';
92+
return Utils.evaluate(kernelP, FloatAccumulatorParam, templateStr);
93+
},
94+
95+
IntAccumulatorParam: function() {
96+
var templateStr = 'var {{refId}} = new IntAccumulatorParam();';
97+
return Utils.evaluate(kernelP, IntAccumulatorParam, templateStr);
98+
}
99+
};
100+
};

lib/Accumulator.js

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2015 IBM Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
var Accumulable = require('./Accumulable.js');
18+
19+
/**
20+
* A simpler value of {@link Accumulable} where the result type being accumulated is the same
21+
* as the types of elements being merged, i.e. variables that are only "added" to through an
22+
* associative operation and can therefore be efficiently supported in parallel. They can be used
23+
* to implement counters (as in MapReduce) or sums. EclairJS supports accumulators of numeric
24+
* value types.
25+
*
26+
* An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
27+
* Tasks running on the cluster can then add to it using the [[Accumulable#add]].
28+
* However, they cannot read its value. Only the driver program can read the accumulator's value,
29+
* using its value method.
30+
*
31+
*
32+
* @example
33+
* var accum = sparkContext.accumulator(0);
34+
* sparkContext.parallelize([1, 2, 3, 4])
35+
* .foreach(function(x, accum) {
36+
* accum.add(x);
37+
* });
38+
* print(accum.value()); // displays 10
39+
*
40+
*/
41+
42+
function Accumulator(kernelP, refIdP) {
43+
this.kernelP = kernelP;
44+
this.refIdP = refIdP;
45+
}
46+
47+
Accumulator.prototype = Accumulable.prototype;
48+
49+
module.exports = Accumulator;

lib/SparkContext.js

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,17 +46,13 @@ var kernelP = new Promise(function(resolve, reject) {
4646
* @param {string} name - A name for your application, to display on the cluster web UI
4747
*/
4848
function SparkContext(master, name) {
49-
kernel.createKernelSession(name).then(kernelPResolve).catch(kernelPReject);
50-
51-
var myThis = this;
49+
kernel.createKernelSession(name).then(kernelPResolve).catch(kernelPReject);
5250

5351
this.kernelP = new Promise(function(resolve, reject) {
5452
kernelP.then(function(kernel) {
5553
var templateStr = 'var jsc = new SparkContext("{{master}}", "{{name}}");';
5654

5755
Utils.execute(kernelP, templateStr, {master: master, name: name}).then(function() {
58-
// resolve(k);
59-
6056
// Check version
6157
templateStr = 'jsc.version();';
6258
// This is somewhat ugly, since SparkContext's kernelP hasn't been resolved yet.
@@ -73,13 +69,59 @@ function SparkContext(master, name) {
7369
});
7470
}
7571

72+
/**
73+
* Create an {@link Accumulable} shared variable of the given type, to which tasks can "add" values with add.
74+
* Only the master can access the accumuable's value.
75+
*
76+
* @param {object} initialValue
77+
* @param {AccumulableParam} param
78+
* @param {string} name of the accumulator for display in Spark's web UI.
79+
* @returns {Accumulable}
80+
*/
81+
SparkContext.prototype.accumulable = function() {
82+
var Accumulable = require('./Accumulable.js');
83+
84+
if (arguments.length == 3) {
85+
var templateStr = 'var {{refId}} = jsc.accumulable({{initialValue}}, {{name}}, {{param}});';
86+
87+
return Utils.evaluate(this.kernelP, Accumulable, templateStr, {initialValue: arguments[0], name: Utils.prepForReplacement(arguments[1]), param: Utils.prepForReplacement(arguments[2])});
88+
} else {
89+
var templateStr = 'var {{refId}} = jsc.accumulable({{initialValue}}, {{param}});';
90+
91+
return Utils.evaluate(this.kernelP, Accumulable, templateStr, {initialValue: arguments[0], param: Utils.prepForReplacement(arguments[1])});
92+
}
93+
};
94+
95+
/**
96+
* Create an {@link Accumulator} variable, which tasks can "add" values to using the add method.
97+
* Only the master can access the accumulator's value.
98+
*
99+
* @param {int | float} initialValue
100+
* @param {string} name of the accumulator for display in Spark's web UI. Optional
101+
* @param {AccumulableParam} param Optional defaults to FloatAccumulatorParam
102+
* @returns {Accumulator}
103+
*/
104+
SparkContext.prototype.accumulator = function() {
105+
var Accumulator = require('./Accumulator.js');
106+
107+
if (arguments.length == 3) {
108+
var templateStr = 'var {{refId}} = jsc.accumulator({{initialValue}}, {{name}}, {{param}});';
109+
110+
return Utils.evaluate(this.kernelP, Accumulator, templateStr, {initialValue: arguments[0], name: Utils.prepForReplacement(arguments[1]), param: Utils.prepForReplacement(arguments[2])});
111+
} else {
112+
var templateStr = 'var {{refId}} = jsc.accumulator({{initialValue}}, {{param}});';
113+
114+
return Utils.evaluate(this.kernelP, Accumulator, templateStr, {initialValue: arguments[0], param: Utils.prepForReplacement(arguments[1])});
115+
}
116+
};
117+
76118
/**
77119
* Distribute a local Scala collection to form an RDD.
78120
* @param {array} arr
79121
* @returns {RDD}
80122
*/
81123
SparkContext.prototype.parallelize = function(arr) {
82-
var templateStr = 'var {{refId}} = jsc.parallelize("{{arr}}");';
124+
var templateStr = 'var {{refId}} = jsc.parallelize([{{arr}}]);';
83125

84126
return Utils.evaluate(this.kernelP, RDD, templateStr, {arr: arr.join(',')});
85127
};

lib/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ function EclairJS() {
2626
return {
2727
SparkContext: result[1],
2828

29+
AccumulableParam: require('./AccumulableParam.js')(kernelP),
30+
2931
SQLContext: require('./sql/SQLContext.js'),
3032
sql: {
3133
functions: require('./sql/functions.js')(kernelP)

lib/sql/DataFrame.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
* limitations under the License.
1515
*/
1616

17-
var protocol = require('../kernel.js');
1817
var Utils = require('../utils.js');
1918
var serialize = require('../serialize.js');
2019

0 commit comments

Comments
 (0)